The inevitable “Task not serializable” SparkException

The good old:

org.apache.spark.SparkException: Task not serializable

usually surfaces at least once in a Spark developer's career, or, in my case, whenever enough time has passed since I last saw it that I've conveniently forgotten it exists and that it is (usually) easily avoided.

Here is the scenario: I need a class that filters an RDD, for example:

import org.apache.spark.SparkContext

class AreWeSerializableYet(sc: SparkContext) {

  val rdd = sc.parallelize(1 to 10)
  val numberTwo = 2

  def doFilter() = {
    val filtered = rdd.filter(defEvens) // not serializable
    filtered.collect()
  }

  def defEvens(a: Int) = a % 2 == 0
}

Let’s instantiate the class in a test and call that method:

import org.apache.spark.sql.SparkSession
import org.scalatest.{FlatSpec, Matchers}

class ShortTest extends FlatSpec with Matchers {

  val sc = getSpark().sparkContext
  val expectedOutput = (2 to 10 by 2).toList

  "AreWeSerializableYet" should "blow up (or not) for demo purposes" in {
    val subject = new AreWeSerializableYet(sc)
    val result = subject.doFilter()
    result.toList should be (expectedOutput) // collect() returns an Array, so compare as a List
  }

  def getSpark(appName: String = "testerApp") = {
    SparkSession
      .builder()
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .master("local[*]")
      .appName(appName)
      .getOrCreate()
  }
}

We run our tests in the IDE and are disappointed to find this in the output:

org.apache.spark.SparkException: Task not serializable

Trudging further down the stack trace, we see the cause:

Caused by: java.io.NotSerializableException: com.ehur.AreWeSerializableYet
Serialization stack:
 - object not serializable (class: com.ehur.AreWeSerializableYet, value: com.ehur.AreWeSerializableYet@4ba380c7)
 - field (class: com.ehur.AreWeSerializableYet$$anonfun$1, name: $outer, type: class com.ehur.AreWeSerializableYet)
 - object (class com.ehur.AreWeSerializableYet$$anonfun$1, <function1>)

The problem? Spark is attempting to serialize the entire instance of the AreWeSerializableYet class: you can see in the trace above that the generated function class, AreWeSerializableYet$$anonfun$1, keeps a reference to the instance it belongs to in a field called $outer. But our class is not serializable, and it will remain so even if we slap a "with Serializable" on it. For one thing, one of its instance variables is a SparkContext, and trying to serialize a SparkContext is just not done.
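To make the capture concrete, here is roughly what the compiler produces for rdd.filter(defEvens). This is an illustrative sketch of the anonymous function class, not the actual emitted bytecode:

// Sketch only: a method reference becomes a function class whose
// constructor takes the enclosing instance (the "$outer" field in the
// trace). Serializing the function therefore means serializing the
// whole AreWeSerializableYet, SparkContext and all.
class AnonFun1(outer: AreWeSerializableYet) extends (Int => Boolean) {
  def apply(a: Int): Boolean = outer.defEvens(a) // delegates via the outer reference
}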

The solution? Pass a function value to the filter instead of a method. A function value that refers only to its own parameters retains no handle on the class instance, so it can be serialized and shipped to the cluster as is:

def doFilter() = {
  val filtered = rdd.filter(valEvens) // yes serializable
  filtered.collect()
}

val valEvens = (a: Int) => a % 2 == 0

What if you need to reference an (immutable) instance variable of the class in your function? Recall that:

val numberTwo = 2

is defined in the class. This too will fail with the Task not serializable exception, because referencing numberTwo inside the function really means this.numberTwo, so the closure once again grabs a handle on the instance it belongs to, which is not a serializable object:

def doFilter() = {
  val filtered = rdd.filter(isDivisibleByTwo) // not serializable
  filtered.collect()
}

val isDivisibleByTwo: Int => Boolean = _ % numberTwo == 0

One way around this is to wrap that instance variable's value in a serializable case class that in turn supplies the function:

def doFilter() = {
  val serializableWrappedVar = SerializableThing(numberTwo)
  val filtered = rdd.filter(serializableWrappedVar.isDivisibleByMe) // yes serializable
  filtered.collect()
}

// case classes extend Serializable out of the box, so the wrapper ships cleanly
case class SerializableThing(num: Int) {
  def isDivisibleByMe(a: Int) = a % num == 0
}
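
Another common trick, not shown above but worth noting as an alternative, is to copy the instance variable into a local val inside the method; the closure then captures only the Int, never the enclosing instance. A minimal sketch, assuming the same class as before:

def doFilter() = {
  val divisor = numberTwo // local copy: the closure captures only this Int
  val filtered = rdd.filter(_ % divisor == 0) // yes serializable: no reference to `this`
  filtered.collect()
}

Either way, the principle is the same: make sure the function you hand to Spark closes over only serializable values, not the object that defines them.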

Yes, lots of hoops to jump through, but we need to filter those RDDs somehow…

 
