Yes, I am talking about application-specific accumulators. Actually, I am getting the values printed in my driver log as well as sent to Grafana; not sure where and when I saw 0 before. My deploy mode is “client” on a YARN cluster (not my local Mac), and I submit from the master node. It should work the same way in cluster mode as well. Create accumulators like this:

LongAccumulator accumulator = sparkContext.longAccumulator(name);
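In Scala it is the same one-liner; sparkContext.longAccumulator(name) both creates the accumulator and registers it under that name, which also makes it show up in the Spark UI. A minimal sketch, assuming a SparkSession named `spark` and a hypothetical accumulator name:

import org.apache.spark.util.LongAccumulator

// Created and registered in one call on the driver.
val accumulator: LongAccumulator = spark.sparkContext.longAccumulator("MyCounter")

// Tasks on the executors increment it; the driver reads accumulator.value.
accumulator.add(1L)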
On Tue, May 26, 2020 at 8:42 PM Something Something <mailinglist...@gmail.com> wrote:

> Hmm... how would they go to Grafana if they are not getting computed in
> your code? I am talking about the application-specific accumulators. The
> other standard counters, such as 'event.progress.inputRowsPerSecond', are
> getting populated correctly!
>
> On Mon, May 25, 2020 at 8:39 PM Srinivas V <srini....@gmail.com> wrote:
>
>> Hello,
>> Even for me it comes as 0 when I print it in onQueryProgress. I use
>> LongAccumulator as well. Yes, it prints on my local machine but not on
>> the cluster. But one consolation is that when I send metrics to Grafana,
>> the values are coming through there.
>>
>> On Tue, May 26, 2020 at 3:10 AM Something Something <mailinglist...@gmail.com> wrote:
>>
>>> No, this is not working even if I use LongAccumulator.
>>>
>>> On Fri, May 15, 2020 at 9:54 PM ZHANG Wei <zwb...@msn.com> wrote:
>>>
>>>> There is a restriction in the AccumulatorV2 API [1]: the OUT type
>>>> should be atomic or thread-safe. I'm wondering whether the
>>>> implementation for `java.util.Map[T, Long]` can meet that or not. Is
>>>> there any chance to replace CollectionLongAccumulator with
>>>> CollectionAccumulator [2] or LongAccumulator [3] and test whether the
>>>> StreamingListener and the rest of the code are able to work?
>>>>
>>>> ---
>>>> Cheers,
>>>> -z
>>>> [1] http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.AccumulatorV2
>>>> [2] http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.CollectionAccumulator
>>>> [3] http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.LongAccumulator
>>>>
>>>> ________________________________________
>>>> From: Something Something <mailinglist...@gmail.com>
>>>> Sent: Saturday, May 16, 2020 0:38
>>>> To: spark-user
>>>> Subject: Re: Using Spark Accumulators with Structured Streaming
>>>>
>>>> Can someone from the Spark development team tell me if this
>>>> functionality is supported and tested? I've spent a lot of time on this
>>>> but can't get it to work. Just to add more context: we have our own
>>>> Accumulator class that extends AccumulatorV2. In this class we keep
>>>> track of one or more accumulators. Here's the definition:
>>>>
>>>> class CollectionLongAccumulator[T]
>>>>     extends AccumulatorV2[T, java.util.Map[T, Long]]
>>>>
>>>> When the job begins, we register an instance of this class:
>>>>
>>>> spark.sparkContext.register(myAccumulator, "MyAccumulator")
>>>>
>>>> Is this working under Structured Streaming?
>>>>
>>>> I will keep looking for alternate approaches, but any help would be
>>>> greatly appreciated. Thanks.
>>>>
>>>> On Thu, May 14, 2020 at 2:36 PM Something Something <mailinglist...@gmail.com> wrote:
>>>>
>>>> In my Structured Streaming job I am updating Spark accumulators in the
>>>> updateAcrossEvents method, but they are always 0 when I try to print
>>>> them in my StreamingListener. Here's the code:
>>>>
>>>> .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
>>>>   updateAcrossEvents
>>>> )
>>>>
>>>> The accumulators get incremented in 'updateAcrossEvents'. I have a
>>>> StreamingListener which writes the values of the accumulators in its
>>>> 'onQueryProgress' method, but in that method the accumulators are
>>>> ALWAYS ZERO!
>>>>
>>>> When I added log statements in updateAcrossEvents, I could see that
>>>> these accumulators are getting incremented as expected.
>>>>
>>>> This only happens when I run in 'cluster' mode. In local mode it works
>>>> fine, which implies that the accumulators are not getting distributed
>>>> correctly, or something like that!
>>>>
>>>> Note: I've seen quite a few answers on the Web that tell me to perform
>>>> an "Action". That's not a solution here. This is a stateful Structured
>>>> Streaming job. Yes, I am also registering them in the SparkContext.
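For reference, here is a minimal, self-contained Scala sketch of the kind of setup this thread describes: a plain LongAccumulator (as suggested above, instead of the custom CollectionLongAccumulator) updated inside a mapGroupsWithState function and read back in a StreamingQueryListener. The Event and MyState types, the "EventCounter" name, and the built-in "rate" source are stand-ins for illustration only; whether the listener sees non-zero values in cluster mode is exactly the open question in this thread.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout, StreamingQueryListener}
import org.apache.spark.sql.streaming.StreamingQueryListener.{QueryProgressEvent, QueryStartedEvent, QueryTerminatedEvent}
import org.apache.spark.util.LongAccumulator

// Hypothetical event and per-key state types, for illustration only.
case class Event(id: String, value: Long)
case class MyState(count: Long)

object AccumulatorDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("AccumulatorDemo").getOrCreate()
    import spark.implicits._

    // Created and registered in one call on the driver.
    val eventCounter: LongAccumulator =
      spark.sparkContext.longAccumulator("EventCounter")

    // The state-update function handed to mapGroupsWithState; it increments
    // the accumulator once per event, on the executors.
    def updateAcrossEvents(id: String, events: Iterator[Event],
                           state: GroupState[MyState]): MyState = {
      var count = if (state.exists) state.get.count else 0L
      events.foreach { _ =>
        count += 1
        eventCounter.add(1)
      }
      val newState = MyState(count)
      state.update(newState)
      newState
    }

    // Listener that reads the accumulator on the driver after each micro-batch.
    spark.streams.addListener(new StreamingQueryListener {
      override def onQueryStarted(event: QueryStartedEvent): Unit = ()
      override def onQueryProgress(event: QueryProgressEvent): Unit =
        println(s"EventCounter = ${eventCounter.value}")
      override def onQueryTerminated(event: QueryTerminatedEvent): Unit = ()
    })

    // Stand-in source: the built-in "rate" source, mapped onto Event.
    val events = spark.readStream.format("rate").load()
      .select($"value").as[Long]
      .map(v => Event(s"key${v % 10}", v))

    val query = events
      .groupByKey(_.id)
      .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(updateAcrossEvents)
      .writeStream
      .outputMode("update")
      .format("console")
      .start()

    query.awaitTermination()
  }
}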