Right, this is exactly how I have it right now. The problem is that in cluster mode 'myAcc' does NOT get distributed. Try it out in cluster mode & you will see what I mean.
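To make the repro concrete, here is a stripped-down, self-contained sketch of the pattern I'm describing (the socket source, key type, and counting logic are placeholders for illustration, not my actual job): a LongAccumulator registered on the driver, incremented inside `mapGroupsWithState` on the executors, and read back in a `StreamingQueryListener`.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout, OutputMode, StreamingQueryListener}

object AccumulatorRepro {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("acc-repro").getOrCreate()
    import spark.implicits._

    // Created and registered on the driver; each task works on a copy that
    // Spark merges back into this instance when the task completes.
    val myAcc = spark.sparkContext.longAccumulator("myAcc")

    spark.streams.addListener(new StreamingQueryListener {
      override def onQueryStarted(event: StreamingQueryListener.QueryStartedEvent): Unit = ()
      override def onQueryTerminated(event: StreamingQueryListener.QueryTerminatedEvent): Unit = ()
      override def onQueryProgress(event: StreamingQueryListener.QueryProgressEvent): Unit = {
        // Runs on the driver, so it should see the merged value.
        println(s"My acc is ${myAcc.value} after batch ${event.progress.batchId}")
      }
    })

    // Placeholder state logic: count events seen per key.
    def mappingFunc(key: String, values: Iterator[String], state: GroupState[Long]): (String, Long) = {
      myAcc.add(1) // incremented on the executors
      val newState = state.getOption.getOrElse(0L) + values.size
      state.update(newState)
      (key, newState)
    }

    spark.readStream
      .format("socket").option("host", "localhost").option("port", "9999").load()
      .as[String]
      .groupByKey(identity)
      .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout)(mappingFunc)
      .writeStream
      .outputMode(OutputMode.Update)
      .format("console")
      .start()
      .awaitTermination()
  }
}
```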
I think the approach Zhang is using will work. Will try it & revert.

On Mon, Jun 8, 2020 at 10:58 AM Srinivas V <srini....@gmail.com> wrote:

> You don’t need to have a separate class. I created one because it has a
> lot of code and logic in my case.
> For you to quickly test, you can use Zhang’s Scala code in this chain.
> Pasting it below for your quick reference:
>
> ```scala
> spark.streams.addListener(new StreamingQueryListener {
>   override def onQueryProgress(event: StreamingQueryListener.QueryProgressEvent): Unit = {
>     println(event.progress.id + " is on progress")
>     println(s"My accu is ${myAcc.value} on query progress")
>   }
>   ...
> })
>
> def mappingFunc(key: Long, values: Iterator[String], state: GroupState[Long]): ... = {
>   myAcc.add(1)
>   println(s">>> key: $key => state: ${state}")
>   ...
> }
>
> val wordCounts = words
>   .groupByKey(v => ...)
>   .mapGroupsWithState(timeoutConf = GroupStateTimeout.ProcessingTimeTimeout)(func = mappingFunc)
>
> val query = wordCounts.writeStream
>   .outputMode(OutputMode.Update)
> ```
>
> On Mon, Jun 8, 2020 at 11:14 AM Something Something <mailinglist...@gmail.com> wrote:
>
>> Great. I guess the trick is to use a separate class such as
>> 'StateUpdateTask'. I will try that. My challenge is to convert this into
>> Scala. Will try it out & revert. Thanks for the tips.
>>
>> On Wed, Jun 3, 2020 at 11:56 PM ZHANG Wei <wezh...@outlook.com> wrote:
>>
>>> The following Java code works in my cluster environment:
>>> ```
>>> .mapGroupsWithState((MapGroupsWithStateFunction<String, String, Long, LeadingCharCount>) (key, values, state) -> {
>>>         myAcc.add(1);
>>>         <...>
>>>         state.update(newState);
>>>         return new LeadingCharCount(key, newState);
>>>     },
>>>     Encoders.LONG(),
>>>     Encoders.bean(LeadingCharCount.class),
>>>     GroupStateTimeout.ProcessingTimeTimeout())
>>> ```
>>>
>>> It also works fine with my `StateUpdateTask`:
>>> ```
>>> .mapGroupsWithState(
>>>     new StateUpdateTask(myAcc),
>>>     Encoders.LONG(),
>>>     Encoders.bean(LeadingCharCount.class),
>>>     GroupStateTimeout.ProcessingTimeTimeout());
>>>
>>> public class StateUpdateTask
>>>         implements MapGroupsWithStateFunction<String, String, Long, LeadingCharCount> {
>>>     private LongAccumulator myAccInTask;
>>>
>>>     public StateUpdateTask(LongAccumulator acc) {
>>>         this.myAccInTask = acc;
>>>     }
>>>
>>>     @Override
>>>     public LeadingCharCount call(String key, Iterator<String> values, GroupState<Long> state) throws Exception {
>>>         myAccInTask.add(1);
>>>         <...>
>>>         state.update(newState);
>>>         return new LeadingCharCount(key, newState);
>>>     }
>>> }
>>> ```
>>>
>>> --
>>> Cheers,
>>> -z
>>>
>>> On Tue, 2 Jun 2020 10:28:36 +0800
>>> ZHANG Wei <wezh...@outlook.com> wrote:
>>>
>>> > Yes, verified on the cluster with 5 executors.
>>> >
>>> > --
>>> > Cheers,
>>> > -z
>>> >
>>> > On Fri, 29 May 2020 11:16:12 -0700
>>> > Something Something <mailinglist...@gmail.com> wrote:
>>> >
>>> > > Did you try this on the cluster? Note: This works just fine under
>>> > > 'Local' mode.
>>> > >
>>> > > On Thu, May 28, 2020 at 9:12 PM ZHANG Wei <wezh...@outlook.com> wrote:
>>> > >
>>> > > > I can't reproduce the issue with my simple code:
>>> > > > ```scala
>>> > > > spark.streams.addListener(new StreamingQueryListener {
>>> > > >   override def onQueryProgress(event: StreamingQueryListener.QueryProgressEvent): Unit = {
>>> > > >     println(event.progress.id + " is on progress")
>>> > > >     println(s"My accu is ${myAcc.value} on query progress")
>>> > > >   }
>>> > > >   ...
>>> > > > })
>>> > > >
>>> > > > def mappingFunc(key: Long, values: Iterator[String], state: GroupState[Long]): ... = {
>>> > > >   myAcc.add(1)
>>> > > >   println(s">>> key: $key => state: ${state}")
>>> > > >   ...
>>> > > > }
>>> > > >
>>> > > > val wordCounts = words
>>> > > >   .groupByKey(v => ...)
>>> > > >   .mapGroupsWithState(timeoutConf = GroupStateTimeout.ProcessingTimeTimeout)(func = mappingFunc)
>>> > > >
>>> > > > val query = wordCounts.writeStream
>>> > > >   .outputMode(OutputMode.Update)
>>> > > >   ...
>>> > > > ```
>>> > > >
>>> > > > I'm wondering whether any errors can be found in the driver logs?
>>> > > > Micro-batch exceptions won't terminate the running streaming job.
>>> > > >
>>> > > > For the following code, we have to make sure that `StateUpdateTask` is started:
>>> > > > > .mapGroupsWithState(
>>> > > > >     new StateUpdateTask(Long.parseLong(appConfig.getSparkStructuredStreamingConfig().STATE_TIMEOUT),
>>> > > > >         appConfig, accumulators),
>>> > > > >     Encoders.bean(ModelStateInfo.class),
>>> > > > >     Encoders.bean(ModelUpdate.class),
>>> > > > >     GroupStateTimeout.ProcessingTimeTimeout());
>>> > > >
>>> > > > --
>>> > > > Cheers,
>>> > > > -z
>>> > > >
>>> > > > On Thu, 28 May 2020 19:59:31 +0530
>>> > > > Srinivas V <srini....@gmail.com> wrote:
>>> > > >
>>> > > > > Giving the code below:
>>> > > > > // accumulators is a class-level variable in the driver.
>>> > > > >
>>> > > > > sparkSession.streams().addListener(new StreamingQueryListener() {
>>> > > > >     @Override
>>> > > > >     public void onQueryStarted(QueryStartedEvent queryStarted) {
>>> > > > >         logger.info("Query started: " + queryStarted.id());
>>> > > > >     }
>>> > > > >     @Override
>>> > > > >     public void onQueryTerminated(QueryTerminatedEvent queryTerminated) {
>>> > > > >         logger.info("Query terminated: " + queryTerminated.id());
>>> > > > >     }
>>> > > > >     @Override
>>> > > > >     public void onQueryProgress(QueryProgressEvent queryProgress) {
>>> > > > >         accumulators.eventsReceived(queryProgress.progress().numInputRows());
>>> > > > >         long eventsReceived = 0;
>>> > > > >         long eventsExpired = 0;
>>> > > > >         long eventSentSuccess = 0;
>>> > > > >         try {
>>> > > > >             eventsReceived = accumulators.getLong(InstrumentationCounters.EVENTS_RECEIVED);
>>> > > > >             eventsExpired = accumulators.getLong(InstrumentationCounters.EVENTS_STATE_EXPIRED);
>>> > > > >             eventSentSuccess = accumulators.getLong(InstrumentationCounters.EVENTS_SENT);
>>> > > > >         } catch (MissingKeyException e) {
>>> > > > >             logger.error("Accumulator key not found due to Exception {}", e.getMessage());
>>> > > > >         }
>>> > > > >         logger.info("Events Received:{}", eventsReceived);
>>> > > > >         logger.info("Events State Expired:{}", eventsExpired);
>>> > > > >         logger.info("Events Sent Success:{}", eventSentSuccess);
>>> > > > >         logger.info("Query made progress - batchId: {} numInputRows:{} inputRowsPerSecond:{} processedRowsPerSecond:{} durationMs:{}",
>>> > > > >             queryProgress.progress().batchId(),
>>> > > > >             queryProgress.progress().numInputRows(),
>>> > > > >             queryProgress.progress().inputRowsPerSecond(),
>>> > > > >             queryProgress.progress().processedRowsPerSecond(),
>>> > > > >             queryProgress.progress().durationMs());
>>> > > > >
>>> > > > >
>>> > > > > On Thu, May 28, 2020 at 7:04 PM ZHANG Wei <wezh...@outlook.com> wrote:
>>> > > > >
>>> > > > > > May I ask how the accumulator is accessed in the method
>>> > > > > > `onQueryProgress()`?
>>> > > > > >
>>> > > > > > AFAICT, the accumulator is incremented well. There is a way to
>>> > > > > > verify that in the cluster, like this:
>>> > > > > > ```
>>> > > > > > // Add the following while loop before invoking awaitTermination
>>> > > > > > while (true) {
>>> > > > > >   println("My acc: " + myAcc.value)
>>> > > > > >   Thread.sleep(5 * 1000)
>>> > > > > > }
>>> > > > > >
>>> > > > > > //query.awaitTermination()
>>> > > > > > ```
>>> > > > > >
>>> > > > > > The updated accumulator value can then be found in the driver stdout.
>>> > > > > >
>>> > > > > > --
>>> > > > > > Cheers,
>>> > > > > > -z
>>> > > > > >
>>> > > > > > On Thu, 28 May 2020 17:12:48 +0530
>>> > > > > > Srinivas V <srini....@gmail.com> wrote:
>>> > > > > >
>>> > > > > > > Yes, I am using stateful structured streaming, similar to what
>>> > > > > > > you do. This is in Java. I do it this way:
>>> > > > > > > Dataset<ModelUpdate> productUpdates = watermarkedDS
>>> > > > > > >     .groupByKey(
>>> > > > > > >         (MapFunction<InputEventModel, String>) event -> event.getId(), Encoders.STRING())
>>> > > > > > >     .mapGroupsWithState(
>>> > > > > > >         new StateUpdateTask(Long.parseLong(appConfig.getSparkStructuredStreamingConfig().STATE_TIMEOUT),
>>> > > > > > >             appConfig, accumulators),
>>> > > > > > >         Encoders.bean(ModelStateInfo.class),
>>> > > > > > >         Encoders.bean(ModelUpdate.class),
>>> > > > > > >         GroupStateTimeout.ProcessingTimeTimeout());
>>> > > > > > >
>>> > > > > > > StateUpdateTask contains the update method.
>>> > > > > > >
>>> > > > > > > On Thu, May 28, 2020 at 4:41 AM Something Something <mailinglist...@gmail.com> wrote:
>>> > > > > > >
>>> > > > > > > > Yes, that's exactly how I am creating them.
>>> > > > > > > >
>>> > > > > > > > Question... Are you using 'Stateful Structured Streaming' in
>>> > > > > > > > which you've something like this?
>>> > > > > > > >
>>> > > > > > > > .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
>>> > > > > > > >     updateAcrossEvents
>>> > > > > > > > )
>>> > > > > > > >
>>> > > > > > > > And updating the Accumulator inside 'updateAcrossEvents'? We're
>>> > > > > > > > experiencing this only under 'Stateful Structured Streaming'.
>>> > > > > > > > In other streaming applications it works as expected.
>>> > > > > > > >
>>> > > > > > > > On Wed, May 27, 2020 at 9:01 AM Srinivas V <srini....@gmail.com> wrote:
>>> > > > > > > >
>>> > > > > > > >> Yes, I am talking about Application specific Accumulators.
>>> > > > > > > >> Actually I am getting the values printed in my driver log as
>>> > > > > > > >> well as sent to Grafana. Not sure where and when I saw 0
>>> > > > > > > >> before. My deploy mode is “client” on a YARN cluster (not
>>> > > > > > > >> local Mac) where I submit from the master node. It should
>>> > > > > > > >> work the same for cluster mode as well.
>>> > > > > > > >> Create accumulators like this:
>>> > > > > > > >> AccumulatorV2 accumulator = sparkContext.longAccumulator(name);
>>> > > > > > > >>
>>> > > > > > > >> On Tue, May 26, 2020 at 8:42 PM Something Something <mailinglist...@gmail.com> wrote:
>>> > > > > > > >>
>>> > > > > > > >>> Hmm... how would they go to Grafana if they are not getting
>>> > > > > > > >>> computed in your code? I am talking about the Application
>>> > > > > > > >>> Specific Accumulators. The other standard counters such as
>>> > > > > > > >>> 'event.progress.inputRowsPerSecond' are getting populated
>>> > > > > > > >>> correctly!
>>> > > > > > > >>>
>>> > > > > > > >>> On Mon, May 25, 2020 at 8:39 PM Srinivas V <srini....@gmail.com> wrote:
>>> > > > > > > >>>
>>> > > > > > > >>>> Hello,
>>> > > > > > > >>>> Even for me it comes as 0 when I print in onQueryProgress. I
>>> > > > > > > >>>> use LongAccumulator as well. Yes, it prints on my local but
>>> > > > > > > >>>> not on the cluster. But one consolation is that when I send
>>> > > > > > > >>>> metrics to Grafana, the values are coming through there.
>>> > > > > > > >>>>
>>> > > > > > > >>>> On Tue, May 26, 2020 at 3:10 AM Something Something <mailinglist...@gmail.com> wrote:
>>> > > > > > > >>>>
>>> > > > > > > >>>>> No, this is not working even if I use LongAccumulator.
>>> > > > > > > >>>>>
>>> > > > > > > >>>>> On Fri, May 15, 2020 at 9:54 PM ZHANG Wei <wezh...@outlook.com> wrote:
>>> > > > > > > >>>>>
>>> > > > > > > >>>>>> There is a restriction in the AccumulatorV2 API [1]: the
>>> > > > > > > >>>>>> OUT type should be atomic or thread safe. I'm wondering
>>> > > > > > > >>>>>> whether the implementation for `java.util.Map[T, Long]`
>>> > > > > > > >>>>>> can meet it or not. Is there any chance to replace
>>> > > > > > > >>>>>> CollectionLongAccumulator with CollectionAccumulator [2]
>>> > > > > > > >>>>>> or LongAccumulator [3] and test whether the
>>> > > > > > > >>>>>> StreamingListener and the other code work?
>>> > > > > > > >>>>>>
>>> > > > > > > >>>>>> ---
>>> > > > > > > >>>>>> Cheers,
>>> > > > > > > >>>>>> -z
>>> > > > > > > >>>>>> [1] http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.AccumulatorV2
>>> > > > > > > >>>>>> [2] http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.CollectionAccumulator
>>> > > > > > > >>>>>> [3] http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.LongAccumulator
>>> > > > > > > >>>>>>
>>> > > > > > > >>>>>> ________________________________________
>>> > > > > > > >>>>>> From: Something Something <mailinglist...@gmail.com>
>>> > > > > > > >>>>>> Sent: Saturday, May 16, 2020 0:38
>>> > > > > > > >>>>>> To: spark-user
>>> > > > > > > >>>>>> Subject: Re: Using Spark Accumulators with Structured Streaming
>>> > > > > > > >>>>>>
>>> > > > > > > >>>>>> Can someone from the Spark development team tell me if this
>>> > > > > > > >>>>>> functionality is supported and tested? I've spent a lot of
>>> > > > > > > >>>>>> time on this but can't get it to work. Just to add more
>>> > > > > > > >>>>>> context: we have our own Accumulator class that extends
>>> > > > > > > >>>>>> AccumulatorV2. In this class we keep track of one or more
>>> > > > > > > >>>>>> accumulators. Here's the definition:
>>> > > > > > > >>>>>>
>>> > > > > > > >>>>>> class CollectionLongAccumulator[T]
>>> > > > > > > >>>>>>     extends AccumulatorV2[T, java.util.Map[T, Long]]
>>> > > > > > > >>>>>>
>>> > > > > > > >>>>>> When the job begins we register an instance of this class:
>>> > > > > > > >>>>>>
>>> > > > > > > >>>>>> spark.sparkContext.register(myAccumulator, "MyAccumulator")
>>> > > > > > > >>>>>>
>>> > > > > > > >>>>>> Is this working under Structured Streaming?
>>> > > > > > > >>>>>>
>>> > > > > > > >>>>>> I will keep looking for alternate approaches but any help
>>> > > > > > > >>>>>> would be greatly appreciated. Thanks.
>>> > > > > > > >>>>>>
>>> > > > > > > >>>>>> On Thu, May 14, 2020 at 2:36 PM Something Something <mailinglist...@gmail.com> wrote:
>>> > > > > > > >>>>>>
>>> > > > > > > >>>>>> In my structured streaming job I am updating Spark
>>> > > > > > > >>>>>> Accumulators in the updateAcrossEvents method, but they are
>>> > > > > > > >>>>>> always 0 when I try to print them in my StreamingListener.
>>> > > > > > > >>>>>> Here's the code:
>>> > > > > > > >>>>>>
>>> > > > > > > >>>>>> .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
>>> > > > > > > >>>>>>     updateAcrossEvents
>>> > > > > > > >>>>>> )
>>> > > > > > > >>>>>>
>>> > > > > > > >>>>>> The accumulators get incremented in 'updateAcrossEvents'.
>>> > > > > > > >>>>>> I have a StreamingListener which writes the values of the
>>> > > > > > > >>>>>> accumulators in the 'onQueryProgress' method, but in this
>>> > > > > > > >>>>>> method the Accumulators are ALWAYS ZERO!
>>> > > > > > > >>>>>>
>>> > > > > > > >>>>>> When I added log statements in updateAcrossEvents, I could
>>> > > > > > > >>>>>> see that these accumulators are getting incremented as
>>> > > > > > > >>>>>> expected.
>>> > > > > > > >>>>>>
>>> > > > > > > >>>>>> This only happens when I run in 'Cluster' mode. In Local
>>> > > > > > > >>>>>> mode it works fine, which implies that the Accumulators are
>>> > > > > > > >>>>>> not getting distributed correctly - or something like that!
>>> > > > > > > >>>>>>
>>> > > > > > > >>>>>> Note: I've seen quite a few answers on the Web that tell me
>>> > > > > > > >>>>>> to perform an "Action". That's not a solution here. This is
>>> > > > > > > >>>>>> a 'Stateful Structured Streaming' job. Yes, I am also
>>> > > > > > > >>>>>> 'registering' them in SparkContext.
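PS: In case it helps anyone following along, here is a minimal sketch of what a thread-safe variant of the `CollectionLongAccumulator` described above could look like, per ZHANG Wei's note that the AccumulatorV2 OUT type should be atomic or thread safe. This is an illustration under that assumption only, not our actual class (which carries more logic); it backs the per-key counts with a concurrent TrieMap.

```scala
import scala.collection.JavaConverters._
import scala.collection.concurrent.TrieMap
import org.apache.spark.util.AccumulatorV2

// Illustrative only: per-key long counts backed by a concurrent TrieMap so
// the OUT type (java.util.Map[T, Long]) meets the thread-safety expectation
// in the AccumulatorV2 docs referenced as [1] above.
class CollectionLongAccumulator[T] extends AccumulatorV2[T, java.util.Map[T, Long]] {
  private val counts = TrieMap.empty[T, Long]

  override def isZero: Boolean = counts.isEmpty

  override def copy(): CollectionLongAccumulator[T] = {
    val acc = new CollectionLongAccumulator[T]
    acc.counts ++= counts
    acc
  }

  override def reset(): Unit = counts.clear()

  // Called on executors: count one occurrence of the key.
  override def add(v: T): Unit = addCount(v, 1L)

  // Called on the driver: fold a per-task copy into this instance.
  override def merge(other: AccumulatorV2[T, java.util.Map[T, Long]]): Unit =
    other.value.asScala.foreach { case (k, n) => addCount(k, n) }

  private def addCount(k: T, n: Long): Unit = {
    // Lock-free retry loop built on TrieMap's atomic putIfAbsent/replace.
    var done = false
    while (!done) {
      counts.get(k) match {
        case Some(cur) => done = counts.replace(k, cur, cur + n)
        case None      => done = counts.putIfAbsent(k, n).isEmpty
      }
    }
  }

  override def value: java.util.Map[T, Long] = counts.asJava
}
```

It registers the same way as before: spark.sparkContext.register(new CollectionLongAccumulator[String], "MyAccumulator").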