With the following simple code:
val a = sc.createDataFrame(sc.parallelize(Seq((1,2),(3,4)))).as[(Int,Int)]
val grouped = a.groupByKey({x:(Int,Int)=>x._1})
val mappedGroups = grouped.mapGroups((k,x)=>{(k,1)})
val yyy = sc.broadcast(1)
val last = mappedGroups.rdd.map(xx=>{
  val simpley = yyy.value
  1
})

I'm seeing the error:

org.apache.spark.SparkException: Task not serializable
	at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298)
	at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
	at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
	at org.apache.spark.SparkContext.clean(SparkContext.scala:2053)
	at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:366)
	at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:365)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:358)
	at org.apache.spark.rdd.RDD.map(RDD.scala:365)
	...
56 elided Caused by: java.io.NotSerializableException: org.apache.spark.sql.execution.QueryExecution Serialization stack: - object not serializable (class: org.apache.spark.sql.execution.QueryExecution, value: == Parsed Logical Plan == 'AppendColumns <function1>, unresolveddeserializer(newInstance(class scala.Tuple2)), [input[0, int, true] AS value#210] +- LogicalRDD [_1#201, _2#202] == Analyzed Logical Plan == _1: int, _2: int, value: int AppendColumns <function1>, newInstance(class scala.Tuple2), [input[0, int, true] AS value#210] +- LogicalRDD [_1#201, _2#202] == Optimized Logical Plan == AppendColumns <function1>, newInstance(class scala.Tuple2), [input[0, int, true] AS value#210] +- LogicalRDD [_1#201, _2#202] == Physical Plan == AppendColumns <function1>, newInstance(class scala.Tuple2), [input[0, int, true] AS value#210] +- Scan ExistingRDD[_1#201,_2#202]) - field (class: org.apache.spark.sql.KeyValueGroupedDataset, name: queryExecution, type: class org.apache.spark.sql.execution.QueryExecution) - object (class org.apache.spark.sql.KeyValueGroupedDataset, org.apache.spark.sql.KeyValueGroupedDataset@71148f10) - field (class: $iw, name: grouped, type: class org.apache.spark.sql.KeyValueGroupedDataset) - object (class $iw, $iw@7b1c13e4) - field (class: $iw, name: $iw, type: class $iw) - object (class $iw, $iw@3e9a0c21) - field (class: $iw, name: $iw, type: class $iw) - object (class $iw, $iw@218cc682) - field (class: $iw, name: $iw, type: class $iw) - object (class $iw, $iw@2ecedd08) - field (class: $iw, name: $iw, type: class $iw) - object (class $iw, $iw@79efd402) - field (class: $iw, name: $iw, type: class $iw) - object (class $iw, $iw@d81976c) - field (class: $iw, name: $iw, type: class $iw) - object (class $iw, $iw@2d5d6e2a) - field (class: $iw, name: $iw, type: class $iw) - object (class $iw, $iw@74dc6a7a) - field (class: $iw, name: $iw, type: class $iw) - object (class $iw, $iw@5e220d85) - field (class: $iw, name: $iw, type: class $iw) - object (class 
$iw, $iw@1c790a4f) - field (class: $iw, name: $iw, type: class $iw) - object (class $iw, $iw@1d954b06) - field (class: $iw, name: $iw, type: class $iw) - object (class $iw, $iw@1343c904) - field (class: $line115.$read, name: $iw, type: class $iw) - object (class $line115.$read, $line115.$read@42497908) - field (class: $iw, name: $line115$read, type: class $line115.$read) - object (class $iw, $iw@af36da5) - field (class: $iw, name: $outer, type: class $iw) - object (class $iw, $iw@2fd5b99a) - field (class: $anonfun$1, name: $outer, type: class $iw) - object (class $anonfun$1, <function1>) at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40) at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46) at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100) at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:295) ... 65 more