Hi All,
Basically, I'm trying to define a simple UDF and use it in a query, but it fails with "Task not serializable". The code:

public void test() {
    RiskGroupModelDefinition model = registeredRiskGroupMap.get(this.modelId);
    RiskGroupModelDefinition edm = this.createEdm();
    JavaSparkContext ctx = this.createSparkContext();
    SQLContext sql = new SQLContext(ctx);

    // Register a UDF that extracts the year from a Date
    sql.udf().register("year", new UDF1<Date, Integer>() {
        @Override
        public Integer call(Date d) throws Exception {
            return d.getYear();
        }
    }, new org.apache.spark.sql.types.IntegerType());

    // Retrieve all tables for the EDM
    DataFrame property = sql.parquetFile(edm.toS3nPath("property")).filter("ISVALID = 1");
    property.registerTempTable("p");
    DataFrame yb_lookup = sql.parquetFile(model.toS3nPath("yb_lookup")).as("yb");
    yb_lookup.registerTempTable("yb");

    sql.sql("select * from p left join yb on year(p.YEARBUILT)=yb.yb_class_vdm").count();
    ctx.stop();
}

If I remove the UDF and just join on p.YEARBUILT=yb.yb_class_vdm, the SQL runs without any problem. But as soon as I add the UDF to the query (exactly as above), I get the exception below; my current guess at a fix is sketched after the trace.

Exception in thread "main" org.apache.spark.sql.catalyst.errors.package$TreeNodeException: execute, tree:
Aggregate false, [], [Coalesce(SUM(PartialCount#43L),0) AS count#41L]
 Exchange SinglePartition
  Aggregate true, [], [COUNT(1) AS PartialCount#43L]
   Project []
    HashOuterJoin [scalaUDF(YEARBUILT#14)], [yb_class_vdm#40L], LeftOuter, None
     Exchange (HashPartitioning [scalaUDF(YEARBUILT#14)], 200)
      Project [YEARBUILT#14]
       Filter ISVALID#18
        PhysicalRDD [YEARBUILT#14,ISVALID#18], MapPartitionsRDD[1] at map at newParquet.scala:573
     Exchange (HashPartitioning [yb_class_vdm#40L], 200)
      PhysicalRDD [yb_class_vdm#40L], MapPartitionsRDD[3] at map at newParquet.scala:573

    at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:47)
    at org.apache.spark.sql.execution.Aggregate.execute(Aggregate.scala:124)
    at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:84)
    at org.apache.spark.sql.DataFrame.collect(DataFrame.scala:887)
    at org.apache.spark.sql.DataFrame.count(DataFrame.scala:899)
    at com.validusresearch.middleware.executor.VulnabilityEncodeExecutor.test(VulnabilityEncodeExecutor.java:137)
    at com.validusresearch.middleware.executor.VulnabilityEncodeExecutor.main(VulnabilityEncodeExecutor.java:488)
Caused by: org.apache.spark.sql.catalyst.errors.package$TreeNodeException: execute, tree:
Exchange SinglePartition
 Aggregate true, [], [COUNT(1) AS PartialCount#43L]
  Project []
   HashOuterJoin [scalaUDF(YEARBUILT#14)], [yb_class_vdm#40L], LeftOuter, None
    Exchange (HashPartitioning [scalaUDF(YEARBUILT#14)], 200)
     Project [YEARBUILT#14]
      Filter ISVALID#18
       PhysicalRDD [YEARBUILT#14,ISVALID#18], MapPartitionsRDD[1] at map at newParquet.scala:573
    Exchange (HashPartitioning [yb_class_vdm#40L], 200)
     PhysicalRDD [yb_class_vdm#40L], MapPartitionsRDD[3] at map at newParquet.scala:573

    at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:47)
    at org.apache.spark.sql.execution.Exchange.execute(Exchange.scala:48)
    at org.apache.spark.sql.execution.Aggregate$$anonfun$execute$1.apply(Aggregate.scala:126)
    at org.apache.spark.sql.execution.Aggregate$$anonfun$execute$1.apply(Aggregate.scala:125)
    at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:46)
    ... 6 more
Caused by: org.apache.spark.sql.catalyst.errors.package$TreeNodeException: execute, tree:
Aggregate true, [], [COUNT(1) AS PartialCount#43L]
 Project []
  HashOuterJoin [scalaUDF(YEARBUILT#14)], [yb_class_vdm#40L], LeftOuter, None
   Exchange (HashPartitioning [scalaUDF(YEARBUILT#14)], 200)
    Project [YEARBUILT#14]
     Filter ISVALID#18
      PhysicalRDD [YEARBUILT#14,ISVALID#18], MapPartitionsRDD[1] at map at newParquet.scala:573
   Exchange (HashPartitioning [yb_class_vdm#40L], 200)
    PhysicalRDD [yb_class_vdm#40L], MapPartitionsRDD[3] at map at newParquet.scala:573

    at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:47)
    at org.apache.spark.sql.execution.Aggregate.execute(Aggregate.scala:124)
    at org.apache.spark.sql.execution.Exchange$$anonfun$execute$1.apply(Exchange.scala:101)
    at org.apache.spark.sql.execution.Exchange$$anonfun$execute$1.apply(Exchange.scala:49)
    at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:46)
    ... 10 more
Caused by: org.apache.spark.sql.catalyst.errors.package$TreeNodeException: execute, tree:
Exchange (HashPartitioning [scalaUDF(YEARBUILT#14)], 200)
 Project [YEARBUILT#14]
  Filter ISVALID#18
   PhysicalRDD [YEARBUILT#14,ISVALID#18], MapPartitionsRDD[1] at map at newParquet.scala:573

    at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:47)
    at org.apache.spark.sql.execution.Exchange.execute(Exchange.scala:48)
    at org.apache.spark.sql.execution.joins.HashOuterJoin.execute(HashOuterJoin.scala:188)
    at org.apache.spark.sql.execution.Project.execute(basicOperators.scala:40)
    at org.apache.spark.sql.execution.Aggregate$$anonfun$execute$1.apply(Aggregate.scala:126)
    at org.apache.spark.sql.execution.Aggregate$$anonfun$execute$1.apply(Aggregate.scala:125)
    at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:46)
    ... 14 more
Caused by: org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:1623)
    at org.apache.spark.rdd.RDD.mapPartitions(RDD.scala:635)
    at org.apache.spark.sql.execution.Exchange$$anonfun$execute$1.apply(Exchange.scala:65)
    at org.apache.spark.sql.execution.Exchange$$anonfun$execute$1.apply(Exchange.scala:49)
    at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:46)
    ... 20 more
Caused by: java.lang.reflect.InvocationTargetException
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
    at java.lang.reflect.Method.invoke(Unknown Source)
    at org.apache.spark.serializer.SerializationDebugger$ObjectStreamClassMethods$.getObjFieldValues$extension(SerializationDebugger.scala:240)
    at org.apache.spark.serializer.SerializationDebugger$SerializationDebugger.visitSerializable(SerializationDebugger.scala:150)
    at org.apache.spark.serializer.SerializationDebugger$SerializationDebugger.visit(SerializationDebugger.scala:99)
    at org.apache.spark.serializer.SerializationDebugger$SerializationDebugger.visitSerializable(SerializationDebugger.scala:158)
    at org.apache.spark.serializer.SerializationDebugger$SerializationDebugger.visit(SerializationDebugger.scala:99)
    at org.apache.spark.serializer.SerializationDebugger$SerializationDebugger.visitSerializable(SerializationDebugger.scala:158)
    at org.apache.spark.serializer.SerializationDebugger$SerializationDebugger.visit(SerializationDebugger.scala:99)
    at org.apache.spark.serializer.SerializationDebugger$SerializationDebugger.visitSerializable(SerializationDebugger.scala:158)
    at org.apache.spark.serializer.SerializationDebugger$SerializationDebugger.visit(SerializationDebugger.scala:99)
    at org.apache.spark.serializer.SerializationDebugger$.find(SerializationDebugger.scala:58)
    at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:39)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:80)
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:164)
    ... 26 more
Caused by: java.lang.ArrayIndexOutOfBoundsException: 0
    at java.io.ObjectStreamClass$FieldReflector.getObjFieldValues(Unknown Source)
    at java.io.ObjectStreamClass.getObjFieldValues(Unknown Source)
    ... 44 more
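Two things about my registration look suspicious to me, though I'm not sure either is the actual cause: the UDF is an anonymous inner class, so serializing it would drag in the enclosing executor object (which I never intended to ship to the workers), and I construct the return type with "new IntegerType()" instead of using the shared DataTypes.IntegerType instance. Here is a minimal sketch of what I plan to try next (YearUdf is just a name I made up):

import java.io.Serializable;
import java.sql.Date;
import org.apache.spark.sql.api.java.UDF1;

// A named top-level class instead of an anonymous inner class: it holds
// no hidden reference to the enclosing executor, so only this small,
// stateless object has to be serialized and shipped to the workers.
public class YearUdf implements UDF1<Date, Integer>, Serializable {
    @Override
    public Integer call(Date d) throws Exception {
        // Note: java.util.Date.getYear() returns the year minus 1900.
        return d.getYear();
    }
}

and then in test(), register it with the singleton type instance:

import org.apache.spark.sql.types.DataTypes;

sql.udf().register("year", new YearUdf(), DataTypes.IntegerType);

Does this look like the right direction, or is something else going on here?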