Hello,

I am getting the well-known "Task not serializable" exception when running
the code below:

val correctColNameUDF = udf(getNewColumnName(_: String, false: Boolean): String)
val charReference: DataFrame = thinLong
  .select("char_name_id", "char_name")
  .withColumn("columnNameInDimTable", correctColNameUDF(col("char_name")))
  .withColumn("applicable_dimension", lit(dimension).cast(StringType))
  .distinct()
val charReferenceTableName: String = s"""$TargetSFFDBName.fg_char_reference"""
val tableName: String = charReferenceTableName.toString
charReference.saveAsTable(tableName, saveMode)

I think it has something to do with the UDF, so here is the function it
wraps as well:

def getNewColumnName(oldColName: String, appendID: Boolean): String = {
  val newColName = oldColName
    .replaceAll("\\s", "_")
    .replaceAll("%", "_pct")
    .replaceAllLiterally("#", "No")
  newColName
}


The exception seen is:

Caused by: org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304)
    at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:2066)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1.apply(RDD.scala:707)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1.apply(RDD.scala:706)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
    at org.apache.spark.rdd.RDD.mapPartitions(RDD.scala:706)
    at org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1.apply(TungstenAggregate.scala:86)
    at org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1.apply(TungstenAggregate.scala:80)
    at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:48)
    ... 73 more
Caused by: java.io.NotSerializableException: com.nielsen.datamodel.converters.cip2sff.ProductDimensionSFFConverterRealApp$
Serialization stack:
    - object not serializable (class: com.nielsen.datamodel.converters.cip2sff.ProductDimensionSFFConverterRealApp$, value: com.nielsen.datamodel.converters.cip2sff.ProductDimensionSFFConverterRealApp$@247a8411)
    - field (class: com.nielsen.datamodel.converters.cip2sff.CommonTransformationTraits$$anonfun$1, name: $outer, type: interface com.nielsen.datamodel.converters.cip2sff.CommonTransformationTraits)
    - object (class com.nielsen.datamodel.converters.cip2sff.CommonTransformationTraits$$anonfun$1, <function1>)
    - field (class: org.apache.spark.sql.catalyst.expressions.ScalaUDF$$anonfun$2, name: func$2, type: interface scala.Function1)
    - object (class org.apache.spark.sql.catalyst.expressions.ScalaUDF$$anonfun$2, <function1>)
    - field (class: org.apache.spark.sql.catalyst.expressions.ScalaUDF, name: f, type: interface scala.Function1)
    - object (class org.apache.spark.sql.catalyst.expressions.ScalaUDF, UDF(char_name#3))
    - field (class: org.apache.spark.sql.catalyst.expressions.Alias, name: child, type: class org.apache.spark.sql.catalyst.expressions.Expression)
    - object (class org.apache.spark.sql.catalyst.expressions.Alias, UDF(char_name#3) AS columnNameInDimTable#304)
    - element of array (index: 2)
    - array (class [Ljava.lang.Object;, size 4)
    - field (class: scala.collection.mutable.ArrayBuffer, name: array, type: class [Ljava.lang.Object;)
    - object (class scala.collection.mutable.ArrayBuffer, ArrayBuffer(char_name_id#2, char_name#3, UDF(char_name#3) AS columnNameInDimTable#304, PRODUCT AS applicable_dimension#305))
    - field (class: org.apache.spark.sql.execution.Project, name: projectList, type: interface scala.collection.Seq)
    - object (class org.apache.spark.sql.execution.Project, Project [char_name_id#2,char_name#3,UDF(char_name#3) AS columnNameInDimTable#304,PRODUCT AS applicable_dimension#305]
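
Reading the serialization stack, my guess (not verified) is that the function
passed to udf has an $outer field pointing at CommonTransformationTraits, and
that this drags in the ProductDimensionSFFConverterRealApp object, which is not
serializable. Below is a minimal sketch of what I suspect is happening, using
plain Java serialization and no Spark. The names Helpers, RealApp, Standalone
and ClosureCheck are hypothetical stand-ins, not my real classes.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical stand-in for CommonTransformationTraits.
trait Helpers {
  def rename(s: String, appendID: Boolean): String = s.replaceAll("\\s", "_")

  // Built from an instance method, so the function keeps a reference to `this`.
  val viaInstance: String => String = rename(_: String, false)
}

// Hypothetical stand-in for the application object; not Serializable.
object RealApp extends Helpers

// Hypothetical helper object holding the same logic outside the trait.
object Standalone {
  def rename(s: String, appendID: Boolean): String = s.replaceAll("\\s", "_")
}

object ClosureCheck {
  // Calls a method on a top-level object, so no enclosing instance is captured.
  val viaObject: String => String = Standalone.rename(_: String, false)

  // True if plain Java serialization accepts the value.
  def serializes(x: AnyRef): Boolean =
    try {
      new ObjectOutputStream(new ByteArrayOutputStream()).writeObject(x)
      true
    } catch {
      case _: NotSerializableException => false
    }

  def main(args: Array[String]): Unit = {
    println(serializes(RealApp.viaInstance))
    println(serializes(ClosureCheck.viaObject))
  }
}
```

If my reading is right, I would expect the first check to report false and the
second to report true, which would suggest moving getNewColumnName into a
standalone object (or making the enclosing object Serializable). I have not
confirmed that this is the cause in my actual job.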



-- 
Sincerely,
Darshan
