One more option is to override writeReplace [1] in LegacyAccumulatorWrapper to prevent such failures.
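Roughly, something along these lines (just a sketch, assuming the writeReplace hook in AccumulatorV2 isn't kept final and that atDriverSide/copyAndReset are reachable from the wrapper; the registration check and metadata copy that the base implementation performs are elided here):

    // Sketch only: produce the driver-side copy without going through the
    // isZero-based assertion, which trips on values lacking equals/hashCode.
    override protected def writeReplace(): Any = {
      if (atDriverSide) copyAndReset() else this
    }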
What do you think?

[1] https://github.com/apache/spark/blob/4f5bad615b47d743b8932aea1071652293981604/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala#L158

On Fri, Mar 16, 2018 at 12:55 AM, Sergey Zhemzhitsky <szh.s...@gmail.com> wrote:
> Hi there,
>
> I've noticed that accumulators of Spark 1.x no longer work with Spark
> 2.x, failing with
>
>     java.lang.AssertionError: assertion failed: copyAndReset must return a
>     zero value copy
>
> It happens while serializing an accumulator here [1], although
> copyAndReset returns a zero-value copy for sure. Just consider the
> accumulator below:
>
>     val concatParam = new AccumulatorParam[jl.StringBuilder] {
>       override def zero(initialValue: jl.StringBuilder): jl.StringBuilder =
>         new jl.StringBuilder()
>       override def addInPlace(r1: jl.StringBuilder, r2: jl.StringBuilder): jl.StringBuilder =
>         r1.append(r2)
>     }
>
> So, Spark treats the zero value as non-zero because of how isZero [2] is
> implemented in LegacyAccumulatorWrapper:
>
>     override def isZero: Boolean = _value == param.zero(initialValue)
>
> All this means that the values to be accumulated must implement
> equals and hashCode, otherwise isZero is very likely to always return
> false.
>
> So I'm wondering why this assertion is necessary and whether it can be
> safely removed from there?
>
> [1] https://github.com/apache/spark/blob/4f5bad615b47d743b8932aea1071652293981604/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala#L165
> [2] https://github.com/apache/spark/blob/4f5bad615b47d743b8932aea1071652293981604/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala#L489
> [3] https://issues.apache.org/jira/browse/SPARK-23697
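For what it's worth, the reason isZero misfires in the StringBuilder example above is that java.lang.StringBuilder does not override equals, so the == comparison in LegacyAccumulatorWrapper falls back to reference equality. A tiny standalone illustration (the names are just for the example):

    import java.{lang => jl}

    val held  = new jl.StringBuilder()  // stands in for the wrapper's _value
    val fresh = new jl.StringBuilder()  // stands in for param.zero(initialValue)

    // Both builders are empty, but StringBuilder inherits Object.equals,
    // so == is reference equality and the wrapper concludes "not zero".
    println(held == fresh)                    // false
    println(held.toString == fresh.toString)  // true: the contents do match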