Hello,

For me too it comes out as 0 when I print it in onQueryProgress, and I use a LongAccumulator as well. Yes, it prints correctly on my local machine but not on the cluster. One consolation: when I send the metrics to Grafana, the values do show up there.
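For reference, here is roughly the pattern I am using -- a minimal sketch with placeholder names (eventCounter, "acc-test"); my actual job registers and reads the accumulator the same way:

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.streaming.StreamingQueryListener
    import org.apache.spark.sql.streaming.StreamingQueryListener._

    val spark = SparkSession.builder.appName("acc-test").getOrCreate()

    // Built-in LongAccumulator, registered on the driver
    val eventCounter = spark.sparkContext.longAccumulator("EventCounter")

    spark.streams.addListener(new StreamingQueryListener {
      override def onQueryStarted(event: QueryStartedEvent): Unit = {}
      override def onQueryTerminated(event: QueryTerminatedEvent): Unit = {}
      override def onQueryProgress(event: QueryProgressEvent): Unit = {
        // On the cluster this prints 0, even though tasks call eventCounter.add(1)
        println(s"EventCounter = ${eventCounter.value}")
      }
    })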
On Tue, May 26, 2020 at 3:10 AM Something Something <mailinglist...@gmail.com> wrote:

> No, this is not working even if I use a LongAccumulator.
>
> On Fri, May 15, 2020 at 9:54 PM ZHANG Wei <zwb...@msn.com> wrote:
>
>> There is a restriction in the AccumulatorV2 API [1]: the OUT type should be
>> atomic or thread-safe. I'm wondering whether the implementation with
>> `java.util.Map[T, Long]` meets that requirement. Is there any chance to
>> replace CollectionLongAccumulator with CollectionAccumulator [2] or
>> LongAccumulator [3] and test whether the StreamingQueryListener and the
>> rest of the code work?
>>
>> ---
>> Cheers,
>> -z
>>
>> [1] http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.AccumulatorV2
>> [2] http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.CollectionAccumulator
>> [3] http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.LongAccumulator
>>
>> ________________________________________
>> From: Something Something <mailinglist...@gmail.com>
>> Sent: Saturday, May 16, 2020 0:38
>> To: spark-user
>> Subject: Re: Using Spark Accumulators with Structured Streaming
>>
>> Can someone from the Spark development team tell me if this functionality
>> is supported and tested? I've spent a lot of time on this but can't get it
>> to work. To add more context: we have our own accumulator class that
>> extends AccumulatorV2. In this class we keep track of one or more
>> accumulators. Here's the definition:
>>
>> class CollectionLongAccumulator[T]
>>   extends AccumulatorV2[T, java.util.Map[T, Long]]
>>
>> When the job begins, we register an instance of this class:
>>
>> spark.sparkContext.register(myAccumulator, "MyAccumulator")
>>
>> Does this work under Structured Streaming?
>>
>> I will keep looking for alternate approaches, but any help would be
>> greatly appreciated. Thanks.
>>
>>
>> On Thu, May 14, 2020 at 2:36 PM Something Something <mailinglist...@gmail.com> wrote:
>>
>> In my Structured Streaming job I am updating Spark accumulators in the
>> updateAcrossEvents method, but they are always 0 when I try to print them
>> in my StreamingQueryListener. Here's the code:
>>
>> .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
>>   updateAcrossEvents
>> )
>>
>> The accumulators get incremented in 'updateAcrossEvents'. I have a
>> StreamingQueryListener which writes the values of the accumulators in its
>> 'onQueryProgress' method, but in that method the accumulators are ALWAYS
>> ZERO!
>>
>> When I added log statements in updateAcrossEvents, I could see that the
>> accumulators are getting incremented as expected.
>>
>> This only happens when I run in 'cluster' mode. In local mode it works
>> fine, which implies that the accumulators are not being distributed
>> correctly - or something like that!
>>
>> Note: I've seen quite a few answers on the web that tell me to perform an
>> "Action". That's not a solution here. This is a stateful Structured
>> Streaming job. Yes, I am also registering them with the SparkContext.
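P.S. Regarding ZHANG Wei's thread-safety point above: if the custom accumulator has to stay, one option might be to back its OUT type with a ConcurrentHashMap. The sketch below is untested and the names are mine, not from the original job; it uses java.lang.Long rather than scala.Long to keep the Java interop simple:

    import java.util.concurrent.ConcurrentHashMap
    import java.util.function.BiFunction
    import scala.collection.JavaConverters._
    import org.apache.spark.util.AccumulatorV2

    // Per-key counter whose OUT value is a ConcurrentHashMap, so concurrent
    // task threads on an executor can call add() safely.
    class ConcurrentCollectionLongAccumulator[T]
        extends AccumulatorV2[T, java.util.Map[T, java.lang.Long]] {

      private val map = new ConcurrentHashMap[T, java.lang.Long]()

      private val sum: BiFunction[java.lang.Long, java.lang.Long, java.lang.Long] =
        (a, b) => a + b

      override def isZero: Boolean = map.isEmpty

      override def copy(): ConcurrentCollectionLongAccumulator[T] = {
        val acc = new ConcurrentCollectionLongAccumulator[T]
        acc.map.putAll(map)
        acc
      }

      override def reset(): Unit = map.clear()

      // Atomically bumps the count for this key
      override def add(v: T): Unit = map.merge(v, 1L, sum)

      // Called on the driver to fold in each task's copy
      override def merge(other: AccumulatorV2[T, java.util.Map[T, java.lang.Long]]): Unit =
        other.value.asScala.foreach { case (k, n) => map.merge(k, n, sum) }

      override def value: java.util.Map[T, java.lang.Long] = map
    }

It would be registered exactly as before:

    spark.sparkContext.register(new ConcurrentCollectionLongAccumulator[String], "MyAccumulator")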