We tried to standardize our SQL data source management on Avro schemas, but ran into serialization exceptions when actually using the data. The interesting part is that we had no problems reading the Avro schema JSON file, converting the Avro schema into a SQL StructType, and then using that StructType in the subsequent data source load operation to create a data frame. The problem only occurred later, when the data frame was used with some lambda functions.
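Roughly, the part of the pipeline that works looks like the sketch below. The file paths and variable names here are placeholders, not our actual job code, and we use SchemaConverters from the spark-avro module for the conversion:

```scala
import org.apache.avro.Schema
import org.apache.spark.sql.avro.SchemaConverters
import org.apache.spark.sql.types.StructType

// Parse the Avro schema from its JSON definition file.
val schemaObj: Schema =
  new Schema.Parser().parse(new java.io.File("/path/to/schema.avsc"))

// Convert the Avro schema to a Spark StructType once, up front.
val sqlSchema: StructType =
  SchemaConverters.toSqlType(schemaObj).dataType.asInstanceOf[StructType]

// Use only the StructType in the load; schemaObj itself is not passed in.
val df = spark.read
  .schema(sqlSchema)
  .format("csv")
  .option("header", "true")
  .load("/path/to/data")
```

This loads fine, and `df.printSchema()` shows the expected columns.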

I am a little confused: why does the lambda function still complain that the Avro schema record is not serializable even after the data frame has already been created? Once the Avro schema is converted to a StructType, which is what the DataFrameReader load function consumes, there shouldn't be any remaining reference to the Avro schema at all. Any hints and suggestions are highly appreciated.
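For context, the failing call has roughly the shape below (a sketch with hypothetical names, not our real function). Our current suspicion, based on the `$iw` chain in the trace, is that a lambda defined at the spark-shell top level captures the enclosing REPL wrapper object, which also holds `schemaObj`, and `org.apache.avro.Schema` is not `java.io.Serializable`. The local-copy workaround shown is something we are still experimenting with:

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.ml.clustering.KMeansModel
import org.apache.spark.ml.linalg.Vectors

// Sketch only. A lambda written at the spark-shell top level captures the
// REPL wrapper ($iw) it was defined in; that wrapper also holds schemaObj,
// so Spark tries (and fails) to serialize the Avro schema with the closure.
def determineAnomalies(df: DataFrame, model: KMeansModel): Array[Long] = {
  // Candidate workaround: copy what the closure needs into a local val, so
  // the closure captures only this val, not the wrapper containing schemaObj.
  val localModel = model
  df.rdd
    .map(row => localModel.predict(Vectors.dense(row.getDouble(1))))
    .zipWithIndex()
    .filter { case (cluster, _) => cluster != 0 }
    .map(_._2)
    .collect()
}
```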

-- ND

org.apache.spark.SparkException: Task not serializable
  at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:416)
  at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:406)
  at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:162)
  at org.apache.spark.SparkContext.clean(SparkContext.scala:2362)
  at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$1(RDD.scala:886)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
  at org.apache.spark.rdd.RDD.withScope(RDD.scala:388)
  at org.apache.spark.rdd.RDD.mapPartitionsWithIndex(RDD.scala:885)
  at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:723)
  at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:175)
  at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:213)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:210)
  at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:171)
  at org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:316)
  at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:382)
  at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:3627)
  at org.apache.spark.sql.Dataset.$anonfun$collect$1(Dataset.scala:2940)
  at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3618)
  at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:100)
  at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)
  at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:87)
  at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:764)
  at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
  at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3616)
  at org.apache.spark.sql.Dataset.collect(Dataset.scala:2940)
  at determineAnomalies(<console>:138)
  ... 60 elided
Caused by: java.io.NotSerializableException: org.apache.avro.Schema$RecordSchema
Serialization stack:
  - object not serializable (class: org.apache.avro.Schema$RecordSchema, value: {"type":"record","name":"Swat_physical_Feb_2021","namespace":"Swat_physical_Feb_2021","fields":[{"name":"Timestamp","type":"string"},{"name":"FIT101","type":"double"},{"name":"LIT101","type":"double"},{"name":"MV101","type":"int"},{"name":"P101","type":"int"},{"name":"P102","type":"int"},{"name":"AIT201","type":"double"},{"name":"AIT202","type":"double"},{"name":"AIT203","type":"double"},{"name":"FIT201","type":"double"},{"name":"MV201","type":"int"},{"name":"P201","type":"int"},{"name":"P202","type":"int"},{"name":"P203","type":"int"},{"name":"P204","type":"int"},{"name":"P205","type":"int"},{"name":"P206","type":"int"},{"name":"DPIT301","type":"double"},{"name":"FIT301","type":"double"},{"name":"LIT301","type":"double"},{"name":"MV301","type":"int"},{"name":"MV302","type":"int"},{"name":"MV303","type":"int"},{"name":"MV304","type":"int"},{"name":"P301","type":"int"},{"name":"P302","type":"int"},{"name":"AIT401","type":"double"},{"name":"AIT402","type":"double"},{"name":"FIT401","type":"double"},{"name":"LIT401","type":"double"},{"name":"P401","type":"int"},{"name":"P402","type":"int"},{"name":"P403","type":"int"},{"name":"P404","type":"int"},{"name":"UV401","type":"int"},{"name":"AIT501","type":"double"},{"name":"AIT502","type":"double"},{"name":"AIT503","type":"double"},{"name":"AIT504","type":"double"},{"name":"FIT501","type":"double"},{"name":"FIT502","type":"double"},{"name":"FIT503","type":"double"},{"name":"FIT504","type":"double"},{"name":"P501","type":"int"},{"name":"P502","type":"int"},{"name":"PIT501","type":"double"},{"name":"PIT502","type":"double"},{"name":"PIT503","type":"double"},{"name":"FIT601","type":"double"},{"name":"P601","type":"int"},{"name":"P602","type":"int"},{"name":"P603","type":"int"},{"name":"Normal_Attack","type":"string"}]})
  - field (class: $iw, name: schemaObj, type: class org.apache.avro.Schema)
  - object (class $iw, $iw@5d09dadc)
  - field (class: $iw, name: $iw, type: class $iw)
  - object (class $iw, $iw@102d60fd)
  - field (class: $iw, name: $iw, type: class $iw)
  - object (class $iw, $iw@cec4fce)
  - field (class: $iw, name: $iw, type: class $iw)
  - object (class $iw, $iw@493fbce0)
  - field (class: $iw, name: $iw, type: class $iw)
  - object (class $iw, $iw@18a15a09)
  - field (class: $iw, name: $iw, type: class $iw)
  - object (class $iw, $iw@163ad83a)
  - field (class: $iw, name: $iw, type: class $iw)
  - object (class $iw, $iw@7b8c10cc)
  - field (class: $iw, name: $iw, type: class $iw)
  - object (class $iw, $iw@4f51c2ed)
  - field (class: $iw, name: $iw, type: class $iw)
  - object (class $iw, $iw@71c5d6e2)
  - field (class: $iw, name: $iw, type: class $iw)
  - object (class $iw, $iw@411c804f)
  - field (class: $iw, name: $iw, type: class $iw)
  - object (class $iw, $iw@23bbcdc4)
  - field (class: $iw, name: $iw, type: class $iw)
  - object (class $iw, $iw@33733309)
  - field (class: $iw, name: $iw, type: class $iw)
  - object (class $iw, $iw@6332bc47)
  - field (class: $iw, name: $iw, type: class $iw)
  - object (class $iw, $iw@60867362)
  - field (class: $iw, name: $iw, type: class $iw)
  - object (class $iw, $iw@37b75cb5)
  - field (class: $iw, name: $iw, type: class $iw)
  - object (class $iw, $iw@421a5931)
  - field (class: $iw, name: $iw, type: class $iw)
  - object (class $iw, $iw@72cda75a)
  - field (class: $iw, name: $iw, type: class $iw)
  - object (class $iw, $iw@4538e79a)
  - field (class: $iw, name: $iw, type: class $iw)
  - object (class $iw, $iw@12be42d3)
  - field (class: $iw, name: $iw, type: class $iw)
  - object (class $iw, $iw@7aabcd55)
  - field (class: $iw, name: $iw, type: class $iw)
  - object (class $iw, $iw@387078b)
  - field (class: $iw, name: $iw, type: class $iw)
  - object (class $iw, $iw@58e180ea)
  - field (class: $line153036384055.$read, name: $iw, type: class $iw)
  - object (class $line153036384055.$read, $line153036384055.$read@448f7b5b)
  - field (class: $iw, name: $line153036384055$read, type: class $line153036384055.$read)
  - object (class $iw, $iw@2ef8d075)
  - field (class: $iw, name: $outer, type: class $iw)
  - object (class $iw, $iw@1a3f2f26)
  - element of array (index: 0)
  - array (class [Ljava.lang.Object;, size 2)
  - field (class: java.lang.invoke.SerializedLambda, name: capturedArgs, type: class [Ljava.lang.Object;)
  - object (class java.lang.invoke.SerializedLambda, SerializedLambda[capturingClass=class $iw, functionalInterfaceMethod=scala/Function1.apply:(Ljava/lang/Object;)Ljava/lang/Object;, implementation=invokeStatic $anonfun$determineAnomalies$1$adapted:(L$iw;Lorg/apache/spark/ml/clustering/KMeansModel;Lorg/apache/spark/sql/Row;)Ljava/lang/Object;, instantiatedMethodType=(Lorg/apache/spark/sql/Row;)Ljava/lang/Object;, numCaptured=2])
  - writeReplace data (class: java.lang.invoke.SerializedLambda)
  - object (class $Lambda$4615/190536174, $Lambda$4615/190536174@130972)
  - element of array (index: 1)
  - array (class [Ljava.lang.Object;, size 2)
  - element of array (index: 1)
  - array (class [Ljava.lang.Object;, size 3)
  - field (class: java.lang.invoke.SerializedLambda, name: capturedArgs, type: class [Ljava.lang.Object;)
  - object (class java.lang.invoke.SerializedLambda, SerializedLambda[capturingClass=class org.apache.spark.sql.execution.WholeStageCodegenExec, functionalInterfaceMethod=scala/Function2.apply:(Ljava/lang/Object;Ljava/lang/Object;)Ljava/lang/Object;, implementation=invokeStatic org/apache/spark/sql/execution/WholeStageCodegenExec.$anonfun$doExecute$4$adapted:(Lorg/apache/spark/sql/catalyst/expressions/codegen/CodeAndComment;[Ljava/lang/Object;Lorg/apache/spark/sql/execution/metric/SQLMetric;Ljava/lang/Object;Lscala/collection/Iterator;)Lscala/collection/Iterator;, instantiatedMethodType=(Ljava/lang/Object;Lscala/collection/Iterator;)Lscala/collection/Iterator;, numCaptured=3])
  - writeReplace data (class: java.lang.invoke.SerializedLambda)
  - object (class org.apache.spark.sql.execution.WholeStageCodegenExec$$Lambda$2749/1641575129, org.apache.spark.sql.execution.WholeStageCodegenExec$$Lambda$2749/1641575129@79194969)
  at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:41)
  at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
  at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:101)
  at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:413)
  ... 87 more
