Hello, I have a few spark jobs that are doing the same aggregations. I want to factorize the aggregation logic. For that I want to use a Trait. When I run this job extending my Trait (over yarn, in client mode), I get a NotSerializableException (in attachment). If I change my Trait to an Object, the job runs fine and I don't have a NotSerializableException.
Could you explain me why ? I don't understand this behavior Thnaks Jean ------------------ object SparkJob extends App { val conf = new SparkConf() val sparkSession: SparkSession = SparkSession.builder() .appName("aggregateAdTechImpressions") .config(conf) .enableHiveSupport() .getOrCreate() ... val impressionsAdtechDF = MyUtil.prepareAggregationDataFrame(impressionsAdtechRawDF, "timestamp") val impressionsAggregationDF: DataFrame = MyUtil.aggregateImpressions(impressionsAdtechDF) ... } object MyUtil { private def parseTs(ts: Int): Int = { val tsMilli: Long = ts.toLong * 1000L val date: Date = new Date(tsMilli) val dateFormat = new SimpleDateFormat("yyyyMMdd") val dateStr = dateFormat.format(date) if (dateStr == null) 19000101 else dateStr.toInt } private def udfParseTs: UserDefinedFunction = udf(parseTs _) def prepareAggregationDataFrame(rawDF: DataFrame, timestampColumnName: String): DataFrame = { rawDF .withColumn("original_placement_id", col("placementid")) .withColumn("date", udfParseTs(col(timestampColumnName))) .withColumn("placement_id", col("placementid") cast StringType) .withColumnRenamed("campaignid", "campaign_id") .withColumnRenamed("placementSizeTypeId", "size_id") .drop("placementid") .drop(timestampColumnName) } def aggregateImpressions(inputDF: DataFrame): DataFrame = { inputDF.groupBy( col("date"), col("campaign_id"), col("original_placement_id"), col("placement_id"), col("size_id")) .agg(count(lit(1)).alias("cnt")) .withColumn("type", lit(1)) .withColumn("revenue_chf", lit(0) cast DoubleType) .withColumn("revenue_eur", lit(0) cast DoubleType) .withColumn("source", lit(0)) // 0 for AdTech } } ---- object SparkJob2 extends App with MyTrait { val conf = new SparkConf() val sparkSession: SparkSession = SparkSession.builder() .appName("aggregateAdTechImpressions") .config(conf) .enableHiveSupport() .getOrCreate() ... val impressionsAdtechDF = prepareAggregationDataFrame(impressionsAdtechRawDF, "timestamp") val impressionsAggregationDF: DataFrame = aggregateImpressions(impressionsAdtechDF) ... } trait MyTrait { private def parseTs(ts: Int): Int = { val tsMilli: Long = ts.toLong * 1000L val date: Date = new Date(tsMilli) val dateFormat = new SimpleDateFormat("yyyyMMdd") val dateStr = dateFormat.format(date) if (dateStr == null) 19000101 else dateStr.toInt } private def udfParseTs: UserDefinedFunction = udf(parseTs _) def prepareAggregationDataFrame(rawDF: DataFrame, timestampColumnName: String): DataFrame = { rawDF .withColumn("original_placement_id", col("placementid")) .withColumn("date", udfParseTs(col(timestampColumnName))) .withColumn("placement_id", col("placementid") cast StringType) .withColumnRenamed("campaignid", "campaign_id") .withColumnRenamed("placementSizeTypeId", "size_id") .drop("placementid") .drop(timestampColumnName) } def aggregateImpressions(inputDF: DataFrame): DataFrame = { inputDF.groupBy( col("date"), col("campaign_id"), col("original_placement_id"), col("placement_id"), col("size_id")) .agg(count(lit(1)).alias("cnt")) .withColumn("type", lit(1)) .withColumn("revenue_chf", lit(0) cast DoubleType) .withColumn("revenue_eur", lit(0) cast DoubleType) .withColumn("source", lit(0)) // 0 for AdTech } }
spark-NotSerializableException.log
Description: Binary data
--------------------------------------------------------------------- To unsubscribe e-mail: user-unsubscr...@spark.apache.org