Hi All,

Basically, I am trying to define a simple UDF and use it in a query, but it gives me "Task not serializable":

       public void test() {
              RiskGroupModelDefinition model = registeredRiskGroupMap.get(this.modelId);
              RiskGroupModelDefinition edm = this.createEdm();
              JavaSparkContext ctx = this.createSparkContext();
              SQLContext sql = new SQLContext(ctx);

              // Register a UDF that extracts the year from a Date column.
              sql.udf().register("year", new UDF1<Date, Integer>() {
                     @Override
                     public Integer call(Date d) throws Exception {
                           return d.getYear();
                     }
              }, new org.apache.spark.sql.types.IntegerType());

              // Retrieve all tables for the EDM.
              DataFrame property = sql.parquetFile(edm.toS3nPath("property")).filter("ISVALID = 1");
              property.registerTempTable("p");

              DataFrame yb_lookup = sql.parquetFile(model.toS3nPath("yb_lookup")).as("yb");
              yb_lookup.registerTempTable("yb");

              // The join uses the UDF; the count() below is the call that fails.
              sql.sql("select * from p left join yb on year(p.YEARBUILT)=yb.yb_class_vdm").count();

              ctx.stop();
       }
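
One thing I was not sure about while writing this is the return-type argument: I construct it with new org.apache.spark.sql.types.IntegerType(), while as far as I can tell the documented way in the Java API is the DataTypes.IntegerType singleton, along these lines (yearUdf standing for the same anonymous UDF1 as above):

       sql.udf().register("year", yearUdf, org.apache.spark.sql.types.DataTypes.IntegerType);

I would not expect that to be what makes the task non-serializable, though.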

 

If I remove the UDF and just use p.YEARBUILT = yb.yb_class_vdm, the SQL runs without any problem. But as soon as I add the UDF to the query (as in the code above), I get the exception below:

Exception in thread "main" org.apache.spark.sql.catalyst.errors.package$TreeNodeException: execute, tree:
Aggregate false, [], [Coalesce(SUM(PartialCount#43L),0) AS count#41L]
 Exchange SinglePartition
  Aggregate true, [], [COUNT(1) AS PartialCount#43L]
   Project []
    HashOuterJoin [scalaUDF(YEARBUILT#14)], [yb_class_vdm#40L], LeftOuter, None
     Exchange (HashPartitioning [scalaUDF(YEARBUILT#14)], 200)
      Project [YEARBUILT#14]
       Filter ISVALID#18
        PhysicalRDD [YEARBUILT#14,ISVALID#18], MapPartitionsRDD[1] at map at newParquet.scala:573
     Exchange (HashPartitioning [yb_class_vdm#40L], 200)
      PhysicalRDD [yb_class_vdm#40L], MapPartitionsRDD[3] at map at newParquet.scala:573

       at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:47)
       at org.apache.spark.sql.execution.Aggregate.execute(Aggregate.scala:124)
       at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:84)
       at org.apache.spark.sql.DataFrame.collect(DataFrame.scala:887)
       at org.apache.spark.sql.DataFrame.count(DataFrame.scala:899)
       at com.validusresearch.middleware.executor.VulnabilityEncodeExecutor.test(VulnabilityEncodeExecutor.java:137)
       at com.validusresearch.middleware.executor.VulnabilityEncodeExecutor.main(VulnabilityEncodeExecutor.java:488)
Caused by: org.apache.spark.sql.catalyst.errors.package$TreeNodeException: execute, tree:
Exchange SinglePartition
 Aggregate true, [], [COUNT(1) AS PartialCount#43L]
  Project []
   HashOuterJoin [scalaUDF(YEARBUILT#14)], [yb_class_vdm#40L], LeftOuter, None
    Exchange (HashPartitioning [scalaUDF(YEARBUILT#14)], 200)
     Project [YEARBUILT#14]
      Filter ISVALID#18
       PhysicalRDD [YEARBUILT#14,ISVALID#18], MapPartitionsRDD[1] at map at newParquet.scala:573
    Exchange (HashPartitioning [yb_class_vdm#40L], 200)
     PhysicalRDD [yb_class_vdm#40L], MapPartitionsRDD[3] at map at newParquet.scala:573

       at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:47)
       at org.apache.spark.sql.execution.Exchange.execute(Exchange.scala:48)
       at org.apache.spark.sql.execution.Aggregate$$anonfun$execute$1.apply(Aggregate.scala:126)
       at org.apache.spark.sql.execution.Aggregate$$anonfun$execute$1.apply(Aggregate.scala:125)
       at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:46)
       ... 6 more
Caused by: org.apache.spark.sql.catalyst.errors.package$TreeNodeException: execute, tree:
Aggregate true, [], [COUNT(1) AS PartialCount#43L]
 Project []
  HashOuterJoin [scalaUDF(YEARBUILT#14)], [yb_class_vdm#40L], LeftOuter, None
   Exchange (HashPartitioning [scalaUDF(YEARBUILT#14)], 200)
    Project [YEARBUILT#14]
     Filter ISVALID#18
      PhysicalRDD [YEARBUILT#14,ISVALID#18], MapPartitionsRDD[1] at map at newParquet.scala:573
   Exchange (HashPartitioning [yb_class_vdm#40L], 200)
    PhysicalRDD [yb_class_vdm#40L], MapPartitionsRDD[3] at map at newParquet.scala:573

       at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:47)
       at org.apache.spark.sql.execution.Aggregate.execute(Aggregate.scala:124)
       at org.apache.spark.sql.execution.Exchange$$anonfun$execute$1.apply(Exchange.scala:101)
       at org.apache.spark.sql.execution.Exchange$$anonfun$execute$1.apply(Exchange.scala:49)
       at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:46)
       ... 10 more
Caused by: org.apache.spark.sql.catalyst.errors.package$TreeNodeException: execute, tree:
Exchange (HashPartitioning [scalaUDF(YEARBUILT#14)], 200)
 Project [YEARBUILT#14]
  Filter ISVALID#18
   PhysicalRDD [YEARBUILT#14,ISVALID#18], MapPartitionsRDD[1] at map at newParquet.scala:573

       at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:47)
       at org.apache.spark.sql.execution.Exchange.execute(Exchange.scala:48)
       at org.apache.spark.sql.execution.joins.HashOuterJoin.execute(HashOuterJoin.scala:188)
       at org.apache.spark.sql.execution.Project.execute(basicOperators.scala:40)
       at org.apache.spark.sql.execution.Aggregate$$anonfun$execute$1.apply(Aggregate.scala:126)
       at org.apache.spark.sql.execution.Aggregate$$anonfun$execute$1.apply(Aggregate.scala:125)
       at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:46)
       ... 14 more
Caused by: org.apache.spark.SparkException: Task not serializable
       at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
       at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
       at org.apache.spark.SparkContext.clean(SparkContext.scala:1623)
       at org.apache.spark.rdd.RDD.mapPartitions(RDD.scala:635)
       at org.apache.spark.sql.execution.Exchange$$anonfun$execute$1.apply(Exchange.scala:65)
       at org.apache.spark.sql.execution.Exchange$$anonfun$execute$1.apply(Exchange.scala:49)
       at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:46)
       ... 20 more
Caused by: java.lang.reflect.InvocationTargetException
       at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
       at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
       at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
       at java.lang.reflect.Method.invoke(Unknown Source)
       at org.apache.spark.serializer.SerializationDebugger$ObjectStreamClassMethods$.getObjFieldValues$extension(SerializationDebugger.scala:240)
       at org.apache.spark.serializer.SerializationDebugger$SerializationDebugger.visitSerializable(SerializationDebugger.scala:150)
       at org.apache.spark.serializer.SerializationDebugger$SerializationDebugger.visit(SerializationDebugger.scala:99)
       at org.apache.spark.serializer.SerializationDebugger$SerializationDebugger.visitSerializable(SerializationDebugger.scala:158)
       at org.apache.spark.serializer.SerializationDebugger$SerializationDebugger.visit(SerializationDebugger.scala:99)
       at org.apache.spark.serializer.SerializationDebugger$SerializationDebugger.visitSerializable(SerializationDebugger.scala:158)
       at org.apache.spark.serializer.SerializationDebugger$SerializationDebugger.visit(SerializationDebugger.scala:99)
       at org.apache.spark.serializer.SerializationDebugger$.find(SerializationDebugger.scala:58)
       at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:39)
       at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
       at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:80)
       at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:164)
       ... 26 more
Caused by: java.lang.ArrayIndexOutOfBoundsException: 0
       at java.io.ObjectStreamClass$FieldReflector.getObjFieldValues(Unknown Source)
       at java.io.ObjectStreamClass.getObjFieldValues(Unknown Source)
       ... 44 more
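
For what it's worth, my current guess at the cause: the UDF is an anonymous inner class, and an anonymous inner class in Java always keeps an implicit reference to its enclosing instance, so Spark presumably tries to serialize the whole VulnabilityEncodeExecutor along with the UDF. Note also that the innermost ArrayIndexOutOfBoundsException is thrown from Spark's SerializationDebugger while it is trying to explain the failure, so the report of which field is actually non-serializable gets lost. Below is an untested sketch of what I am thinking of trying instead (the class name YearUdf is mine): a standalone, serializable UDF class with no reference to the enclosing object.

       import java.io.Serializable;
       import java.sql.Date; // assuming YEARBUILT is a DATE column; Spark SQL's DateType maps to java.sql.Date

       import org.apache.spark.sql.api.java.UDF1;

       /** Standalone UDF: no enclosing instance, so nothing extra gets dragged into serialization. */
       public class YearUdf implements UDF1<Date, Integer>, Serializable {
              @Override
              public Integer call(Date d) throws Exception {
                     // Same logic as before; note that getYear() is deprecated
                     // and returns the year minus 1900.
                     return d.getYear();
              }
       }

and then:

       sql.udf().register("year", new YearUdf(), org.apache.spark.sql.types.DataTypes.IntegerType);

Does that sound like the right direction, or is something else going on here?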

 

 
