Spark Tips

1. Loop를 쓰는 순간 OutOfMemoryError

DataFrame를 다루지 않는 순간 Spark cluster 중 Driver만 사용하게 된다.

DataFrame을 안씀 (= 기껏 sqlContext.read로 읽어놓고 collect해서 사용함)
-> 스파크가 아니라 스칼라에서 돌아감 (= Executor가 아니라 Driver에서 돌아감)
-> Single node computing
-> Out of Memory

2. UDF를 사용할 때

a) 파라미터에 Primitive Type을 넣지 않는 것을 추천

Scala기준으로 설명한다.

Scala built-in Value 타입인 Int, Long등을 사용하지 말자. 이 타입들은 null이 될 수 없다. Database에 불러온 값이 null인 경우 UDF가 정상적으로 실행되지 않고 UDF의 결과 또한 null로 고정된다.

import org.apache.spark.sql.functions
type myFunctionType = (Int, Long, String) => Int
val myFunction: myFunctionType = (i, l, str) => i + l.toInt + Integer.parseInt(str)
val myUDF = functions.udf(myFunction)
view raw example1.scala hosted with ❤ by GitHub

line 4의 myFunction의 첫 번째, 두 번째 파라미터는 primitive type이다. (조금 더 정확히는 Scala 2.10에서 추가된 Value 클래스다. 링크) 따라서, 해당 첫 번째, 두 번째 컬럼 중 하나라도 null인 경우 이 UDF는 null을 반환하게 된다. 이와 같은 기능을 원한 것이 아니라면 java.lang 패키지 밑에 있는 클래스들을 사용하자.

b) null의 멤버를 선택하면 null이다.

위와 연결되는 팁이다. 이번에는 UDF의 return type을 다음과 같이 composite type으로 가정하자.

type myFunction2Type = (Long, java.lang.Double, String) => Tuple2[Double, String]
val myFunction2: myFunction2Type = (l, d, str) => (d + l, str)
val myUDF2 = functions.udf(myFunction2)
view raw example2.scala hosted with ❤ by GitHub

위 코드에는 anomaly가 있다. myFunction2의 첫 번째 파라미터의 타입이 원시적(primitive)이다. 따라서 해당 값이 null로 들어오면 myUDF2의 return 값도 당연히 null이 들어오겠지. 계속 코드를 보자.

val colList = List[Column](
originalDF("long_column"),
originalDF("double_column"),
originalDF("string_column")
)
val processedDF = originalDF
.withColumn("newCol", myUDF2(colList:_*))
.select("newCol._1", "newCol._2")
view raw example3.scala hosted with ❤ by GitHub

JavaScript에 익숙한 나는, myUDF2의 result가 null이라면, null._1은 Null Pointer Exception이 발생해야 한다고 생각했다. 하지만, null의 멤버를 선택하면 에러나 예외가 발생하지 않고 null이 반환된다. 따라서 processedDF는 Long이 null로 들어온 경우 null, null의 값을 가진 행으로 처리된다.

c) UDF의 return type으로는 case class를 사용하는 것이 좋다.

Serializable한 것이 좋다. Scala의 대부분의 Collection들은 serializable하다. 다만, Seq는 한 가지 타입으로 통일되어야 하는 단점이 있다. 혹시나 해서 Seq[Any], Seq[AnyVal] 등으로 시도해봤지만 Spark에서 뱉어내므로, Seq는 멀리하는 것이 좋겠다. 그러면 우리에게 남은 선택지는 tuple 아니면 case class인데, 개인적으로 코드의 가독성을 위해 case class를 추천한다.

다만, case class가 serializable 하다고 해서 class 내부에 선언하게 되면, 모든 부모/조상 class들이 serializable해야 하므로 별도의 파일로 빼내야 spark error를 피할 수 있다.

Leave a Reply