Hello Hao Ren,

Doesn't the code...

val add = udf {
  (a: Int) => a + notSer.value
}
mean that the UDF is a function of type Int => Int?
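The function is indeed an Int => Int, but a Scala closure also keeps a reference to every variable it uses, which here is notSer. Below is a minimal sketch of this outside Spark. It assumes that shipping a UDF to executors comes down to plain Java serialization of the function object, which is what Spark's default closure serializer uses. The names ClosureCaptureDemo, trySerialize, capturesObject and capturesValue are hypothetical, chosen only for illustration:

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Serializable so the demo does not depend on whether a given Scala
// version also captures the enclosing object in its lambdas.
object ClosureCaptureDemo extends Serializable {
  // Same shape as in the original post: deliberately NOT Serializable
  class A(val value: Int)

  // Hypothetical helper: Java-serialize obj, returning the failure if any
  def trySerialize(obj: AnyRef): Option[NotSerializableException] = {
    val out = new ObjectOutputStream(new ByteArrayOutputStream())
    try { out.writeObject(obj); None }
    catch { case e: NotSerializableException => Some(e) }
    finally out.close()
  }

  // Typed Int => Int, but the closure holds a reference to notSer
  def capturesObject(notSer: A): Int => Int =
    (a: Int) => a + notSer.value

  // Copying the field into a local val first: the closure no longer
  // references the non-serializable A, only the Int value
  def capturesValue(notSer: A): Int => Int = {
    val v = notSer.value
    (a: Int) => a + v
  }

  def main(args: Array[String]): Unit = {
    val notSer = new A(2)
    println(trySerialize(capturesObject(notSer)).isDefined) // true
    println(trySerialize(capturesValue(notSer)).isDefined)  // false
  }
}
```

So whenever the UDF really has to be serialized, capturing notSer should fail. Reading notSer.value into a local val before the udf { ... } block, or making A extend Serializable, would presumably avoid that failure.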

Thanks,
Muthu

On Sun, Aug 7, 2016 at 2:31 PM, Hao Ren <inv...@gmail.com> wrote:

> I am playing with Spark 2.0.
> What I tried to test is the following:
>
> Create a UDF that references a non-serializable object.
> I expected that when this UDF is called while materializing the
> DataFrame in which it is used in "select", a "Task not serializable"
> exception would be thrown.
> It also depends on which action is called on that DataFrame.
>
> Here is the code to reproduce the problem:
>
> ============
> import org.apache.spark.sql.SparkSession
>
> object DataFrameSerDeTest extends App {
>
>   class A(val value: Int) // It is not serializable
>
>   def run() = {
>     val spark = SparkSession
>       .builder()
>       .appName("DataFrameSerDeTest")
>       .master("local[*]")
>       .getOrCreate()
>
>     import org.apache.spark.sql.functions.udf
>     import spark.sqlContext.implicits._
>
>     val notSer = new A(2)
>     // The UDF closure captures notSer, an instance of the non-serializable A
>     val add = udf {
>       (a: Int) => a + notSer.value
>     }
>     val df = spark.createDataFrame(Seq(
>       (1, 2),
>       (2, 2),
>       (3, 2),
>       (4, 2)
>     )).toDF("key", "value")
>       .select($"key", add($"value").as("added"))
>
>     // Expected to fail because the UDF contains a non-serializable
>     // object, but it works:
>     df.show()
>
>     // Fails as expected
>     // (org.apache.spark.SparkException: Task not serializable):
>     df.filter($"key" === 2).show()
>   }
>
>   run()
> }
> ============
>
> I also tried collect(), count(), first(), and limit(). All of them worked
> without throwing a serialization exception.
> It seems that only filter() throws the exception. Is this a feature or a bug?
>
> Any ideas? Or did I just mess something up?
> Any help is highly appreciated.
>
> --
> Hao Ren
>
> Data Engineer @ leboncoin
>
> Paris, France
>
