Honestly, I don't know how to do this in Scala. I tried something like this:

```scala
.mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(new StateUpdater(myAcc))
```

StateUpdater is similar to what Zhang has provided, but it's NOT compiling 'cause I need to return a 'Dataset'. Here's the definition of mapGroupsWithState in Scala:

```scala
def mapGroupsWithState[S: Encoder, U: Encoder](
    timeoutConf: GroupStateTimeout)(
    func: (K, Iterator[V], GroupState[S]) => U): Dataset[U] = {
```
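One way to satisfy that signature with a separate class is to have the class extend the function type itself. The following is a minimal sketch, assuming String keys and values and Long state and output (the actual types in the job may differ). Note that `func` returns a plain U per group; Spark itself wraps the results into the `Dataset[U]`:

```scala
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout}
import org.apache.spark.util.LongAccumulator

// Sketch: the curried Scala API takes a (K, Iterator[V], GroupState[S]) => U,
// so a "task" class can extend that function type directly.
class StateUpdater(myAcc: LongAccumulator)
    extends ((String, Iterator[String], GroupState[Long]) => Long)
    with Serializable {

  override def apply(key: String,
                     values: Iterator[String],
                     state: GroupState[Long]): Long = {
    myAcc.add(1)
    val newState = state.getOption.getOrElse(0L) + values.size
    state.update(newState)
    newState  // one result per group; Spark collects these into the Dataset[U]
  }
}

// Usage (Long has a built-in encoder via spark.implicits._):
// ds.groupByKey(...)
//   .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(new StateUpdater(myAcc))
```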
On Mon, Jun 8, 2020 at 12:07 PM Srinivas V <srini....@gmail.com> wrote:

Ya, I had asked this question before. No one responded. By the way, what's your actual name, "Something Something", if you don't mind me asking?

On Tue, Jun 9, 2020 at 12:27 AM Something Something <mailinglist...@gmail.com> wrote:

What is scary is that this interface is marked as "experimental":

```java
@Experimental
@InterfaceStability.Evolving
public interface MapGroupsWithStateFunction<K, V, S, R> extends Serializable {
    R call(K key, Iterator<V> values, GroupState<S> state) throws Exception;
}
```

On Mon, Jun 8, 2020 at 11:54 AM Something Something <mailinglist...@gmail.com> wrote:

Right, this is exactly how I have it right now. The problem is that in cluster mode 'myAcc' does NOT get distributed. Try it out in cluster mode and you will see what I mean.

I think how Zhang is using it will work. Will try & revert.

On Mon, Jun 8, 2020 at 10:58 AM Srinivas V <srini....@gmail.com> wrote:

You don't need to have a separate class. I created one because it has a lot of code and logic in my case. For you to quickly test, you can use Zhang's Scala code from this chain. Pasting it below for your quick reference:

```scala
spark.streams.addListener(new StreamingQueryListener {
  override def onQueryProgress(event: StreamingQueryListener.QueryProgressEvent): Unit = {
    println(event.progress.id + " is on progress")
    println(s"My accu is ${myAcc.value} on query progress")
  }
  ...
})

def mappingFunc(key: Long, values: Iterator[String], state: GroupState[Long]): ... = {
  myAcc.add(1)
  println(s">>> key: $key => state: ${state}")
  ...
}

val wordCounts = words
  .groupByKey(v => ...)
  .mapGroupsWithState(timeoutConf = GroupStateTimeout.ProcessingTimeTimeout)(func = mappingFunc)

val query = wordCounts.writeStream
  .outputMode(OutputMode.Update)
```
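For completeness, the tail elided in the snippet above would typically continue along these lines (a sketch; the console sink is an assumption, not part of the original message):

```scala
val query = wordCounts.writeStream
  .outputMode(OutputMode.Update)
  .format("console")  // assumed sink, for illustration only
  .start()

query.awaitTermination()
```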
On Mon, Jun 8, 2020 at 11:14 AM Something Something <mailinglist...@gmail.com> wrote:

Great. I guess the trick is to use a separate class such as 'StateUpdateTask'. I will try that. My challenge is to convert this into Scala. Will try it out & revert. Thanks for the tips.

On Wed, Jun 3, 2020 at 11:56 PM ZHANG Wei <wezh...@outlook.com> wrote:

The following Java code works in my cluster environment:

```java
.mapGroupsWithState((MapGroupsWithStateFunction<String, String, Long, LeadingCharCount>) (key, values, state) -> {
        myAcc.add(1);
        <...>
        state.update(newState);
        return new LeadingCharCount(key, newState);
    },
    Encoders.LONG(),
    Encoders.bean(LeadingCharCount.class),
    GroupStateTimeout.ProcessingTimeTimeout())
```

It also works fine with my `StateUpdateTask`:

```java
.mapGroupsWithState(
    new StateUpdateTask(myAcc),
    Encoders.LONG(),
    Encoders.bean(LeadingCharCount.class),
    GroupStateTimeout.ProcessingTimeTimeout());

public class StateUpdateTask
        implements MapGroupsWithStateFunction<String, String, Long, LeadingCharCount> {
    private LongAccumulator myAccInTask;

    public StateUpdateTask(LongAccumulator acc) {
        this.myAccInTask = acc;
    }

    @Override
    public LeadingCharCount call(String key, Iterator<String> values, GroupState<Long> state) throws Exception {
        myAccInTask.add(1);
        <...>
        state.update(newState);
        return new LeadingCharCount(key, newState);
    }
}
```

--
Cheers,
-z

On Tue, 2 Jun 2020 10:28:36 +0800, ZHANG Wei <wezh...@outlook.com> wrote:

Yes, verified on the cluster with 5 executors.

On Fri, 29 May 2020 11:16:12 -0700, Something Something <mailinglist...@gmail.com> wrote:

Did you try this on the cluster? Note: this works just fine in 'Local' mode.

On Thu, May 28, 2020 at 9:12 PM ZHANG Wei <wezh...@outlook.com> wrote:

I can't reproduce the issue with my simple code:

```scala
spark.streams.addListener(new StreamingQueryListener {
  override def onQueryProgress(event: StreamingQueryListener.QueryProgressEvent): Unit = {
    println(event.progress.id + " is on progress")
    println(s"My accu is ${myAcc.value} on query progress")
  }
  ...
})

def mappingFunc(key: Long, values: Iterator[String], state: GroupState[Long]): ... = {
  myAcc.add(1)
  println(s">>> key: $key => state: ${state}")
  ...
}

val wordCounts = words
  .groupByKey(v => ...)
  .mapGroupsWithState(timeoutConf = GroupStateTimeout.ProcessingTimeTimeout)(func = mappingFunc)

val query = wordCounts.writeStream
  .outputMode(OutputMode.Update)
  ...
```

I'm wondering if there are any errors to be found in the driver logs? Micro-batch exceptions won't terminate the running streaming job.

For the following code, we have to make sure that `StateUpdateTask` is started:

```java
.mapGroupsWithState(
    new StateUpdateTask(Long.parseLong(appConfig.getSparkStructuredStreamingConfig().STATE_TIMEOUT),
        appConfig, accumulators),
    Encoders.bean(ModelStateInfo.class),
    Encoders.bean(ModelUpdate.class),
    GroupStateTimeout.ProcessingTimeTimeout());
```

--
Cheers,
-z
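Since a failed micro-batch may surface only in the driver logs, one way to catch such failures programmatically is through the listener's termination callback. A sketch, with illustrative names:

```scala
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._

// Sketch: report any exception a query terminated with, so silent
// micro-batch failures on the cluster don't go unnoticed.
spark.streams.addListener(new StreamingQueryListener {
  override def onQueryStarted(event: QueryStartedEvent): Unit = ()
  override def onQueryProgress(event: QueryProgressEvent): Unit = ()
  override def onQueryTerminated(event: QueryTerminatedEvent): Unit =
    event.exception.foreach(err => println(s"Query ${event.id} terminated with error: $err"))
})
```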
On Thu, 28 May 2020 19:59:31 +0530, Srinivas V <srini....@gmail.com> wrote:

Giving the code below:

```java
// accumulators is a class-level variable in the driver.
sparkSession.streams().addListener(new StreamingQueryListener() {
    @Override
    public void onQueryStarted(QueryStartedEvent queryStarted) {
        logger.info("Query started: " + queryStarted.id());
    }
    @Override
    public void onQueryTerminated(QueryTerminatedEvent queryTerminated) {
        logger.info("Query terminated: " + queryTerminated.id());
    }
    @Override
    public void onQueryProgress(QueryProgressEvent queryProgress) {
        accumulators.eventsReceived(queryProgress.progress().numInputRows());
        long eventsReceived = 0;
        long eventsExpired = 0;
        long eventSentSuccess = 0;
        try {
            eventsReceived = accumulators.getLong(InstrumentationCounters.EVENTS_RECEIVED);
            eventsExpired = accumulators.getLong(InstrumentationCounters.EVENTS_STATE_EXPIRED);
            eventSentSuccess = accumulators.getLong(InstrumentationCounters.EVENTS_SENT);
        } catch (MissingKeyException e) {
            logger.error("Accumulator key not found due to Exception {}", e.getMessage());
        }
        logger.info("Events Received:{}", eventsReceived);
        logger.info("Events State Expired:{}", eventsExpired);
        logger.info("Events Sent Success:{}", eventSentSuccess);
        logger.info("Query made progress - batchId: {} numInputRows:{} inputRowsPerSecond:{} processedRowsPerSecond:{} durationMs:{}",
            queryProgress.progress().batchId(),
            queryProgress.progress().numInputRows(),
            queryProgress.progress().inputRowsPerSecond(),
            queryProgress.progress().processedRowsPerSecond(),
            queryProgress.progress().durationMs());
    }
});
```

On Thu, May 28, 2020 at 7:04 PM ZHANG Wei <wezh...@outlook.com> wrote:

May I ask how the accumulator is accessed in the method `onQueryProgress()`?

AFAICT, the accumulator is incremented correctly.
There is a way to verify that in a cluster, like this:

```scala
// Add the following while loop before invoking awaitTermination
while (true) {
  println("My acc: " + myAcc.value)
  Thread.sleep(5 * 1000)
}

// query.awaitTermination()
```

The updated accumulator value can then be found in the driver stdout.

--
Cheers,
-z

On Thu, 28 May 2020 17:12:48 +0530, Srinivas V <srini....@gmail.com> wrote:

Yes, I am using stateful Structured Streaming, similar to what you do, but in Java. I do it this way:

```java
Dataset<ModelUpdate> productUpdates = watermarkedDS
    .groupByKey(
        (MapFunction<InputEventModel, String>) event -> event.getId(), Encoders.STRING())
    .mapGroupsWithState(
        new StateUpdateTask(Long.parseLong(appConfig.getSparkStructuredStreamingConfig().STATE_TIMEOUT),
            appConfig, accumulators),
        Encoders.bean(ModelStateInfo.class),
        Encoders.bean(ModelUpdate.class),
        GroupStateTimeout.ProcessingTimeTimeout());
```

StateUpdateTask contains the update method.

On Thu, May 28, 2020 at 4:41 AM Something Something <mailinglist...@gmail.com> wrote:

Yes, that's exactly how I am creating them.

Question... Are you using 'Stateful Structured Streaming', in which you have something like this?

```scala
.mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
  updateAcrossEvents
)
```

And are you updating the Accumulator inside 'updateAcrossEvents'? We're experiencing this only under 'Stateful Structured Streaming'. In other streaming applications it works as expected.

On Wed, May 27, 2020 at 9:01 AM Srinivas V <srini....@gmail.com> wrote:

Yes, I am talking about application-specific Accumulators. I am actually getting the values printed in my driver log as well as sent to Grafana. Not sure where and when I saw 0 before. My deploy mode is "client" on a YARN cluster (not a local Mac), where I submit from the master node. It should work the same for cluster mode as well.

Create accumulators like this:

```java
AccumulatorV2 accumulator = sparkContext.longAccumulator(name);
```
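In Scala, the equivalent driver-side setup would look roughly like this (a sketch; the accumulator names are illustrative, and CollectionLongAccumulator refers to the custom class discussed below):

```scala
import org.apache.spark.util.LongAccumulator

// Built-in accumulator: longAccumulator(name) both creates and registers it.
val myAcc: LongAccumulator = spark.sparkContext.longAccumulator("MyAccumulator")

// A custom AccumulatorV2 subclass must be registered explicitly after construction.
val customAcc = new CollectionLongAccumulator[String]
spark.sparkContext.register(customAcc, "MyCustomAccumulator")
```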
On Tue, May 26, 2020 at 8:42 PM Something Something <mailinglist...@gmail.com> wrote:

Hmm... how would they get to Grafana if they are not getting computed in your code? I am talking about the application-specific Accumulators. The other standard counters, such as 'event.progress.inputRowsPerSecond', are getting populated correctly!

On Mon, May 25, 2020 at 8:39 PM Srinivas V <srini....@gmail.com> wrote:

Hello,
Even for me it comes as 0 when I print it in onQueryProgress. I use LongAccumulator as well. Yes, it prints on my local machine but not on the cluster. But one consolation is that when I send metrics to Grafana, the values do come through there.

On Tue, May 26, 2020 at 3:10 AM Something Something <mailinglist...@gmail.com> wrote:

No, this is not working, even if I use LongAccumulator.

On Fri, May 15, 2020 at 9:54 PM ZHANG Wei <wezh...@outlook.com> wrote:

There is a restriction in the AccumulatorV2 API [1]: the OUT type should be atomic or thread-safe. I'm wondering whether the implementation for `java.util.Map[T, Long]` can meet it or not. Is there any chance to replace CollectionLongAccumulator with CollectionAccumulator [2] or LongAccumulator [3] and test whether the StreamingListener and the other code are able to work?
>>>>>> > > > > > > >>>>>> >>>>>> > > > > > > >>>>>> --- >>>>>> > > > > > > >>>>>> Cheers, >>>>>> > > > > > > >>>>>> -z >>>>>> > > > > > > >>>>>> [1] >>>>>> > > > > > > >>>>>> >>>>>> > > > > > >>>>>> > > > >>>>>> https://nam05.safelinks.protection.outlook.com/?url=http%3A%2F%2Fspark.apache.org%2Fdocs%2Flatest%2Fapi%2Fscala%2Findex.html%23org.apache.spark.util.AccumulatorV2&data=02%7C01%7C%7Cf802f480bbab46ae07b308d803fc661f%7C84df9e7fe9f640afb435aaaaaaaaaaaa%7C1%7C0%7C637263729860033353&sdata=NPpiZC%2Bnx9rec6G35QvMDV1D3FgvD%2FnIct6OJ06I728%3D&reserved=0 >>>>>> > > > > > > >>>>>> [2] >>>>>> > > > > > > >>>>>> >>>>>> > > > > > >>>>>> > > > >>>>>> https://nam05.safelinks.protection.outlook.com/?url=http%3A%2F%2Fspark.apache.org%2Fdocs%2Flatest%2Fapi%2Fscala%2Findex.html%23org.apache.spark.util.CollectionAccumulator&data=02%7C01%7C%7Cf802f480bbab46ae07b308d803fc661f%7C84df9e7fe9f640afb435aaaaaaaaaaaa%7C1%7C0%7C637263729860038343&sdata=wMskE72per9Js0V7UHJ0qi4UzCEEYh%2Fk53fuP2e92mA%3D&reserved=0 >>>>>> > > > > > > >>>>>> [3] >>>>>> > > > > > > >>>>>> >>>>>> > > > > > >>>>>> > > > >>>>>> https://nam05.safelinks.protection.outlook.com/?url=http%3A%2F%2Fspark.apache.org%2Fdocs%2Flatest%2Fapi%2Fscala%2Findex.html%23org.apache.spark.util.LongAccumulator&data=02%7C01%7C%7Cf802f480bbab46ae07b308d803fc661f%7C84df9e7fe9f640afb435aaaaaaaaaaaa%7C1%7C0%7C637263729860038343&sdata=INgHzc0rc6jj7UapB%2FRLfCiGNWEBSKWfgmuJ2dUZ3eM%3D&reserved=0 >>>>>> > > > > > > >>>>>> >>>>>> > > > > > > >>>>>> ________________________________________ >>>>>> > > > > > > >>>>>> From: Something Something < >>>>>> mailinglist...@gmail.com> >>>>>> > > > > > > >>>>>> Sent: Saturday, May 16, 2020 0:38 >>>>>> > > > > > > >>>>>> To: spark-user >>>>>> > > > > > > >>>>>> Subject: Re: Using Spark Accumulators with >>>>>> Structured >>>>>> > > > Streaming >>>>>> > > > > > > >>>>>> >>>>>> > > > > > > >>>>>> Can someone from Spark Development team tell me if >>>>>> this >>>>>> > > > > > functionality >>>>>> > > > > > > >>>>>> is supported and tested? I've spent a lot of time >>>>>> on this but >>>>>> > > > > > can't get it >>>>>> > > > > > > >>>>>> to work. Just to add more context, we've our own >>>>>> Accumulator >>>>>> > > > > > class that >>>>>> > > > > > > >>>>>> extends from AccumulatorV2. In this class we keep >>>>>> track of >>>>>> > > > one or >>>>>> > > > > > more >>>>>> > > > > > > >>>>>> accumulators. Here's the definition: >>>>>> > > > > > > >>>>>> >>>>>> > > > > > > >>>>>> >>>>>> > > > > > > >>>>>> class CollectionLongAccumulator[T] >>>>>> > > > > > > >>>>>> extends AccumulatorV2[T, java.util.Map[T, >>>>>> Long]] >>>>>> > > > > > > >>>>>> >>>>>> > > > > > > >>>>>> When the job begins we register an instance of >>>>>> this class: >>>>>> > > > > > > >>>>>> >>>>>> > > > > > > >>>>>> spark.sparkContext.register(myAccumulator, >>>>>> "MyAccumulator") >>>>>> > > > > > > >>>>>> >>>>>> > > > > > > >>>>>> Is this working under Structured Streaming? >>>>>> > > > > > > >>>>>> >>>>>> > > > > > > >>>>>> I will keep looking for alternate approaches but >>>>>> any help >>>>>> > > > would be >>>>>> > > > > > > >>>>>> greatly appreciated. Thanks. 
>>>>>> > > > > > > >>>>>> >>>>>> > > > > > > >>>>>> >>>>>> > > > > > > >>>>>> >>>>>> > > > > > > >>>>>> On Thu, May 14, 2020 at 2:36 PM Something >>>>>> Something < >>>>>> > > > > > > >>>>>> mailinglist...@gmail.com<mailto: >>>>>> mailinglist...@gmail.com>> >>>>>> > > > wrote: >>>>>> > > > > > > >>>>>> >>>>>> > > > > > > >>>>>> In my structured streaming job I am updating Spark >>>>>> > > > Accumulators in >>>>>> > > > > > > >>>>>> the updateAcrossEvents method but they are always >>>>>> 0 when I >>>>>> > > > try to >>>>>> > > > > > print >>>>>> > > > > > > >>>>>> them in my StreamingListener. Here's the code: >>>>>> > > > > > > >>>>>> >>>>>> > > > > > > >>>>>> >>>>>> > > > .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())( >>>>>> > > > > > > >>>>>> updateAcrossEvents >>>>>> > > > > > > >>>>>> ) >>>>>> > > > > > > >>>>>> >>>>>> > > > > > > >>>>>> >>>>>> > > > > > > >>>>>> The accumulators get incremented in >>>>>> 'updateAcrossEvents'. >>>>>> > > > I've a >>>>>> > > > > > > >>>>>> StreamingListener which writes values of the >>>>>> accumulators in >>>>>> > > > > > > >>>>>> 'onQueryProgress' method but in this method the >>>>>> Accumulators >>>>>> > > > are >>>>>> > > > > > ALWAYS >>>>>> > > > > > > >>>>>> ZERO! >>>>>> > > > > > > >>>>>> >>>>>> > > > > > > >>>>>> When I added log statements in the >>>>>> updateAcrossEvents, I >>>>>> > > > could see >>>>>> > > > > > > >>>>>> that these accumulators are getting incremented as >>>>>> expected. >>>>>> > > > > > > >>>>>> >>>>>> > > > > > > >>>>>> This only happens when I run in the 'Cluster' >>>>>> mode. In Local >>>>>> > > > mode >>>>>> > > > > > it >>>>>> > > > > > > >>>>>> works fine which implies that the Accumulators are >>>>>> not getting >>>>>> > > > > > distributed >>>>>> > > > > > > >>>>>> correctly - or something like that! >>>>>> > > > > > > >>>>>> >>>>>> > > > > > > >>>>>> Note: I've seen quite a few answers on the Web >>>>>> that tell me to >>>>>> > > > > > > >>>>>> perform an "Action". That's not a solution here. >>>>>> This is a >>>>>> > > > > > 'Stateful >>>>>> > > > > > > >>>>>> Structured Streaming' job. Yes, I am also >>>>>> 'registering' them >>>>>> > > > in >>>>>> > > > > > > >>>>>> SparkContext. >>>>>> > > > > > > >>>>>> >>>>>> > > > > > > >>>>>> >>>>>> > > > > > > >>>>>> >>>>>> > > > > > > >>>>>> >>>>>> > > > > > >>>>>> > > > >>>>>> > >>>>>> > >>>>>> --------------------------------------------------------------------- >>>>>> > To unsubscribe e-mail: user-unsubscr...@spark.apache.org >>>>>> > >>>>>> >>>>>