I am playing with Spark 2.0.
Here is what I tried to test:

Create a UDF that captures a non-serializable object.
What I expected: when the UDF is invoked while materializing the
DataFrame that uses it in a select, a "Task not serializable" exception
should be thrown. It turns out this also depends on which "action" is
called on that DataFrame.

Here is the code to reproduce the problem:

============
import org.apache.spark.sql.SparkSession

object DataFrameSerDeTest extends App {

  class A(val value: Int) // It is not serializable

  def run() = {
    val spark = SparkSession
      .builder()
      .appName("DataFrameSerDeTest")
      .master("local[*]")
      .getOrCreate()

    import org.apache.spark.sql.functions.udf
    import spark.implicits._

    val notSer = new A(2)
    val add = udf {
      (a: Int) => a + notSer.value
    }
    val df = spark.createDataFrame(Seq(
      (1, 2),
      (2, 2),
      (3, 2),
      (4, 2)
    )).toDF("key", "value")
      .select($"key", add($"value").as("added"))

    // It should fail because the UDF captures a non-serializable object,
    // but it works:
    df.show()

    // This one throws org.apache.spark.SparkException: Task not
    // serializable, as expected:
    df.filter($"key" === 2).show()
  }

  run()
}
============
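
For reference, the closure captured by the UDF should not be
Java-serializable at all, since it holds an instance of A. A quick
standalone check, independent of Spark (just a sketch, dropped into run()
above):

============
import java.io.{ByteArrayOutputStream, ObjectOutputStream}

// Mirror the UDF body and try to serialize it directly. This should throw
// java.io.NotSerializableException, because the closure captures notSer,
// an instance of the non-serializable class A.
val f = (a: Int) => a + notSer.value
new ObjectOutputStream(new ByteArrayOutputStream()).writeObject(f)
============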

I also tried collect(), count(), first(), and limit(); all of them worked
without a serialization exception. It seems that only filter() triggers
the exception. Is this a feature or a bug?
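
Either way, here is a sketch of the two workarounds I would expect to
avoid the exception (the names SerializableA, notSerFixed, addFixed and
addLocal are just mine for illustration; this also goes inside run(),
reusing the udf import):

============
// 1) Make the captured class Serializable, so the closure can be shipped
//    to the executors:
class SerializableA(val value: Int) extends Serializable

val notSerFixed = new SerializableA(2)
val addFixed = udf { (a: Int) => a + notSerFixed.value }

// 2) Or construct the non-serializable object inside the UDF body, so the
//    closure captures nothing that has to be serialized:
val addLocal = udf { (a: Int) => a + new A(2).value }

// Either can then be used exactly like add, e.g.:
//   df.select($"key", addLocal($"value").as("added"))
============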

Any ideas? Or have I just messed something up?
Any help is highly appreciated.

-- 
Hao Ren

Data Engineer @ leboncoin

Paris, France
