I want to use t-digest with foreachPartition and accumulators: essentially,
create a t-digest per partition and add it to an accumulator, leveraging
the fact that t-digests can be merged with each other. Making t-digests
kryo-serializable is easy, but making them java-serializable is not.
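Roughly, what I have looks like the sketch below (simplified; the column
being digested, the compression value, and the exact TDigest factory/merge
calls are placeholders that depend on the t-digest version):

import com.tdunning.math.stats.TDigest
import org.apache.spark.{AccumulatorParam, SparkContext}
import org.apache.spark.sql.DataFrame

// Merges per-partition digests; zero/addInPlace are what Spark calls when combining.
object TDigestParam extends AccumulatorParam[TDigest] {
  override def zero(initial: TDigest): TDigest = TDigest.createDigest(100.0)
  override def addInPlace(a: TDigest, b: TDigest): TDigest = { a.add(b); a }
}

def digestColumn(sc: SparkContext, df: DataFrame): TDigest = {
  val acc = sc.accumulator(TDigest.createDigest(100.0))(TDigestParam)
  df.foreachPartition { rows =>
    val partial = TDigest.createDigest(100.0)      // one digest per partition
    rows.foreach(r => partial.add(r.getDouble(0))) // digest the first (double) column
    acc += partial                                 // merge it into the accumulator
  }
  acc.value
}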
When I run this on Spark 1.4.1, I get the following error:

Exception in thread "main" org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:315)
at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:305)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:132)
at org.apache.spark.SparkContext.clean(SparkContext.scala:1893)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:877)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:876)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:286)
at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:876)
at org.apache.spark.sql.DataFrame.foreachPartition(DataFrame.scala:1255)

which makes sense, because the suggested way to use accumulators is via
closures, and the closure cleaner requires everything the closure captures
to be java-serializable. But since I can't easily make the value type
java-serializable, that won't work. Is there another way to pass the
accumulator to the tasks that doesn't involve closures, and hence Java
serialization?
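
For reference, the Kryo side is just configuration/registration, roughly
like this (the concrete class name is for the com.tdunning implementation;
adjust to whatever digest class you actually use):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  // registering the concrete digest class keeps Kryo from writing full class names
  .registerKryoClasses(Array(classOf[com.tdunning.math.stats.AVLTreeDigest]))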
