May I get how the accumulator is accessed in the method `onQueryProgress()`?
AFAICT, the accumulator is incremented well. There is a way to verify that in cluster like this: ``` // Add the following while loop before invoking awaitTermination while (true) { println("My acc: " + myAcc.value) Thread.sleep(5 * 1000) } //query.awaitTermination() ``` And the accumulator value updated can be found from driver stdout. -- Cheers, -z On Thu, 28 May 2020 17:12:48 +0530 Srinivas V <srini....@gmail.com> wrote: > yes, I am using stateful structured streaming. Yes similar to what you do. > This is in Java > I do it this way: > Dataset<ModelUpdate> productUpdates = watermarkedDS > .groupByKey( > (MapFunction<InputEventModel, String>) event -> > event.getId(), Encoders.STRING()) > .mapGroupsWithState( > new > StateUpdateTask(Long.parseLong(appConfig.getSparkStructuredStreamingConfig().STATE_TIMEOUT), > appConfig, accumulators), > Encoders.bean(ModelStateInfo.class), > Encoders.bean(ModelUpdate.class), > GroupStateTimeout.ProcessingTimeTimeout()); > > StateUpdateTask contains the update method. > > On Thu, May 28, 2020 at 4:41 AM Something Something < > mailinglist...@gmail.com> wrote: > > > Yes, that's exactly how I am creating them. > > > > Question... Are you using 'Stateful Structured Streaming' in which you've > > something like this? > > > > .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())( > > updateAcrossEvents > > ) > > > > And updating the Accumulator inside 'updateAcrossEvents'? We're > > experiencing this only under 'Stateful Structured Streaming'. In other > > streaming applications it works as expected. > > > > > > > > On Wed, May 27, 2020 at 9:01 AM Srinivas V <srini....@gmail.com> wrote: > > > >> Yes, I am talking about Application specific Accumulators. Actually I am > >> getting the values printed in my driver log as well as sent to Grafana. Not > >> sure where and when I saw 0 before. My deploy mode is “client” on a yarn > >> cluster(not local Mac) where I submit from master node. It should work the > >> same for cluster mode as well. > >> Create accumulators like this: > >> AccumulatorV2 accumulator = sparkContext.longAccumulator(name); > >> > >> > >> On Tue, May 26, 2020 at 8:42 PM Something Something < > >> mailinglist...@gmail.com> wrote: > >> > >>> Hmm... how would they go to Graphana if they are not getting computed in > >>> your code? I am talking about the Application Specific Accumulators. The > >>> other standard counters such as 'event.progress.inputRowsPerSecond' are > >>> getting populated correctly! > >>> > >>> On Mon, May 25, 2020 at 8:39 PM Srinivas V <srini....@gmail.com> wrote: > >>> > >>>> Hello, > >>>> Even for me it comes as 0 when I print in OnQueryProgress. I use > >>>> LongAccumulator as well. Yes, it prints on my local but not on cluster. > >>>> But one consolation is that when I send metrics to Graphana, the values > >>>> are coming there. > >>>> > >>>> On Tue, May 26, 2020 at 3:10 AM Something Something < > >>>> mailinglist...@gmail.com> wrote: > >>>> > >>>>> No this is not working even if I use LongAccumulator. > >>>>> > >>>>> On Fri, May 15, 2020 at 9:54 PM ZHANG Wei <wezh...@outlook.com> wrote: > >>>>> > >>>>>> There is a restriction in AccumulatorV2 API [1], the OUT type should > >>>>>> be atomic or thread safe. I'm wondering if the implementation for > >>>>>> `java.util.Map[T, Long]` can meet it or not. Is there any chance to > >>>>>> replace > >>>>>> CollectionLongAccumulator by CollectionAccumulator[2] or > >>>>>> LongAccumulator[3] > >>>>>> and test if the StreamingListener and other codes are able to work? > >>>>>> > >>>>>> --- > >>>>>> Cheers, > >>>>>> -z > >>>>>> [1] > >>>>>> https://eur06.safelinks.protection.outlook.com/?url=http%3A%2F%2Fspark.apache.org%2Fdocs%2Flatest%2Fapi%2Fscala%2Findex.html%23org.apache.spark.util.AccumulatorV2&data=02%7C01%7C%7Ce9cd79340511422f368608d802fc468d%7C84df9e7fe9f640afb435aaaaaaaaaaaa%7C1%7C0%7C637262629816034378&sdata=73AxOzjhvImCuhXPoMN%2Bm7%2BY3KYwwaoCvmYMoOEGDtU%3D&reserved=0 > >>>>>> [2] > >>>>>> https://eur06.safelinks.protection.outlook.com/?url=http%3A%2F%2Fspark.apache.org%2Fdocs%2Flatest%2Fapi%2Fscala%2Findex.html%23org.apache.spark.util.CollectionAccumulator&data=02%7C01%7C%7Ce9cd79340511422f368608d802fc468d%7C84df9e7fe9f640afb435aaaaaaaaaaaa%7C1%7C0%7C637262629816034378&sdata=BY%2BtYoheicPCByUh2YWlmezHhg9ruKIDlndKQD06N%2FM%3D&reserved=0 > >>>>>> [3] > >>>>>> https://eur06.safelinks.protection.outlook.com/?url=http%3A%2F%2Fspark.apache.org%2Fdocs%2Flatest%2Fapi%2Fscala%2Findex.html%23org.apache.spark.util.LongAccumulator&data=02%7C01%7C%7Ce9cd79340511422f368608d802fc468d%7C84df9e7fe9f640afb435aaaaaaaaaaaa%7C1%7C0%7C637262629816034378&sdata=IosZ%2Fs2CclFuHT8nL8btCU8Geh2%2FjV94DtwxEEoN8F8%3D&reserved=0 > >>>>>> > >>>>>> ________________________________________ > >>>>>> From: Something Something <mailinglist...@gmail.com> > >>>>>> Sent: Saturday, May 16, 2020 0:38 > >>>>>> To: spark-user > >>>>>> Subject: Re: Using Spark Accumulators with Structured Streaming > >>>>>> > >>>>>> Can someone from Spark Development team tell me if this functionality > >>>>>> is supported and tested? I've spent a lot of time on this but can't > >>>>>> get it > >>>>>> to work. Just to add more context, we've our own Accumulator class that > >>>>>> extends from AccumulatorV2. In this class we keep track of one or more > >>>>>> accumulators. Here's the definition: > >>>>>> > >>>>>> > >>>>>> class CollectionLongAccumulator[T] > >>>>>> extends AccumulatorV2[T, java.util.Map[T, Long]] > >>>>>> > >>>>>> When the job begins we register an instance of this class: > >>>>>> > >>>>>> spark.sparkContext.register(myAccumulator, "MyAccumulator") > >>>>>> > >>>>>> Is this working under Structured Streaming? > >>>>>> > >>>>>> I will keep looking for alternate approaches but any help would be > >>>>>> greatly appreciated. Thanks. > >>>>>> > >>>>>> > >>>>>> > >>>>>> On Thu, May 14, 2020 at 2:36 PM Something Something < > >>>>>> mailinglist...@gmail.com<mailto:mailinglist...@gmail.com>> wrote: > >>>>>> > >>>>>> In my structured streaming job I am updating Spark Accumulators in > >>>>>> the updateAcrossEvents method but they are always 0 when I try to print > >>>>>> them in my StreamingListener. Here's the code: > >>>>>> > >>>>>> .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())( > >>>>>> updateAcrossEvents > >>>>>> ) > >>>>>> > >>>>>> > >>>>>> The accumulators get incremented in 'updateAcrossEvents'. I've a > >>>>>> StreamingListener which writes values of the accumulators in > >>>>>> 'onQueryProgress' method but in this method the Accumulators are ALWAYS > >>>>>> ZERO! > >>>>>> > >>>>>> When I added log statements in the updateAcrossEvents, I could see > >>>>>> that these accumulators are getting incremented as expected. > >>>>>> > >>>>>> This only happens when I run in the 'Cluster' mode. In Local mode it > >>>>>> works fine which implies that the Accumulators are not getting > >>>>>> distributed > >>>>>> correctly - or something like that! > >>>>>> > >>>>>> Note: I've seen quite a few answers on the Web that tell me to > >>>>>> perform an "Action". That's not a solution here. This is a 'Stateful > >>>>>> Structured Streaming' job. Yes, I am also 'registering' them in > >>>>>> SparkContext. > >>>>>> > >>>>>> > >>>>>> > >>>>>> --------------------------------------------------------------------- To unsubscribe e-mail: user-unsubscr...@spark.apache.org