You define "getNewColumnName" as method, which requires the class/object 
holding it has to be serializable.

>From the stack trace, it looks like this method defined in 
>ProductDimensionSFFConverterRealApp, but it is not serializable.


In fact, your method only uses String and Boolean, which are serializable by 
default. So you can change the definition to function, instead of method, which 
should work.


Yong


________________________________
From: Darshan Pandya <darshanpan...@gmail.com>
Sent: Friday, February 17, 2017 10:36 PM
To: user
Subject: Serialization error - sql UDF related

Hello,

I am getting the famous serialization exception on running some code as below,


val correctColNameUDF = udf(getNewColumnName(_: String, false: Boolean): 
String);
val charReference: DataFrame = thinLong.select("char_name_id", 
"char_name").withColumn("columnNameInDimTable", 
correctColNameUDF(col("char_name"))).withColumn("applicable_dimension", 
lit(dimension).cast(StringType)).distinct();
val charReferenceTableName: String = s"""$TargetSFFDBName.fg_char_reference"""
val tableName: String = charReferenceTableName.toString
charReference.saveAsTable(tableName, saveMode)

I think it has something to do with the UDF, so I am pasting the UDF function 
as well


def getNewColumnName(oldColName: String, appendID: Boolean): String = {
  var newColName = oldColName.replaceAll("\\s", "_").replaceAll("%", 
"_pct").replaceAllLiterally("#", "No")
  return newColName;
}

Exception seen is

Caused by: org.apache.spark.SparkException: Task not serializable
at 
org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304)
at 
org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2066)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1.apply(RDD.scala:707)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1.apply(RDD.scala:706)
at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
at org.apache.spark.rdd.RDD.mapPartitions(RDD.scala:706)
at 
org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1.apply(TungstenAggregate.scala:86)
at 
org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1.apply(TungstenAggregate.scala:80)
at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:48)
... 73 more
Caused by: java.io.NotSerializableException: 
com.nielsen.datamodel.converters.cip2sff.ProductDimensionSFFConverterRealApp$
Serialization stack:
- object not serializable (class: 
com.nielsen.datamodel.converters.cip2sff.ProductDimensionSFFConverterRealApp$, 
value: 
com.nielsen.datamodel.converters.cip2sff.ProductDimensionSFFConverterRealApp$@247a8411)
- field (class: 
com.nielsen.datamodel.converters.cip2sff.CommonTransformationTraits$$anonfun$1, 
name: $outer, type: interface 
com.nielsen.datamodel.converters.cip2sff.CommonTransformationTraits)
- object (class 
com.nielsen.datamodel.converters.cip2sff.CommonTransformationTraits$$anonfun$1, 
<function1>)
- field (class: org.apache.spark.sql.catalyst.expressions.ScalaUDF$$anonfun$2, 
name: func$2, type: interface scala.Function1)
- object (class org.apache.spark.sql.catalyst.expressions.ScalaUDF$$anonfun$2, 
<function1>)
- field (class: org.apache.spark.sql.catalyst.expressions.ScalaUDF, name: f, 
type: interface scala.Function1)
- object (class org.apache.spark.sql.catalyst.expressions.ScalaUDF, 
UDF(char_name#3))
- field (class: org.apache.spark.sql.catalyst.expressions.Alias, name: child, 
type: class org.apache.spark.sql.catalyst.expressions.Expression)
- object (class org.apache.spark.sql.catalyst.expressions.Alias, 
UDF(char_name#3) AS columnNameInDimTable#304)
- element of array (index: 2)
- array (class [Ljava.lang.Object;, size 4)
- field (class: scala.collection.mutable.ArrayBuffer, name: array, type: class 
[Ljava.lang.Object;)
- object (class scala.collection.mutable.ArrayBuffer, 
ArrayBuffer(char_name_id#2, char_name#3, UDF(char_name#3) AS 
columnNameInDimTable#304, PRODUCT AS applicable_dimension#305))
- field (class: org.apache.spark.sql.execution.Project, name: projectList, 
type: interface scala.collection.Seq)
- object (class org.apache.spark.sql.execution.Project, Project 
[char_name_id#2,char_name#3,UDF(char_name#3) AS 
columnNameInDimTable#304,PRODUCT AS applicable_dimension#305]



--
Sincerely,
Darshan

Reply via email to