Hmm... how would they get to Grafana if they are not getting computed in your code? I am talking about the application-specific accumulators. The standard counters such as 'event.progress.inputRowsPerSecond' are getting populated correctly!
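For context on why a driver-side listener can print zero while executors are clearly incrementing: AccumulatorV2 instances follow a copy/merge lifecycle, where each task works on a fresh copy and the driver's value only reflects copies that have been merged back. The sketch below is a plain Scala stand-in (the class and method names are hypothetical, not Spark's API) illustrating that lifecycle; if merging back to the registered driver-side instance never happens, a listener reading that instance sees zero.

```scala
// Hypothetical stand-in for the AccumulatorV2 lifecycle: the driver holds
// one instance, each task gets a fresh zeroed copy, and task copies are
// merged back into the driver's instance after the task finishes.
final class SimpleLongAcc {
  private var sum = 0L
  def add(n: Long): Unit = sum += n
  def value: Long = sum
  def copyAndReset(): SimpleLongAcc = new SimpleLongAcc // fresh, zeroed task copy
  def merge(other: SimpleLongAcc): Unit = sum += other.value
}

// Simulate: the driver registers one accumulator, two "tasks" update copies.
val driverAcc = new SimpleLongAcc
val taskCopies = Seq.fill(2)(driverAcc.copyAndReset())
taskCopies.foreach { c => c.add(10); c.add(5) }

// Until the copies are merged back, the driver-side value is still zero --
// which is exactly what a listener observes if the merge never reaches
// the instance it is reading.
val beforeMerge = driverAcc.value
taskCopies.foreach(driverAcc.merge)
val afterMerge = driverAcc.value
```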
On Mon, May 25, 2020 at 8:39 PM Srinivas V <srini....@gmail.com> wrote:

> Hello,
> Even for me it comes as 0 when I print it in onQueryProgress. I use a
> LongAccumulator as well. Yes, it prints on my local machine but not on the
> cluster. One consolation is that when I send metrics to Grafana, the
> values do come through there.
>
> On Tue, May 26, 2020 at 3:10 AM Something Something <
> mailinglist...@gmail.com> wrote:
>
>> No, this is not working even if I use LongAccumulator.
>>
>> On Fri, May 15, 2020 at 9:54 PM ZHANG Wei <zwb...@msn.com> wrote:
>>
>>> There is a restriction in the AccumulatorV2 API [1]: the OUT type should
>>> be atomic or thread-safe. I'm wondering whether the implementation for
>>> `java.util.Map[T, Long]` meets that requirement. Is there any chance to
>>> replace CollectionLongAccumulator with CollectionAccumulator [2] or
>>> LongAccumulator [3] and test whether the StreamingListener and the rest
>>> of the code work?
>>>
>>> ---
>>> Cheers,
>>> -z
>>> [1] http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.AccumulatorV2
>>> [2] http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.CollectionAccumulator
>>> [3] http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.LongAccumulator
>>>
>>> ________________________________________
>>> From: Something Something <mailinglist...@gmail.com>
>>> Sent: Saturday, May 16, 2020 0:38
>>> To: spark-user
>>> Subject: Re: Using Spark Accumulators with Structured Streaming
>>>
>>> Can someone from the Spark development team tell me whether this
>>> functionality is supported and tested? I've spent a lot of time on this
>>> but can't get it to work. To add more context: we have our own
>>> Accumulator class that extends AccumulatorV2. In this class we keep
>>> track of one or more accumulators.
>>> Here's the definition:
>>>
>>> class CollectionLongAccumulator[T]
>>>   extends AccumulatorV2[T, java.util.Map[T, Long]]
>>>
>>> When the job begins, we register an instance of this class:
>>>
>>> spark.sparkContext.register(myAccumulator, "MyAccumulator")
>>>
>>> Does this work under Structured Streaming?
>>>
>>> I will keep looking for alternate approaches, but any help would be
>>> greatly appreciated. Thanks.
>>>
>>> On Thu, May 14, 2020 at 2:36 PM Something Something <
>>> mailinglist...@gmail.com> wrote:
>>>
>>> In my Structured Streaming job I am updating Spark accumulators in the
>>> updateAcrossEvents method, but they are always 0 when I try to print
>>> them in my StreamingListener. Here's the code:
>>>
>>> .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
>>>   updateAcrossEvents
>>> )
>>>
>>> The accumulators get incremented in 'updateAcrossEvents'. I have a
>>> StreamingListener which writes the values of the accumulators in its
>>> 'onQueryProgress' method, but in that method the accumulators are
>>> ALWAYS ZERO!
>>>
>>> When I added log statements in updateAcrossEvents, I could see that the
>>> accumulators are getting incremented as expected.
>>>
>>> This only happens when I run in 'Cluster' mode. In 'Local' mode it works
>>> fine, which implies that the accumulators are not getting distributed
>>> correctly -- or something like that!
>>>
>>> Note: I've seen quite a few answers on the Web that tell me to perform
>>> an "Action". That's not a solution here. This is a stateful Structured
>>> Streaming job. Yes, I am also 'registering' them in the SparkContext.
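On ZHANG Wei's point about the OUT type needing to be atomic or thread-safe: below is a minimal, Spark-free sketch of the per-key counting logic a thread-safe CollectionLongAccumulator would need. KeyedLongCounter and its methods are hypothetical stand-ins, not Spark classes; in a real job the equivalent logic would live inside the AccumulatorV2[T, java.util.Map[T, Long]] subclass, with the backing map made concurrent so executor-side adds and driver-side merges cannot corrupt it.

```scala
import java.util.concurrent.ConcurrentHashMap

// Hypothetical stand-in for the accumulator's core logic: a per-key long
// counter whose backing map is safe for concurrent updates, unlike a
// plain java.util.HashMap.
final class KeyedLongCounter[T] {
  private val counts = new ConcurrentHashMap[T, java.lang.Long]()

  // Executor side: atomically bump the count for a key.
  def add(key: T): Unit =
    counts.merge(key, 1L, (a, b) => a + b)

  // Driver side: fold another task's partial counts into this one.
  def merge(other: KeyedLongCounter[T]): Unit =
    other.counts.forEach((k, v) => counts.merge(k, v, (a, b) => a + b))

  def value: java.util.Map[T, java.lang.Long] = counts
}

val c = new KeyedLongCounter[String]
c.add("a"); c.add("a"); c.add("b")

val d = new KeyedLongCounter[String]
d.add("a")
c.merge(d) // c now holds a -> 3, b -> 1
```

ConcurrentHashMap.merge performs the read-modify-write atomically, which is what makes per-event increments from parallel tasks safe without external locking.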