There is a restriction in the AccumulatorV2 API [1]: the OUT type should be atomic or thread-safe. I'm wondering whether an implementation backed by `java.util.Map[T, Long]` can meet that requirement. Is there any chance to replace CollectionLongAccumulator with CollectionAccumulator [2] or LongAccumulator [3] and test whether the StreamingListener and the rest of the code still work?
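The thread-safety point can be illustrated outside Spark. The sketch below is hypothetical and has no Spark dependency (the object name `SafeCounts` and the demo are mine, not from the thread): it backs a per-key counter with `java.util.concurrent.ConcurrentHashMap`, whose `merge` performs an atomic per-key read-modify-write — the property the AccumulatorV2 docs ask of the OUT type, and one a plain `java.util.HashMap` does not give you under concurrent updates.

```scala
import java.util.concurrent.ConcurrentHashMap
import java.util.function.BiFunction

// Hedged sketch, not Spark code: a thread-safe per-key counter of the kind
// a Map[T, Long]-valued accumulator would need internally.
object SafeCounts {
  private val plus = new BiFunction[java.lang.Long, java.lang.Long, java.lang.Long] {
    override def apply(a: java.lang.Long, b: java.lang.Long): java.lang.Long = a + b
  }

  val counts = new ConcurrentHashMap[String, java.lang.Long]()

  // ConcurrentHashMap.merge is an atomic read-modify-write per key, so
  // concurrent add() calls cannot lose increments the way plain HashMap
  // updates can.
  def add(key: String): Unit = counts.merge(key, 1L, plus)

  // Hammer a single key from several threads and return its final count.
  def runDemo(nThreads: Int, perThread: Int): Long = {
    counts.clear()
    val threads = (1 to nThreads).map { _ =>
      new Thread(new Runnable {
        override def run(): Unit = (1 to perThread).foreach(_ => add("events"))
      })
    }
    threads.foreach(_.start())
    threads.foreach(_.join())
    counts.get("events")
  }
}
```

With 4 threads doing 1000 adds each, the final count is exactly 4000 every run; with a plain HashMap the same demo can silently undercount.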
---
Cheers,
-z

[1] http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.AccumulatorV2
[2] http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.CollectionAccumulator
[3] http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.LongAccumulator

________________________________________
From: Something Something <mailinglist...@gmail.com>
Sent: Saturday, May 16, 2020 0:38
To: spark-user
Subject: Re: Using Spark Accumulators with Structured Streaming

Can someone from the Spark development team tell me whether this functionality is supported and tested? I've spent a lot of time on this but can't get it to work.

To add more context: we have our own accumulator class that extends AccumulatorV2. In this class we keep track of one or more accumulators. Here's the definition:

    class CollectionLongAccumulator[T]
        extends AccumulatorV2[T, java.util.Map[T, Long]]

When the job begins we register an instance of this class:

    spark.sparkContext.register(myAccumulator, "MyAccumulator")

Does this work under Structured Streaming? I will keep looking for alternate approaches, but any help would be greatly appreciated. Thanks.

On Thu, May 14, 2020 at 2:36 PM Something Something <mailinglist...@gmail.com> wrote:

In my Structured Streaming job I am updating Spark accumulators in the updateAcrossEvents method, but they are always 0 when I try to print them in my StreamingListener. Here's the code:

    .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
      updateAcrossEvents
    )

The accumulators get incremented in updateAcrossEvents. I have a StreamingListener that writes the values of the accumulators in its onQueryProgress method, but in that method the accumulators are ALWAYS ZERO! When I added log statements in updateAcrossEvents, I could see that the accumulators are getting incremented as expected. This only happens when I run in 'Cluster' mode.
In Local mode it works fine, which implies that the accumulator updates are not getting merged back from the executors correctly - or something like that! Note: I've seen quite a few answers on the web that tell me to perform an "Action". That's not a solution here; this is a stateful Structured Streaming job. Yes, I am also registering them with the SparkContext.
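For reference, the bookkeeping of a `CollectionLongAccumulator` like the one described above can be sketched without any Spark dependency. This is only an illustration under stated assumptions: the class name `MapLongAcc` is hypothetical, and it deliberately does not extend AccumulatorV2, so it shows the add/merge/value shape of the contract but none of Spark's machinery for copying accumulators to executor tasks and merging task-local copies back on the driver — which is exactly the part that differs between local and cluster mode.

```scala
import java.util.concurrent.ConcurrentHashMap
import java.util.function.BiFunction

// Hedged, Spark-free sketch of a Map[T, Long]-valued accumulator's
// bookkeeping. Method names mirror the AccumulatorV2 contract
// (isZero / reset / add / merge / value), but this is a plain class.
class MapLongAcc[T] {
  private val plus = new BiFunction[java.lang.Long, java.lang.Long, java.lang.Long] {
    override def apply(a: java.lang.Long, b: java.lang.Long): java.lang.Long = a + b
  }

  private val m = new ConcurrentHashMap[T, java.lang.Long]()

  def isZero: Boolean = m.isEmpty
  def reset(): Unit = m.clear()

  // Called per element; atomic per-key increment.
  def add(v: T): Unit = m.merge(v, 1L, plus)

  // In real AccumulatorV2, merge() is what the driver calls to fold each
  // task-local copy into the registered instance.
  def merge(other: MapLongAcc[T]): Unit = {
    val it = other.m.entrySet().iterator()
    while (it.hasNext) {
      val e = it.next()
      m.merge(e.getKey, e.getValue, plus)
    }
  }

  def value: java.util.Map[T, java.lang.Long] = m
}
```

The cluster-mode symptom in this thread fits that split: `add` runs in executor tasks on their own copies, and a listener on the driver only sees non-zero values after those copies have been merged back.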