I want to use t-digest with foreachPartition and accumulators: essentially, build a t-digest per partition and merge it into the accumulator, leveraging the fact that t-digests can be added to each other. I can make t-digests Kryo-serializable easily, but making them Java-serializable is not easy at all.
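In outline, the code looks like the sketch below. This is illustrative rather than exact: the `AccumulatorParam`, the `TDigest.createDigest` factory, the compression value, and the digest-merge call are assumptions about my setup; substitute whatever factory and merge methods your t-digest version actually exposes.

```scala
import com.tdunning.math.stats.TDigest
import org.apache.spark.AccumulatorParam

// Merges per-partition digests into one; relies on digests being addable.
// TDigest.createDigest(compression) and add(other) are assumed here --
// use the factory/merge methods your t-digest version provides.
object TDigestAccumulatorParam extends AccumulatorParam[TDigest] {
  def zero(initial: TDigest): TDigest = TDigest.createDigest(100.0)
  def addInPlace(a: TDigest, b: TDigest): TDigest = { a.add(b); a }
}

val digestAcc = sc.accumulator(TDigest.createDigest(100.0))(TDigestAccumulatorParam)

df.foreachPartition { rows =>
  val local = TDigest.createDigest(100.0)          // one digest per partition
  rows.foreach(row => local.add(row.getDouble(0))) // assuming a numeric first column
  digestAcc += local // the closure captures digestAcc, so Spark Java-serializes it
}
```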
When I run this (Spark 1.4.1), I get this error:

```
Exception in thread "main" org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:315)
    at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:305)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:132)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:1893)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:877)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:876)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:286)
    at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:876)
    at org.apache.spark.sql.DataFrame.foreachPartition(DataFrame.scala:1255)
```

This makes sense, because the suggested way to use accumulators is to reference them from closures, and closures are serialized with the Java serializer regardless of the `spark.serializer` setting. But since in my case I can't easily make the value type Java-serializable, that won't work. Is there another way to pass the accumulator to the tasks that doesn't involve closures, and hence Java serialization?
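For reference, the Kryo side mentioned above is straightforward, roughly like the sketch below (the registrator class name is hypothetical, and a custom Kryo `Serializer` built on the digest's own byte encoding would also be an option). Note that `spark.serializer` and `spark.kryo.registrator` apply to shuffled data and task results, not to closures, which is exactly the problem.

```scala
import com.esotericsoftware.kryo.Kryo
import org.apache.spark.SparkConf
import org.apache.spark.serializer.KryoRegistrator
import com.tdunning.math.stats.TDigest

// Registers the digest type with Kryo; register the concrete digest
// class(es) you actually instantiate in your own code.
class TDigestRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo): Unit = {
    kryo.register(classOf[TDigest])
  }
}

val conf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryo.registrator", classOf[TDigestRegistrator].getName)
```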