We tried to standardize our SQL data source management using Avro
schemas, but ran into serialization exceptions when using the data. The
interesting part is that we had no problems reading the Avro schema JSON
file, converting the Avro schema into a SQL StructType, and then using
that StructType to create a DataFrame in the subsequent data source load
operation. The problem only surfaced later, when we applied some lambda
functions to the DataFrame.
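For reference, the loading path described above looks roughly like the
sketch below. This is a minimal reconstruction, not the actual code: the
spark-avro SchemaConverters call, the variable names other than
schemaObj (which appears in the trace), and the file paths are all
assumptions.

```scala
import java.io.File
import org.apache.avro.Schema
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.avro.SchemaConverters
import org.apache.spark.sql.types.StructType

val spark = SparkSession.builder().appName("avro-schema-demo").getOrCreate()

// Parse the Avro schema JSON file (path is hypothetical).
val schemaObj: Schema = new Schema.Parser().parse(new File("swat_physical.avsc"))

// Convert the Avro record schema into a Spark SQL StructType.
val structType = SchemaConverters.toSqlType(schemaObj).dataType.asInstanceOf[StructType]

// Use the StructType in the subsequent data source load (format and path hypothetical).
val df = spark.read
  .schema(structType)
  .option("header", "true")
  .csv("/data/swat")
```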
I am a little confused: why does the lambda function still complain that
the Avro schema record is not serializable even after the DataFrame has
already been created? Once the Avro schema is converted to a StructType,
which is what the load function of DataFrameReader uses, there should
not be any reference to the Avro schema at all. Any hints and
suggestions are highly appreciated.
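For what it's worth, the serialization stack in the trace points at the
REPL wrapper object ($iw) that holds schemaObj: a lambda defined in the
same spark-shell session captures its enclosing wrapper, and that
wrapper drags schemaObj along even when the lambda never touches the
schema. A minimal sketch of the usual workarounds follows; the method
body and threshold logic are hypothetical, only the names schemaObj and
determineAnomalies come from the trace.

```scala
// Option 1: mark the schema reference @transient so the closure
// cleaner drops it instead of trying to serialize it.
@transient val schemaObj =
  new org.apache.avro.Schema.Parser().parse(new java.io.File("swat_physical.avsc"))

// Option 2: define the lambda inside a method and capture only
// serializable locals, not top-level REPL values.
def determineAnomalies(df: org.apache.spark.sql.DataFrame): Array[org.apache.spark.sql.Row] = {
  val threshold = 0.5  // plain serializable local (hypothetical logic)
  df.filter(row => row.getDouble(1) > threshold).collect()
}
```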
-- ND
org.apache.spark.SparkException: Task not serializable
	at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:416)
	at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:406)
	at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:162)
	at org.apache.spark.SparkContext.clean(SparkContext.scala:2362)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$1(RDD.scala:886)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:388)
	at org.apache.spark.rdd.RDD.mapPartitionsWithIndex(RDD.scala:885)
	at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:723)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:175)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:213)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:210)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:171)
	at org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:316)
	at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:382)
	at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:3627)
	at org.apache.spark.sql.Dataset.$anonfun$collect$1(Dataset.scala:2940)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3618)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:100)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:87)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:764)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3616)
	at org.apache.spark.sql.Dataset.collect(Dataset.scala:2940)
	at determineAnomalies(<console>:138)
	... 60 elided
Caused by: java.io.NotSerializableException: org.apache.avro.Schema$RecordSchema
Serialization stack:
	- object not serializable (class: org.apache.avro.Schema$RecordSchema, value: {"type":"record","name":"Swat_physical_Feb_2021","namespace":"Swat_physical_Feb_2021","fields":[{"name":"Timestamp","type":"string"},{"name":"FIT101","type":"double"},{"name":"LIT101","type":"double"},{"name":"MV101","type":"int"},{"name":"P101","type":"int"},{"name":"P102","type":"int"},{"name":"AIT201","type":"double"},{"name":"AIT202","type":"double"},{"name":"AIT203","type":"double"},{"name":"FIT201","type":"double"},{"name":"MV201","type":"int"},{"name":"P201","type":"int"},{"name":"P202","type":"int"},{"name":"P203","type":"int"},{"name":"P204","type":"int"},{"name":"P205","type":"int"},{"name":"P206","type":"int"},{"name":"DPIT301","type":"double"},{"name":"FIT301","type":"double"},{"name":"LIT301","type":"double"},{"name":"MV301","type":"int"},{"name":"MV302","type":"int"},{"name":"MV303","type":"int"},{"name":"MV304","type":"int"},{"name":"P301","type":"int"},{"name":"P302","type":"int"},{"name":"AIT401","type":"double"},{"name":"AIT402","type":"double"},{"name":"FIT401","type":"double"},{"name":"LIT401","type":"double"},{"name":"P401","type":"int"},{"name":"P402","type":"int"},{"name":"P403","type":"int"},{"name":"P404","type":"int"},{"name":"UV401","type":"int"},{"name":"AIT501","type":"double"},{"name":"AIT502","type":"double"},{"name":"AIT503","type":"double"},{"name":"AIT504","type":"double"},{"name":"FIT501","type":"double"},{"name":"FIT502","type":"double"},{"name":"FIT503","type":"double"},{"name":"FIT504","type":"double"},{"name":"P501","type":"int"},{"name":"P502","type":"int"},{"name":"PIT501","type":"double"},{"name":"PIT502","type":"double"},{"name":"PIT503","type":"double"},{"name":"FIT601","type":"double"},{"name":"P601","type":"int"},{"name":"P602","type":"int"},{"name":"P603","type":"int"},{"name":"Normal_Attack","type":"string"}]})
	- field (class: $iw, name: schemaObj, type: class org.apache.avro.Schema)
	- object (class $iw, $iw@5d09dadc)
	- field (class: $iw, name: $iw, type: class $iw)
	- object (class $iw, $iw@102d60fd)
	- field (class: $iw, name: $iw, type: class $iw)
	- object (class $iw, $iw@cec4fce)
	- field (class: $iw, name: $iw, type: class $iw)
	- object (class $iw, $iw@493fbce0)
	- field (class: $iw, name: $iw, type: class $iw)
	- object (class $iw, $iw@18a15a09)
	- field (class: $iw, name: $iw, type: class $iw)
	- object (class $iw, $iw@163ad83a)
	- field (class: $iw, name: $iw, type: class $iw)
	- object (class $iw, $iw@7b8c10cc)
	- field (class: $iw, name: $iw, type: class $iw)
	- object (class $iw, $iw@4f51c2ed)
	- field (class: $iw, name: $iw, type: class $iw)
	- object (class $iw, $iw@71c5d6e2)
	- field (class: $iw, name: $iw, type: class $iw)
	- object (class $iw, $iw@411c804f)
	- field (class: $iw, name: $iw, type: class $iw)
	- object (class $iw, $iw@23bbcdc4)
	- field (class: $iw, name: $iw, type: class $iw)
	- object (class $iw, $iw@33733309)
	- field (class: $iw, name: $iw, type: class $iw)
	- object (class $iw, $iw@6332bc47)
	- field (class: $iw, name: $iw, type: class $iw)
	- object (class $iw, $iw@60867362)
	- field (class: $iw, name: $iw, type: class $iw)
	- object (class $iw, $iw@37b75cb5)
	- field (class: $iw, name: $iw, type: class $iw)
	- object (class $iw, $iw@421a5931)
	- field (class: $iw, name: $iw, type: class $iw)
	- object (class $iw, $iw@72cda75a)
	- field (class: $iw, name: $iw, type: class $iw)
	- object (class $iw, $iw@4538e79a)
	- field (class: $iw, name: $iw, type: class $iw)
	- object (class $iw, $iw@12be42d3)
	- field (class: $iw, name: $iw, type: class $iw)
	- object (class $iw, $iw@7aabcd55)
	- field (class: $iw, name: $iw, type: class $iw)
	- object (class $iw, $iw@387078b)
	- field (class: $iw, name: $iw, type: class $iw)
	- object (class $iw, $iw@58e180ea)
	- field (class: $line153036384055.$read, name: $iw, type: class $iw)
	- object (class $line153036384055.$read, $line153036384055.$read@448f7b5b)
	- field (class: $iw, name: $line153036384055$read, type: class $line153036384055.$read)
	- object (class $iw, $iw@2ef8d075)
	- field (class: $iw, name: $outer, type: class $iw)
	- object (class $iw, $iw@1a3f2f26)
	- element of array (index: 0)
	- array (class [Ljava.lang.Object;, size 2)
	- field (class: java.lang.invoke.SerializedLambda, name: capturedArgs, type: class [Ljava.lang.Object;)
	- object (class java.lang.invoke.SerializedLambda, SerializedLambda[capturingClass=class $iw, functionalInterfaceMethod=scala/Function1.apply:(Ljava/lang/Object;)Ljava/lang/Object;, implementation=invokeStatic $anonfun$determineAnomalies$1$adapted:(L$iw;Lorg/apache/spark/ml/clustering/KMeansModel;Lorg/apache/spark/sql/Row;)Ljava/lang/Object;, instantiatedMethodType=(Lorg/apache/spark/sql/Row;)Ljava/lang/Object;, numCaptured=2])
	- writeReplace data (class: java.lang.invoke.SerializedLambda)
	- object (class $Lambda$4615/190536174, $Lambda$4615/190536174@130972)
	- element of array (index: 1)
	- array (class [Ljava.lang.Object;, size 2)
	- element of array (index: 1)
	- array (class [Ljava.lang.Object;, size 3)
	- field (class: java.lang.invoke.SerializedLambda, name: capturedArgs, type: class [Ljava.lang.Object;)
	- object (class java.lang.invoke.SerializedLambda, SerializedLambda[capturingClass=class org.apache.spark.sql.execution.WholeStageCodegenExec, functionalInterfaceMethod=scala/Function2.apply:(Ljava/lang/Object;Ljava/lang/Object;)Ljava/lang/Object;, implementation=invokeStatic org/apache/spark/sql/execution/WholeStageCodegenExec.$anonfun$doExecute$4$adapted:(Lorg/apache/spark/sql/catalyst/expressions/codegen/CodeAndComment;[Ljava/lang/Object;Lorg/apache/spark/sql/execution/metric/SQLMetric;Ljava/lang/Object;Lscala/collection/Iterator;)Lscala/collection/Iterator;, instantiatedMethodType=(Ljava/lang/Object;Lscala/collection/Iterator;)Lscala/collection/Iterator;, numCaptured=3])
	- writeReplace data (class: java.lang.invoke.SerializedLambda)
	- object (class org.apache.spark.sql.execution.WholeStageCodegenExec$$Lambda$2749/1641575129, org.apache.spark.sql.execution.WholeStageCodegenExec$$Lambda$2749/1641575129@79194969)
	at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:41)
	at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
	at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:101)
	at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:413)
	... 87 more
Subject: Exception on Avro Schema Object Serialization
(Reply from Artemis User follows.)