One more option is to override writeReplace [1] in
LegacyAccumulatorWrapper to prevent such failures.
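
To make the idea concrete, here is a rough, Spark-free sketch of what I
mean. SketchAccumulator and SketchLegacyWrapper are hypothetical stand-ins
that only mimic the shape of AccumulatorV2 and LegacyAccumulatorWrapper;
they are not the real classes, and the real change may well look different.

```scala
// Simplified stand-ins for illustration only, NOT Spark's actual classes.
abstract class SketchAccumulator[T] extends Serializable {
  def isZero: Boolean
  def copyAndReset(): SketchAccumulator[T]

  // Java serialization calls writeReplace() before writing the object.
  // This base version mimics the assertion that fails in the report below.
  protected def writeReplace(): Any = {
    val copy = copyAndReset()
    assert(copy.isZero, "copyAndReset must return a zero value copy")
    copy
  }
}

class SketchLegacyWrapper(val value: java.lang.StringBuilder)
    extends SketchAccumulator[java.lang.StringBuilder] {

  // StringBuilder does not override equals, so == is a reference comparison
  // and this returns false even for an empty builder.
  override def isZero: Boolean = value == new java.lang.StringBuilder()

  override def copyAndReset(): SketchLegacyWrapper =
    new SketchLegacyWrapper(new java.lang.StringBuilder())

  // The proposed override: ship the reset copy without asserting isZero.
  override protected def writeReplace(): Any = copyAndReset()
}
```

Without the override, serializing the wrapper in this sketch trips the
assertion, because the reset copy never compares equal to a fresh zero
value. With the override, serialization goes through and the executor side
receives the reset copy.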

What do you think?

[1] 
https://github.com/apache/spark/blob/4f5bad615b47d743b8932aea1071652293981604/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala#L158

On Fri, Mar 16, 2018 at 12:55 AM, Sergey Zhemzhitsky <szh.s...@gmail.com> wrote:
> Hi there,
>
> I've noticed that Spark 1.x accumulators no longer work with Spark
> 2.x; they fail with
> java.lang.AssertionError: assertion failed: copyAndReset must return a zero value copy
>
> This happens while serializing an accumulator here [1], even though
> copyAndReset does return a zero-value copy. Consider the accumulator
> below:
>
> // Assuming jl is the usual alias for java.lang:
> import java.{lang => jl}
> import org.apache.spark.AccumulatorParam
>
> val concatParam = new AccumulatorParam[jl.StringBuilder] {
>   override def zero(initialValue: jl.StringBuilder): jl.StringBuilder =
>     new jl.StringBuilder()
>   override def addInPlace(r1: jl.StringBuilder, r2: jl.StringBuilder): jl.StringBuilder =
>     r1.append(r2)
> }
>
> Spark treats the zero value as non-zero because of how isZero [2] is
> implemented in LegacyAccumulatorWrapper:
>
> override def isZero: Boolean = _value == param.zero(initialValue)
>
> This means that the values being accumulated must implement equals
> (and, by contract, hashCode); otherwise isZero will almost always
> return false. java.lang.StringBuilder does not override equals, so ==
> compares references here.
>
> So I'm wondering why this assertion is necessary and whether it can
> be safely removed.
>
> [1] 
> https://github.com/apache/spark/blob/4f5bad615b47d743b8932aea1071652293981604/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala#L165
> [2] 
> https://github.com/apache/spark/blob/4f5bad615b47d743b8932aea1071652293981604/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala#L489
> [3] https://issues.apache.org/jira/browse/SPARK-23697
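
For reference, the equality behaviour described in the quoted message can
be reproduced without Spark. This is only a sketch: IsZeroSketch and
isZeroByEquals are made-up names that mirror the shape of the quoted isZero
check, not Spark code.

```scala
import java.{lang => jl}

object IsZeroSketch {
  // Same shape as the quoted check: compare the value with a fresh zero via ==.
  def isZeroByEquals[T](value: T, zero: T): Boolean = value == zero

  def main(args: Array[String]): Unit = {
    // StringBuilder inherits Object.equals, so two distinct empty builders differ.
    println(isZeroByEquals(new jl.StringBuilder(), new jl.StringBuilder())) // false
    // String overrides equals, so the comparison is by content.
    println(isZeroByEquals("", new String(""))) // true
  }
}
```

Any value type that inherits Object.equals behaves like the StringBuilder
case here, which is why the legacy wrapper never sees its own zero value as
zero.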
