Yes, that's exactly how I am creating them. Question... Are you using 'Stateful Structured Streaming' in which you have something like this?

.mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
  updateAcrossEvents
)

And updating the Accumulator inside 'updateAcrossEvents'? We're experiencing this only under 'Stateful Structured Streaming'. In other streaming applications it works as expected.

On Wed, May 27, 2020 at 9:01 AM Srinivas V <srini....@gmail.com> wrote:

> Yes, I am talking about Application specific Accumulators. Actually I am
> getting the values printed in my driver log as well as sent to Grafana. Not
> sure where and when I saw 0 before. My deploy mode is "client" on a YARN
> cluster (not local Mac) where I submit from the master node. It should work
> the same for cluster mode as well.
>
> Create accumulators like this:
>
> AccumulatorV2 accumulator = sparkContext.longAccumulator(name);
>
> On Tue, May 26, 2020 at 8:42 PM Something Something <
> mailinglist...@gmail.com> wrote:
>
>> Hmm... how would they go to Grafana if they are not getting computed in
>> your code? I am talking about the Application Specific Accumulators. The
>> other standard counters such as 'event.progress.inputRowsPerSecond' are
>> getting populated correctly!
>>
>> On Mon, May 25, 2020 at 8:39 PM Srinivas V <srini....@gmail.com> wrote:
>>
>>> Hello,
>>> Even for me it comes as 0 when I print it in onQueryProgress. I use
>>> LongAccumulator as well. Yes, it prints on my local machine but not on the
>>> cluster. But one consolation is that when I send metrics to Grafana, the
>>> values are coming through there.
>>>
>>> On Tue, May 26, 2020 at 3:10 AM Something Something <
>>> mailinglist...@gmail.com> wrote:
>>>
>>>> No, this is not working even if I use LongAccumulator.
>>>>
>>>> On Fri, May 15, 2020 at 9:54 PM ZHANG Wei <zwb...@msn.com> wrote:
>>>>
>>>>> There is a restriction in the AccumulatorV2 API [1]: the OUT type
>>>>> should be atomic or thread safe. I'm wondering whether the
>>>>> implementation for `java.util.Map[T, Long]` can meet it or not.
>>>>> Is there any chance to replace CollectionLongAccumulator by
>>>>> CollectionAccumulator [2] or LongAccumulator [3] and test whether the
>>>>> StreamingListener and the rest of the code are able to work?
>>>>>
>>>>> ---
>>>>> Cheers,
>>>>> -z
>>>>>
>>>>> [1] http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.AccumulatorV2
>>>>> [2] http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.CollectionAccumulator
>>>>> [3] http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.LongAccumulator
>>>>>
>>>>> ________________________________________
>>>>> From: Something Something <mailinglist...@gmail.com>
>>>>> Sent: Saturday, May 16, 2020 0:38
>>>>> To: spark-user
>>>>> Subject: Re: Using Spark Accumulators with Structured Streaming
>>>>>
>>>>> Can someone from the Spark development team tell me if this
>>>>> functionality is supported and tested? I've spent a lot of time on this
>>>>> but can't get it to work. Just to add more context: we have our own
>>>>> Accumulator class that extends AccumulatorV2. In this class we keep
>>>>> track of one or more accumulators. Here's the definition:
>>>>>
>>>>> class CollectionLongAccumulator[T]
>>>>>     extends AccumulatorV2[T, java.util.Map[T, Long]]
>>>>>
>>>>> When the job begins we register an instance of this class:
>>>>>
>>>>> spark.sparkContext.register(myAccumulator, "MyAccumulator")
>>>>>
>>>>> Is this working under Structured Streaming?
>>>>>
>>>>> I will keep looking for alternate approaches, but any help would be
>>>>> greatly appreciated. Thanks.
>>>>>
>>>>> On Thu, May 14, 2020 at 2:36 PM Something Something <
>>>>> mailinglist...@gmail.com> wrote:
>>>>>
>>>>> In my structured streaming job I am updating Spark Accumulators in the
>>>>> updateAcrossEvents method, but they are always 0 when I try to print
>>>>> them in my StreamingListener.
>>>>> Here's the code:
>>>>>
>>>>> .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
>>>>>   updateAcrossEvents
>>>>> )
>>>>>
>>>>> The accumulators get incremented in 'updateAcrossEvents'. I have a
>>>>> StreamingListener which writes the values of the accumulators in its
>>>>> 'onQueryProgress' method, but in this method the Accumulators are
>>>>> ALWAYS ZERO!
>>>>>
>>>>> When I added log statements in updateAcrossEvents, I could see that
>>>>> these accumulators are getting incremented as expected.
>>>>>
>>>>> This only happens when I run in 'Cluster' mode. In Local mode it works
>>>>> fine, which implies that the Accumulators are not getting distributed
>>>>> correctly - or something like that!
>>>>>
>>>>> Note: I've seen quite a few answers on the Web that tell me to perform
>>>>> an "Action". That's not a solution here. This is a 'Stateful Structured
>>>>> Streaming' job. Yes, I am also 'registering' them in SparkContext.
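[Editor's note] To make the setup discussed above concrete, here is a minimal sketch of the pattern the thread describes: registering a built-in LongAccumulator and incrementing it inside a state-update function passed to mapGroupsWithState. This is an illustration assuming Spark 2.4+; the Event and RunningCount case classes and the pipeline wiring are hypothetical, invented for the example, and only the names updateAcrossEvents and "MyAccumulator" come from the thread.

```scala
import org.apache.spark.sql.{Dataset, SparkSession}
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout}
import org.apache.spark.util.LongAccumulator

// Hypothetical event and state types, for illustration only.
case class Event(id: String, count: Long)
case class RunningCount(total: Long)

object AccumulatorSketch {
  val spark: SparkSession = SparkSession.builder.appName("acc-sketch").getOrCreate()
  import spark.implicits._

  // The convenience factory both creates and registers the accumulator.
  val eventCounter: LongAccumulator =
    spark.sparkContext.longAccumulator("MyAccumulator")

  // State-update function in the shape mapGroupsWithState expects:
  // (key, events for that key in this trigger, mutable group state) => output.
  def updateAcrossEvents(
      key: String,
      events: Iterator[Event],
      state: GroupState[RunningCount]): RunningCount = {
    val incoming = events.map(_.count).sum
    // add() runs on the executors; merged values surface on the driver.
    eventCounter.add(incoming)
    val updated = RunningCount(state.getOption.map(_.total).getOrElse(0L) + incoming)
    state.update(updated)
    updated
  }

  def pipeline(events: Dataset[Event]): Dataset[RunningCount] =
    events
      .groupByKey(_.id)
      .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(updateAcrossEvents)
}
```

Note that accumulator values are merged on the driver as tasks complete, so when a listener callback reads them matters; the thread's symptom (non-zero in executor logs, zero in the listener) is consistent with reading them from a different SparkContext instance or before any batch has completed.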
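[Editor's note] ZHANG Wei's point is that AccumulatorV2's OUT type should be atomic or thread safe, and a plain java.util.HashMap behind `java.util.Map[T, Long]` is neither. As a sketch of one way to satisfy that requirement, here is a map-valued accumulator backed by ConcurrentHashMap. The class name echoes the thread's CollectionLongAccumulator, but this implementation is the editor's assumption, not the original poster's code.

```scala
import java.util.{Map => JMap}
import java.util.concurrent.ConcurrentHashMap
import org.apache.spark.util.AccumulatorV2

// Sketch: counts occurrences of keys of type T; OUT is a thread-safe map.
class ThreadSafeCollectionLongAccumulator[T]
    extends AccumulatorV2[T, JMap[T, Long]] {

  private val map = new ConcurrentHashMap[T, Long]()

  override def isZero: Boolean = map.isEmpty

  override def copy(): AccumulatorV2[T, JMap[T, Long]] = {
    val acc = new ThreadSafeCollectionLongAccumulator[T]
    acc.map.putAll(map)
    acc
  }

  override def reset(): Unit = map.clear()

  // merge() on ConcurrentHashMap atomically increments the per-key count.
  override def add(v: T): Unit =
    map.merge(v, 1L, (a, b) => a + b)

  override def merge(other: AccumulatorV2[T, JMap[T, Long]]): Unit =
    other.value.forEach((k, v) => map.merge(k, v, (a, b) => a + b))

  override def value: JMap[T, Long] = map
}
```

An instance would still need to be registered explicitly, e.g. `spark.sparkContext.register(acc, "MyAccumulator")`, since only the built-in factory methods register automatically.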