Great. I guess the trick is to use a separate class such as 'StateUpdateTask'. I will try that. My challenge is to convert this into Scala. I will try it out and report back. Thanks for the tips.
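
For reference, a rough and untested Scala sketch of the 'StateUpdateTask' pattern might look like the one below. It is a sketch under stated assumptions, not a confirmed fix: the `LeadingCharCount` case class, the running-count state logic, the "rate" source and the console sink are all made up for illustration, since the quoted Java code elides those parts. The only idea carried over from the thread is that the accumulator is passed in through the constructor of a separate, serializable function class.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout, OutputMode}
import org.apache.spark.util.LongAccumulator

// Hypothetical output type standing in for the LeadingCharCount bean in the Java example.
case class LeadingCharCount(key: String, count: Long)

// The accumulator is handed in through the constructor, mirroring the Java StateUpdateTask.
// Scala function traits are not Serializable by themselves, so it is mixed in explicitly.
class StateUpdateTask(acc: LongAccumulator)
    extends ((String, Iterator[String], GroupState[Long]) => LeadingCharCount)
    with Serializable {

  override def apply(key: String,
                     values: Iterator[String],
                     state: GroupState[Long]): LeadingCharCount = {
    acc.add(1) // one increment per invocation of the update function
    // Assumed state logic (elided in the thread): running count of values per key.
    val newState = state.getOption.getOrElse(0L) + values.size
    state.update(newState)
    LeadingCharCount(key, newState)
  }
}

object StateUpdateTaskExample {
  def main(args: Array[String]): Unit = {
    // The master is expected to come from spark-submit.
    val spark = SparkSession.builder.appName("StateUpdateTaskExample").getOrCreate()
    import spark.implicits._

    val myAcc = spark.sparkContext.longAccumulator("MyAccumulator")

    // Assumed input: the built-in "rate" source, with its counter cast to a string.
    val words = spark.readStream.format("rate").load()
      .select($"value".cast("string")).as[String]

    val counts = words
      .groupByKey(_.take(1)) // group by leading character
      .mapGroupsWithState[Long, LeadingCharCount](
        GroupStateTimeout.ProcessingTimeTimeout)(new StateUpdateTask(myAcc))

    val query = counts.writeStream
      .outputMode(OutputMode.Update)
      .format("console")
      .start()

    query.awaitTermination()
  }
}
```

The accumulator value would then be read on the driver (for example in `onQueryProgress`) via `myAcc.value`, as in the Scala listener quoted further down.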
On Wed, Jun 3, 2020 at 11:56 PM ZHANG Wei <wezh...@outlook.com> wrote:
> The following Java codes can work in my cluster environment:
> ```
> .mapGroupsWithState((MapGroupsWithStateFunction<String, String, Long, LeadingCharCount>) (key, values, state) -> {
>         myAcc.add(1);
>         <...>
>         state.update(newState);
>         return new LeadingCharCount(key, newState);
>     },
>     Encoders.LONG(),
>     Encoders.bean(LeadingCharCount.class),
>     GroupStateTimeout.ProcessingTimeTimeout())
> ```
>
> Also works fine with my `StateUpdateTask`:
> ```
> .mapGroupsWithState(
>     new StateUpdateTask(myAcc),
>     Encoders.LONG(),
>     Encoders.bean(LeadingCharCount.class),
>     GroupStateTimeout.ProcessingTimeTimeout());
>
> public class StateUpdateTask
>         implements MapGroupsWithStateFunction<String, String, Long, LeadingCharCount> {
>     private LongAccumulator myAccInTask;
>
>     public StateUpdateTask(LongAccumulator acc) {
>         this.myAccInTask = acc;
>     }
>
>     @Override
>     public LeadingCharCount call(String key, Iterator<String> values, GroupState<Long> state) throws Exception {
>         myAccInTask.add(1);
>         <...>
>         state.update(newState);
>         return new LeadingCharCount(key, newState);
>     }
> }
> ```
>
> --
> Cheers,
> -z
>
> On Tue, 2 Jun 2020 10:28:36 +0800
> ZHANG Wei <wezh...@outlook.com> wrote:
>
> > Yes, verified on the cluster with 5 executors.
> >
> > --
> > Cheers,
> > -z
> >
> > On Fri, 29 May 2020 11:16:12 -0700
> > Something Something <mailinglist...@gmail.com> wrote:
> >
> > > Did you try this on the Cluster? Note: This works just fine under 'Local' mode.
> > >
> > > On Thu, May 28, 2020 at 9:12 PM ZHANG Wei <wezh...@outlook.com> wrote:
> > >
> > > > I can't reproduce the issue with my simple code:
> > > > ```scala
> > > > spark.streams.addListener(new StreamingQueryListener {
> > > >   override def onQueryProgress(event: StreamingQueryListener.QueryProgressEvent): Unit = {
> > > >     println(event.progress.id + " is on progress")
> > > >     println(s"My accu is ${myAcc.value} on query progress")
> > > >   }
> > > >   ...
> > > > })
> > > >
> > > > def mappingFunc(key: Long, values: Iterator[String], state: GroupState[Long]): ... = {
> > > >   myAcc.add(1)
> > > >   println(s">>> key: $key => state: ${state}")
> > > >   ...
> > > > }
> > > >
> > > > val wordCounts = words
> > > >   .groupByKey(v => ...)
> > > >   .mapGroupsWithState(timeoutConf = GroupStateTimeout.ProcessingTimeTimeout)(func = mappingFunc)
> > > >
> > > > val query = wordCounts.writeStream
> > > >   .outputMode(OutputMode.Update)
> > > >   ...
> > > > ```
> > > >
> > > > I'm wondering if there were any errors can be found from driver logs? The
> > > > micro-batch exceptions won't terminate the streaming job running.
> > > >
> > > > For the following code, we have to make sure that `StateUpdateTask` is
> > > > started:
> > > > >     .mapGroupsWithState(
> > > > >         new StateUpdateTask(Long.parseLong(appConfig.getSparkStructuredStreamingConfig().STATE_TIMEOUT),
> > > > >             appConfig, accumulators),
> > > > >         Encoders.bean(ModelStateInfo.class),
> > > > >         Encoders.bean(ModelUpdate.class),
> > > > >         GroupStateTimeout.ProcessingTimeTimeout());
> > > >
> > > > --
> > > > Cheers,
> > > > -z
> > > >
> > > > On Thu, 28 May 2020 19:59:31 +0530
> > > > Srinivas V <srini....@gmail.com> wrote:
> > > >
> > > > > Giving the code below:
> > > > > //accumulators is a class level variable in driver.
> > > > >
> > > > > sparkSession.streams().addListener(new StreamingQueryListener() {
> > > > >     @Override
> > > > >     public void onQueryStarted(QueryStartedEvent queryStarted) {
> > > > >         logger.info("Query started: " + queryStarted.id());
> > > > >     }
> > > > >     @Override
> > > > >     public void onQueryTerminated(QueryTerminatedEvent queryTerminated) {
> > > > >         logger.info("Query terminated: " + queryTerminated.id());
> > > > >     }
> > > > >     @Override
> > > > >     public void onQueryProgress(QueryProgressEvent queryProgress) {
> > > > >
> > > > >         accumulators.eventsReceived(queryProgress.progress().numInputRows());
> > > > >         long eventsReceived = 0;
> > > > >         long eventsExpired = 0;
> > > > >         long eventSentSuccess = 0;
> > > > >         try {
> > > > >             eventsReceived = accumulators.getLong(InstrumentationCounters.EVENTS_RECEIVED);
> > > > >             eventsExpired = accumulators.getLong(InstrumentationCounters.EVENTS_STATE_EXPIRED);
> > > > >             eventSentSuccess = accumulators.getLong(InstrumentationCounters.EVENTS_SENT);
> > > > >         } catch (MissingKeyException e) {
> > > > >             logger.error("Accumulator key not found due to Exception {}", e.getMessage());
> > > > >         }
> > > > >         logger.info("Events Received:{}", eventsReceived);
> > > > >         logger.info("Events State Expired:{}", eventsExpired);
> > > > >         logger.info("Events Sent Success:{}", eventSentSuccess);
> > > > >         logger.info("Query made progress - batchId: {} numInputRows:{} inputRowsPerSecond:{} processedRowsPerSecond:{} durationMs:{}",
> > > > >             queryProgress.progress().batchId(),
> > > > >             queryProgress.progress().numInputRows(),
> > > > >             queryProgress.progress().inputRowsPerSecond(),
> > > > >             queryProgress.progress().processedRowsPerSecond(),
> > > > >             queryProgress.progress().durationMs());
> > > > >
> > > > >
> > > > > On Thu, May 28, 2020 at 7:04 PM ZHANG Wei <wezh...@outlook.com> wrote:
> > > > >
> > > > > > May I get how the
> > > > > > accumulator is accessed in the method `onQueryProgress()`?
> > > > > >
> > > > > > AFAICT, the accumulator is incremented well. There is a way to verify
> > > > > > that in cluster like this:
> > > > > > ```
> > > > > > // Add the following while loop before invoking awaitTermination
> > > > > > while (true) {
> > > > > >   println("My acc: " + myAcc.value)
> > > > > >   Thread.sleep(5 * 1000)
> > > > > > }
> > > > > >
> > > > > > //query.awaitTermination()
> > > > > > ```
> > > > > >
> > > > > > And the accumulator value updated can be found from driver stdout.
> > > > > >
> > > > > > --
> > > > > > Cheers,
> > > > > > -z
> > > > > >
> > > > > > On Thu, 28 May 2020 17:12:48 +0530
> > > > > > Srinivas V <srini....@gmail.com> wrote:
> > > > > >
> > > > > > > yes, I am using stateful structured streaming. Yes similar to what
> > > > > > > you do.
> > > > > > > This is in Java
> > > > > > > I do it this way:
> > > > > > >     Dataset<ModelUpdate> productUpdates = watermarkedDS
> > > > > > >         .groupByKey(
> > > > > > >             (MapFunction<InputEventModel, String>) event -> event.getId(), Encoders.STRING())
> > > > > > >         .mapGroupsWithState(
> > > > > > >             new StateUpdateTask(Long.parseLong(appConfig.getSparkStructuredStreamingConfig().STATE_TIMEOUT),
> > > > > > >                 appConfig, accumulators),
> > > > > > >             Encoders.bean(ModelStateInfo.class),
> > > > > > >             Encoders.bean(ModelUpdate.class),
> > > > > > >             GroupStateTimeout.ProcessingTimeTimeout());
> > > > > > >
> > > > > > > StateUpdateTask contains the update method.
> > > > > > >
> > > > > > > On Thu, May 28, 2020 at 4:41 AM Something Something <mailinglist...@gmail.com> wrote:
> > > > > > >
> > > > > > > > Yes, that's exactly how I am creating them.
> > > > > > > >
> > > > > > > > Question... Are you using 'Stateful Structured Streaming' in which
> > > > > > > > you've something like this?
> > > > > > > >
> > > > > > > > .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
> > > > > > > >         updateAcrossEvents
> > > > > > > >       )
> > > > > > > >
> > > > > > > > And updating the Accumulator inside 'updateAcrossEvents'? We're
> > > > > > > > experiencing this only under 'Stateful Structured Streaming'. In other
> > > > > > > > streaming applications it works as expected.
> > > > > > > >
> > > > > > > >
> > > > > > > >
> > > > > > > > On Wed, May 27, 2020 at 9:01 AM Srinivas V <srini....@gmail.com> wrote:
> > > > > > > >
> > > > > > > >> Yes, I am talking about Application specific Accumulators. Actually I am
> > > > > > > >> getting the values printed in my driver log as well as sent to Grafana. Not
> > > > > > > >> sure where and when I saw 0 before. My deploy mode is “client” on a yarn
> > > > > > > >> cluster(not local Mac) where I submit from master node. It should work the
> > > > > > > >> same for cluster mode as well.
> > > > > > > >> Create accumulators like this:
> > > > > > > >> AccumulatorV2 accumulator = sparkContext.longAccumulator(name);
> > > > > > > >>
> > > > > > > >>
> > > > > > > >> On Tue, May 26, 2020 at 8:42 PM Something Something <mailinglist...@gmail.com> wrote:
> > > > > > > >>
> > > > > > > >>> Hmm... how would they go to Graphana if they are not getting computed in
> > > > > > > >>> your code? I am talking about the Application Specific Accumulators. The
> > > > > > > >>> other standard counters such as 'event.progress.inputRowsPerSecond' are
> > > > > > > >>> getting populated correctly!
> > > > > > > >>>
> > > > > > > >>> On Mon, May 25, 2020 at 8:39 PM Srinivas V <srini....@gmail.com> wrote:
> > > > > > > >>>
> > > > > > > >>>> Hello,
> > > > > > > >>>> Even for me it comes as 0 when I print in OnQueryProgress.
> > > > > > > >>>> I use LongAccumulator as well. Yes, it prints on my local but not on
> > > > > > > >>>> cluster.
> > > > > > > >>>> But one consolation is that when I send metrics to Graphana, the values
> > > > > > > >>>> are coming there.
> > > > > > > >>>>
> > > > > > > >>>> On Tue, May 26, 2020 at 3:10 AM Something Something <mailinglist...@gmail.com> wrote:
> > > > > > > >>>>
> > > > > > > >>>>> No this is not working even if I use LongAccumulator.
> > > > > > > >>>>>
> > > > > > > >>>>> On Fri, May 15, 2020 at 9:54 PM ZHANG Wei <wezh...@outlook.com> wrote:
> > > > > > > >>>>>
> > > > > > > >>>>>> There is a restriction in AccumulatorV2 API [1], the OUT type should
> > > > > > > >>>>>> be atomic or thread safe. I'm wondering if the implementation for
> > > > > > > >>>>>> `java.util.Map[T, Long]` can meet it or not. Is there any chance to replace
> > > > > > > >>>>>> CollectionLongAccumulator by CollectionAccumulator[2] or LongAccumulator[3]
> > > > > > > >>>>>> and test if the StreamingListener and other codes are able to work?
> > > > > > > >>>>>>
> > > > > > > >>>>>> ---
> > > > > > > >>>>>> Cheers,
> > > > > > > >>>>>> -z
> > > > > > > >>>>>> [1] https://nam05.safelinks.protection.outlook.com/?url=http%3A%2F%2Fspark.apache.org%2Fdocs%2Flatest%2Fapi%2Fscala%2Findex.html%23org.apache.spark.util.AccumulatorV2&data=02%7C01%7C%7Cf802f480bbab46ae07b308d803fc661f%7C84df9e7fe9f640afb435aaaaaaaaaaaa%7C1%7C0%7C637263729860033353&sdata=NPpiZC%2Bnx9rec6G35QvMDV1D3FgvD%2FnIct6OJ06I728%3D&reserved=0
> > > > > > > >>>>>> [2] https://nam05.safelinks.protection.outlook.com/?url=http%3A%2F%2Fspark.apache.org%2Fdocs%2Flatest%2Fapi%2Fscala%2Findex.html%23org.apache.spark.util.CollectionAccumulator&data=02%7C01%7C%7Cf802f480bbab46ae07b308d803fc661f%7C84df9e7fe9f640afb435aaaaaaaaaaaa%7C1%7C0%7C637263729860038343&sdata=wMskE72per9Js0V7UHJ0qi4UzCEEYh%2Fk53fuP2e92mA%3D&reserved=0
> > > > > > > >>>>>> [3] https://nam05.safelinks.protection.outlook.com/?url=http%3A%2F%2Fspark.apache.org%2Fdocs%2Flatest%2Fapi%2Fscala%2Findex.html%23org.apache.spark.util.LongAccumulator&data=02%7C01%7C%7Cf802f480bbab46ae07b308d803fc661f%7C84df9e7fe9f640afb435aaaaaaaaaaaa%7C1%7C0%7C637263729860038343&sdata=INgHzc0rc6jj7UapB%2FRLfCiGNWEBSKWfgmuJ2dUZ3eM%3D&reserved=0
> > > > > > > >>>>>>
> > > > > > > >>>>>> ________________________________________
> > > > > > > >>>>>> From: Something Something <mailinglist...@gmail.com>
> > > > > > > >>>>>> Sent: Saturday, May 16, 2020 0:38
> > > > > > > >>>>>> To: spark-user
> > > > > > > >>>>>> Subject: Re: Using Spark Accumulators with Structured Streaming
> > > > > > > >>>>>>
> > > > > > > >>>>>> Can someone from Spark Development team tell me if this functionality
> > > > > > > >>>>>> is supported and tested? I've spent a lot of time on this but can't get it
> > > > > > > >>>>>> to work.
> > > > > > > >>>>>> Just to add more context, we've our own Accumulator class that
> > > > > > > >>>>>> extends from AccumulatorV2. In this class we keep track of one or more
> > > > > > > >>>>>> accumulators. Here's the definition:
> > > > > > > >>>>>>
> > > > > > > >>>>>>
> > > > > > > >>>>>> class CollectionLongAccumulator[T]
> > > > > > > >>>>>>     extends AccumulatorV2[T, java.util.Map[T, Long]]
> > > > > > > >>>>>>
> > > > > > > >>>>>> When the job begins we register an instance of this class:
> > > > > > > >>>>>>
> > > > > > > >>>>>> spark.sparkContext.register(myAccumulator, "MyAccumulator")
> > > > > > > >>>>>>
> > > > > > > >>>>>> Is this working under Structured Streaming?
> > > > > > > >>>>>>
> > > > > > > >>>>>> I will keep looking for alternate approaches but any help would be
> > > > > > > >>>>>> greatly appreciated. Thanks.
> > > > > > > >>>>>>
> > > > > > > >>>>>>
> > > > > > > >>>>>>
> > > > > > > >>>>>> On Thu, May 14, 2020 at 2:36 PM Something Something <mailinglist...@gmail.com<mailto:mailinglist...@gmail.com>> wrote:
> > > > > > > >>>>>>
> > > > > > > >>>>>> In my structured streaming job I am updating Spark Accumulators in
> > > > > > > >>>>>> the updateAcrossEvents method but they are always 0 when I try to print
> > > > > > > >>>>>> them in my StreamingListener. Here's the code:
> > > > > > > >>>>>>
> > > > > > > >>>>>> .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
> > > > > > > >>>>>>         updateAcrossEvents
> > > > > > > >>>>>>       )
> > > > > > > >>>>>>
> > > > > > > >>>>>>
> > > > > > > >>>>>> The accumulators get incremented in 'updateAcrossEvents'. I've a
> > > > > > > >>>>>> StreamingListener which writes values of the accumulators in
> > > > > > > >>>>>> 'onQueryProgress' method but in this method the Accumulators are ALWAYS
> > > > > > > >>>>>> ZERO!
> > > > > > > >>>>>>
> > > > > > > >>>>>> When I added log statements in the updateAcrossEvents, I could see
> > > > > > > >>>>>> that these accumulators are getting incremented as expected.
> > > > > > > >>>>>>
> > > > > > > >>>>>> This only happens when I run in the 'Cluster' mode. In Local mode it
> > > > > > > >>>>>> works fine which implies that the Accumulators are not getting distributed
> > > > > > > >>>>>> correctly - or something like that!
> > > > > > > >>>>>>
> > > > > > > >>>>>> Note: I've seen quite a few answers on the Web that tell me to
> > > > > > > >>>>>> perform an "Action". That's not a solution here. This is a 'Stateful
> > > > > > > >>>>>> Structured Streaming' job. Yes, I am also 'registering' them in
> > > > > > > >>>>>> SparkContext.
> > > > > > > >>>>>>
> > > > > > > >>>>>>
> >
> > ---------------------------------------------------------------------
> > To unsubscribe e-mail: user-unsubscr...@spark.apache.org