*Honestly, I don't know how to do this in Scala.* I tried something like
this:

*.mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(new StateUpdater(myAcc))*

StateUpdater is similar to what Zhang has provided, but it's NOT
compiling because I need to return a 'Dataset'.


Here's the definition of mapGroupsWithState in Scala:

def mapGroupsWithState[S: Encoder, U: Encoder](
    timeoutConf: GroupStateTimeout)(
    func: (K, Iterator[V], GroupState[S]) => U): Dataset[U] = {
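
For what it's worth, the Scala overload quoted above takes a plain function of type `(K, Iterator[V], GroupState[S]) => U` and builds the `Dataset[U]` itself, so the function only has to return a `U`. Below is a minimal sketch of one way a `StateUpdater` class could fit that signature. It assumes String keys and values and a Long state. `LeadingCharCount` is a hypothetical case class modelled on the bean in Zhang's Java example, and `StateUpdaterExample`, `leadingCharCounts`, and `words` are illustrative names only. I have not verified this on a cluster.

```scala
import org.apache.spark.sql.{Dataset, SparkSession}
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout}
import org.apache.spark.util.LongAccumulator

// Hypothetical output type, modelled on the LeadingCharCount bean in the Java example.
// Defined at top level so that spark.implicits can derive an Encoder for it.
case class LeadingCharCount(key: String, count: Long)

// Extending Function3 lets an instance be passed wherever the Scala API expects
// (K, Iterator[V], GroupState[S]) => U. It is Serializable because Spark ships
// the function to the executors.
class StateUpdater(myAcc: LongAccumulator)
    extends ((String, Iterator[String], GroupState[Long]) => LeadingCharCount)
    with Serializable {

  override def apply(
      key: String,
      values: Iterator[String],
      state: GroupState[Long]): LeadingCharCount = {
    myAcc.add(1) // executed on the executors, once per group invocation
    val newState = state.getOption.getOrElse(0L) + values.size
    state.update(newState)
    LeadingCharCount(key, newState) // return U; Spark wraps it into Dataset[U]
  }
}

object StateUpdaterExample {
  // `words` is assumed to be a Dataset[String] (streaming or batch) built elsewhere.
  def leadingCharCounts(
      spark: SparkSession,
      words: Dataset[String],
      myAcc: LongAccumulator): Dataset[LeadingCharCount] = {
    import spark.implicits._ // Encoders for String, Long and LeadingCharCount
    words
      .groupByKey(w => w.take(1))
      .mapGroupsWithState[Long, LeadingCharCount](
        GroupStateTimeout.ProcessingTimeTimeout())(new StateUpdater(myAcc))
  }
}
```

A plain method such as Zhang's `mappingFunc` should work the same way, since Scala converts a method into a function value; the class form mainly helps when the update logic is large. Timeout handling via `state.hasTimedOut` is left out of this sketch.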



On Mon, Jun 8, 2020 at 12:07 PM Srinivas V <srini....@gmail.com> wrote:

> Ya, I had asked this question before. No one responded. By the way, what’s
> your actual name “Something something” if you don’t mind me asking?
>
> On Tue, Jun 9, 2020 at 12:27 AM Something Something <
> mailinglist...@gmail.com> wrote:
>
>> What is scary is this interface is marked as "experimental"
>>
>> @Experimental
>> @InterfaceStability.Evolving
>> public interface MapGroupsWithStateFunction<K, V, S, R> extends Serializable 
>> {
>>   R call(K key, Iterator<V> values, GroupState<S> state) throws Exception;
>> }
>>
>>
>>
>>
>> On Mon, Jun 8, 2020 at 11:54 AM Something Something <
>> mailinglist...@gmail.com> wrote:
>>
>>> Right, this is exactly how I've it right now. Problem is in the cluster
>>> mode 'myAcc' does NOT get distributed. Try it out in the cluster mode & you
>>> will see what I mean.
>>>
>>> I think how Zhang is using will work. Will try & revert.
>>>
>>> On Mon, Jun 8, 2020 at 10:58 AM Srinivas V <srini....@gmail.com> wrote:
>>>
>>>>
>>>> You don’t need to have a separate class. I created that as it has lot
>>>> of code and logic in my case.
>>>> For you to quickly test you can use Zhang’s Scala code in this chain.
>>>> Pasting it below for your quick reference:
>>>>
>>>> ```scala
>>>>     spark.streams.addListener(new StreamingQueryListener {
>>>>       override def onQueryProgress(event: StreamingQueryListener.QueryProgressEvent): Unit = {
>>>>         println(event.progress.id + " is on progress")
>>>>         println(s"My accu is ${myAcc.value} on query progress")
>>>>       }
>>>>         ...
>>>>     })
>>>>
>>>>     def mappingFunc(key: Long, values: Iterator[String], state: GroupState[Long]): ... = {
>>>>       myAcc.add(1)
>>>>       println(s">>> key: $key => state: ${state}")
>>>>         ...
>>>>     }
>>>>
>>>>     val wordCounts = words
>>>>       .groupByKey(v => ...)
>>>>       .mapGroupsWithState(timeoutConf = GroupStateTimeout.ProcessingTimeTimeout)(func = mappingFunc)
>>>>
>>>>     val query = wordCounts.writeStream
>>>>       .outputMode(OutputMode.Update)
>>>> ```
>>>>
>>>>
>>>> On Mon, Jun 8, 2020 at 11:14 AM Something Something <
>>>> mailinglist...@gmail.com> wrote:
>>>>
>>>>> Great. I guess the trick is to use a separate class such as
>>>>> 'StateUpdateTask'. I will try that. My challenge is to convert this into
>>>>> Scala. Will try it out & revert. Thanks for the tips.
>>>>>
>>>>> On Wed, Jun 3, 2020 at 11:56 PM ZHANG Wei <wezh...@outlook.com> wrote:
>>>>>
>>>>>> The following Java codes can work in my cluster environment:
>>>>>> ```
>>>>>>     .mapGroupsWithState((MapGroupsWithStateFunction<String, String, Long, LeadingCharCount>) (key, values, state) -> {
>>>>>>                 myAcc.add(1);
>>>>>>                 <...>
>>>>>>                 state.update(newState);
>>>>>>                 return new LeadingCharCount(key, newState);
>>>>>>             },
>>>>>>             Encoders.LONG(),
>>>>>>             Encoders.bean(LeadingCharCount.class),
>>>>>>             GroupStateTimeout.ProcessingTimeTimeout())
>>>>>> ```
>>>>>>
>>>>>> Also works fine with my `StateUpdateTask`:
>>>>>> ```
>>>>>>     .mapGroupsWithState(
>>>>>>             new StateUpdateTask(myAcc),
>>>>>>             Encoders.LONG(),
>>>>>>             Encoders.bean(LeadingCharCount.class),
>>>>>>             GroupStateTimeout.ProcessingTimeTimeout());
>>>>>>
>>>>>> public class StateUpdateTask
>>>>>>             implements MapGroupsWithStateFunction<String, String, Long, LeadingCharCount> {
>>>>>>         private LongAccumulator myAccInTask;
>>>>>>
>>>>>>         public StateUpdateTask(LongAccumulator acc) {
>>>>>>             this.myAccInTask = acc;
>>>>>>         }
>>>>>>
>>>>>>         @Override
>>>>>>         public LeadingCharCount call(String key, Iterator<String> values, GroupState<Long> state) throws Exception {
>>>>>>             myAccInTask.add(1);
>>>>>>             <...>
>>>>>>             state.update(newState);
>>>>>>             return new LeadingCharCount(key, newState);
>>>>>>         }
>>>>>> }
>>>>>> ```
>>>>>>
>>>>>> --
>>>>>> Cheers,
>>>>>> -z
>>>>>>
>>>>>> On Tue, 2 Jun 2020 10:28:36 +0800
>>>>>> ZHANG Wei <wezh...@outlook.com> wrote:
>>>>>>
>>>>>> > Yes, verified on the cluster with 5 executors.
>>>>>> >
>>>>>> > --
>>>>>> > Cheers,
>>>>>> > -z
>>>>>> >
>>>>>> > On Fri, 29 May 2020 11:16:12 -0700
>>>>>> > Something Something <mailinglist...@gmail.com> wrote:
>>>>>> >
>>>>>> > > Did you try this on the Cluster? Note: This works just fine under
>>>>>> 'Local'
>>>>>> > > mode.
>>>>>> > >
>>>>>> > > On Thu, May 28, 2020 at 9:12 PM ZHANG Wei <wezh...@outlook.com>
>>>>>> wrote:
>>>>>> > >
>>>>>> > > > I can't reproduce the issue with my simple code:
>>>>>> > > > ```scala
>>>>>> > > >     spark.streams.addListener(new StreamingQueryListener {
>>>>>> > > >       override def onQueryProgress(event:
>>>>>> > > > StreamingQueryListener.QueryProgressEvent): Unit = {
>>>>>> > > >         println(event.progress.id + " is on progress")
>>>>>> > > >         println(s"My accu is ${myAcc.value} on query progress")
>>>>>> > > >       }
>>>>>> > > >         ...
>>>>>> > > >     })
>>>>>> > > >
>>>>>> > > >     def mappingFunc(key: Long, values: Iterator[String], state:
>>>>>> > > > GroupState[Long]): ... = {
>>>>>> > > >       myAcc.add(1)
>>>>>> > > >       println(s">>> key: $key => state: ${state}")
>>>>>> > > >         ...
>>>>>> > > >     }
>>>>>> > > >
>>>>>> > > >     val wordCounts = words
>>>>>> > > >       .groupByKey(v => ...)
>>>>>> > > >       .mapGroupsWithState(timeoutConf =
>>>>>> > > > GroupStateTimeout.ProcessingTimeTimeout)(func = mappingFunc)
>>>>>> > > >
>>>>>> > > >     val query = wordCounts.writeStream
>>>>>> > > >       .outputMode(OutputMode.Update)
>>>>>> > > >         ...
>>>>>> > > > ```
>>>>>> > > >
>>>>>> > > > I'm wondering whether any errors can be found in the driver logs. Micro-batch
>>>>>> > > > exceptions won't terminate the running streaming job.
>>>>>> > > >
>>>>>> > > > For the following code, we have to make sure that
>>>>>> `StateUpdateTask` is
>>>>>> > > > started:
>>>>>> > > > >                 .mapGroupsWithState(
>>>>>> > > > >                         new
>>>>>> > > >
>>>>>> StateUpdateTask(Long.parseLong(appConfig.getSparkStructuredStreamingConfig().STATE_TIMEOUT),
>>>>>> > > > > appConfig, accumulators),
>>>>>> > > > >                         Encoders.bean(ModelStateInfo.class),
>>>>>> > > > >                         Encoders.bean(ModelUpdate.class),
>>>>>> > > > >
>>>>>>  GroupStateTimeout.ProcessingTimeTimeout());
>>>>>> > > >
>>>>>> > > > --
>>>>>> > > > Cheers,
>>>>>> > > > -z
>>>>>> > > >
>>>>>> > > > On Thu, 28 May 2020 19:59:31 +0530
>>>>>> > > > Srinivas V <srini....@gmail.com> wrote:
>>>>>> > > >
>>>>>> > > > > Giving the code below:
>>>>>> > > > > //accumulators is a class level variable in driver.
>>>>>> > > > >
>>>>>> > > > >  sparkSession.streams().addListener(new
>>>>>> StreamingQueryListener() {
>>>>>> > > > >             @Override
>>>>>> > > > >             public void onQueryStarted(QueryStartedEvent
>>>>>> queryStarted) {
>>>>>> > > > >                 logger.info("Query started: " +
>>>>>> queryStarted.id());
>>>>>> > > > >             }
>>>>>> > > > >             @Override
>>>>>> > > > >             public void onQueryTerminated(QueryTerminatedEvent
>>>>>> > > > > queryTerminated) {
>>>>>> > > > >                 logger.info("Query terminated: " +
>>>>>> > > > queryTerminated.id());
>>>>>> > > > >             }
>>>>>> > > > >             @Override
>>>>>> > > > >             public void onQueryProgress(QueryProgressEvent
>>>>>> > > > queryProgress) {
>>>>>> > > > >
>>>>>> > > > >
>>>>>> accumulators.eventsReceived(queryProgress.progress().numInputRows());
>>>>>> > > > >                 long eventsReceived = 0;
>>>>>> > > > >                 long eventsExpired = 0;
>>>>>> > > > >                 long eventSentSuccess = 0;
>>>>>> > > > >                 try {
>>>>>> > > > >                     eventsReceived =
>>>>>> > > > > accumulators.getLong(InstrumentationCounters.EVENTS_RECEIVED);
>>>>>> > > > >                     eventsExpired =
>>>>>> > > > >
>>>>>> accumulators.getLong(InstrumentationCounters.EVENTS_STATE_EXPIRED);
>>>>>> > > > >                     eventSentSuccess =
>>>>>> > > > > accumulators.getLong(InstrumentationCounters.EVENTS_SENT);
>>>>>> > > > >                 } catch (MissingKeyException e) {
>>>>>> > > > >                     logger.error("Accumulator key not found
>>>>>> due to
>>>>>> > > > > Exception {}", e.getMessage());
>>>>>> > > > >                 }
>>>>>> > > > >                 logger.info("Events Received:{}",
>>>>>> eventsReceived);
>>>>>> > > > >                 logger.info("Events State Expired:{}",
>>>>>> eventsExpired);
>>>>>> > > > >                 logger.info("Events Sent Success:{}",
>>>>>> eventSentSuccess);
>>>>>> > > > >                 logger.info("Query made progress - batchId:
>>>>>> {}
>>>>>> > > > > numInputRows:{} inputRowsPerSecond:{}
>>>>>> processedRowsPerSecond:{}
>>>>>> > > > > durationMs:{}" ,
>>>>>> > > > >                         queryProgress.progress().batchId(),
>>>>>> > > > > queryProgress.progress().numInputRows(),
>>>>>> > > > > queryProgress.progress().inputRowsPerSecond(),
>>>>>> > > > >
>>>>>> > > >  queryProgress.progress().processedRowsPerSecond(),
>>>>>> > > > > queryProgress.progress().durationMs());
>>>>>> > > > >
>>>>>> > > > >
>>>>>> > > > > On Thu, May 28, 2020 at 7:04 PM ZHANG Wei <
>>>>>> wezh...@outlook.com> wrote:
>>>>>> > > > >
>>>>>> > > > > > May I get how the accumulator is accessed in the method
>>>>>> > > > > > `onQueryProgress()`?
>>>>>> > > > > >
>>>>>> > > > > > AFAICT, the accumulator is incremented well. There is a way
>>>>>> to verify
>>>>>> > > > that
>>>>>> > > > > > in cluster like this:
>>>>>> > > > > > ```
>>>>>> > > > > >     // Add the following while loop before invoking
>>>>>> awaitTermination
>>>>>> > > > > >     while (true) {
>>>>>> > > > > >       println("My acc: " + myAcc.value)
>>>>>> > > > > >       Thread.sleep(5 * 1000)
>>>>>> > > > > >     }
>>>>>> > > > > >
>>>>>> > > > > >     //query.awaitTermination()
>>>>>> > > > > > ```
>>>>>> > > > > >
>>>>>> > > > > > And the accumulator value updated can be found from driver
>>>>>> stdout.
>>>>>> > > > > >
>>>>>> > > > > > --
>>>>>> > > > > > Cheers,
>>>>>> > > > > > -z
>>>>>> > > > > >
>>>>>> > > > > > On Thu, 28 May 2020 17:12:48 +0530
>>>>>> > > > > > Srinivas V <srini....@gmail.com> wrote:
>>>>>> > > > > >
>>>>>> > > > > > > yes, I am using stateful structured streaming. Yes
>>>>>> similar to what
>>>>>> > > > you
>>>>>> > > > > > do.
>>>>>> > > > > > > This is in Java
>>>>>> > > > > > > I do it this way:
>>>>>> > > > > > >     Dataset<ModelUpdate> productUpdates = watermarkedDS
>>>>>> > > > > > >                 .groupByKey(
>>>>>> > > > > > >                         (MapFunction<InputEventModel,
>>>>>> String>) event
>>>>>> > > > ->
>>>>>> > > > > > > event.getId(), Encoders.STRING())
>>>>>> > > > > > >                 .mapGroupsWithState(
>>>>>> > > > > > >                         new
>>>>>> > > > > > >
>>>>>> > > > > >
>>>>>> > > >
>>>>>> StateUpdateTask(Long.parseLong(appConfig.getSparkStructuredStreamingConfig().STATE_TIMEOUT),
>>>>>> > > > > > > appConfig, accumulators),
>>>>>> > > > > > >
>>>>>>  Encoders.bean(ModelStateInfo.class),
>>>>>> > > > > > >                         Encoders.bean(ModelUpdate.class),
>>>>>> > > > > > >
>>>>>>  GroupStateTimeout.ProcessingTimeTimeout());
>>>>>> > > > > > >
>>>>>> > > > > > > StateUpdateTask contains the update method.
>>>>>> > > > > > >
>>>>>> > > > > > > On Thu, May 28, 2020 at 4:41 AM Something Something <
>>>>>> > > > > > > mailinglist...@gmail.com> wrote:
>>>>>> > > > > > >
>>>>>> > > > > > > > Yes, that's exactly how I am creating them.
>>>>>> > > > > > > >
>>>>>> > > > > > > > Question... Are you using 'Stateful Structured
>>>>>> Streaming' in which
>>>>>> > > > > > you've
>>>>>> > > > > > > > something like this?
>>>>>> > > > > > > >
>>>>>> > > > > > > >
>>>>>> .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
>>>>>> > > > > > > >         updateAcrossEvents
>>>>>> > > > > > > >       )
>>>>>> > > > > > > >
>>>>>> > > > > > > > And updating the Accumulator inside
>>>>>> 'updateAcrossEvents'? We're
>>>>>> > > > > > experiencing this only under 'Stateful Structured
>>>>>> Streaming'. In other
>>>>>> > > > > > streaming applications it works as expected.
>>>>>> > > > > > > >
>>>>>> > > > > > > >
>>>>>> > > > > > > >
>>>>>> > > > > > > > On Wed, May 27, 2020 at 9:01 AM Srinivas V <
>>>>>> srini....@gmail.com>
>>>>>> > > > > > wrote:
>>>>>> > > > > > > >
>>>>>> > > > > > > >> Yes, I am talking about Application specific
>>>>>> Accumulators.
>>>>>> > > > Actually I
>>>>>> > > > > > am
>>>>>> > > > > > > >> getting the values printed in my driver log as well as
>>>>>> sent to
>>>>>> > > > > > Grafana. Not
>>>>>> > > > > > > >> sure where and when I saw 0 before. My deploy mode is
>>>>>> “client” on
>>>>>> > > > a
>>>>>> > > > > > yarn
>>>>>> > > > > > > >> cluster(not local Mac) where I submit from master
>>>>>> node. It should
>>>>>> > > > > > work the
>>>>>> > > > > > > >> same for cluster mode as well.
>>>>>> > > > > > > >> Create accumulators like this:
>>>>>> > > > > > > >> AccumulatorV2 accumulator =
>>>>>> sparkContext.longAccumulator(name);
>>>>>> > > > > > > >>
>>>>>> > > > > > > >>
>>>>>> > > > > > > >> On Tue, May 26, 2020 at 8:42 PM Something Something <
>>>>>> > > > > > > >> mailinglist...@gmail.com> wrote:
>>>>>> > > > > > > >>
>>>>>> > > > > > > >>> Hmm... how would they go to Grafana if they are not
>>>>>> getting
>>>>>> > > > > > computed in
>>>>>> > > > > > > >>> your code? I am talking about the Application Specific
>>>>>> > > > Accumulators.
>>>>>> > > > > > The
>>>>>> > > > > > > >>> other standard counters such as
>>>>>> > > > 'event.progress.inputRowsPerSecond'
>>>>>> > > > > > are
>>>>>> > > > > > > >>> getting populated correctly!
>>>>>> > > > > > > >>>
>>>>>> > > > > > > >>> On Mon, May 25, 2020 at 8:39 PM Srinivas V <
>>>>>> srini....@gmail.com>
>>>>>> > > > > > wrote:
>>>>>> > > > > > > >>>
>>>>>> > > > > > > >>>> Hello,
>>>>>> > > > > > > >>>> Even for me it comes as 0 when I print in
>>>>>> OnQueryProgress. I use
>>>>>> > > > > > > >>>> LongAccumulator as well. Yes, it prints on my local
>>>>>> but not on
>>>>>> > > > > > cluster.
>>>>>> > > > > > > >>>> But one consolation is that when I send metrics to
>>>>>> Grafana, the
>>>>>> > > > > > values
>>>>>> > > > > > > >>>> are coming there.
>>>>>> > > > > > > >>>>
>>>>>> > > > > > > >>>> On Tue, May 26, 2020 at 3:10 AM Something Something <
>>>>>> > > > > > > >>>> mailinglist...@gmail.com> wrote:
>>>>>> > > > > > > >>>>
>>>>>> > > > > > > >>>>> No this is not working even if I use
>>>>>> LongAccumulator.
>>>>>> > > > > > > >>>>>
>>>>>> > > > > > > >>>>> On Fri, May 15, 2020 at 9:54 PM ZHANG Wei <
>>>>>> wezh...@outlook.com
>>>>>> > > > >
>>>>>> > > > > > wrote:
>>>>>> > > > > > > >>>>>
>>>>>> > > > > > > >>>>>> There is a restriction in AccumulatorV2 API [1],
>>>>>> the OUT type
>>>>>> > > > > > should
>>>>>> > > > > > > >>>>>> be atomic or thread safe. I'm wondering if the
>>>>>> implementation
>>>>>> > > > for
>>>>>> > > > > > > >>>>>> `java.util.Map[T, Long]` can meet it or not. Is
>>>>>> there any
>>>>>> > > > chance
>>>>>> > > > > > to replace
>>>>>> > > > > > > >>>>>> CollectionLongAccumulator by
>>>>>> CollectionAccumulator[2] or
>>>>>> > > > > > LongAccumulator[3]
>>>>>> > > > > > > >>>>>> and test if the StreamingListener and other codes
>>>>>> are able to
>>>>>> > > > > > work?
>>>>>> > > > > > > >>>>>>
>>>>>> > > > > > > >>>>>> ---
>>>>>> > > > > > > >>>>>> Cheers,
>>>>>> > > > > > > >>>>>> -z
>>>>>> > > > > > > >>>>>> [1]
>>>>>> > > > > > > >>>>>>
>>>>>> > > > > >
>>>>>> > > >
>>>>>> http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.AccumulatorV2
>>>>>> > > > > > > >>>>>> [2]
>>>>>> > > > > > > >>>>>>
>>>>>> > > > > >
>>>>>> > > >
>>>>>> http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.CollectionAccumulator
>>>>>> > > > > > > >>>>>> [3]
>>>>>> > > > > > > >>>>>>
>>>>>> > > > > >
>>>>>> > > >
>>>>>> http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.LongAccumulator
>>>>>> > > > > > > >>>>>>
>>>>>> > > > > > > >>>>>> ________________________________________
>>>>>> > > > > > > >>>>>> From: Something Something <
>>>>>> mailinglist...@gmail.com>
>>>>>> > > > > > > >>>>>> Sent: Saturday, May 16, 2020 0:38
>>>>>> > > > > > > >>>>>> To: spark-user
>>>>>> > > > > > > >>>>>> Subject: Re: Using Spark Accumulators with
>>>>>> Structured
>>>>>> > > > Streaming
>>>>>> > > > > > > >>>>>>
>>>>>> > > > > > > >>>>>> Can someone from Spark Development team tell me if
>>>>>> this
>>>>>> > > > > > functionality
>>>>>> > > > > > > >>>>>> is supported and tested? I've spent a lot of time
>>>>>> on this but
>>>>>> > > > > > can't get it
>>>>>> > > > > > > >>>>>> to work. Just to add more context, we've our own
>>>>>> Accumulator
>>>>>> > > > > > class that
>>>>>> > > > > > > >>>>>> extends from AccumulatorV2. In this class we keep
>>>>>> track of
>>>>>> > > > one or
>>>>>> > > > > > more
>>>>>> > > > > > > >>>>>> accumulators. Here's the definition:
>>>>>> > > > > > > >>>>>>
>>>>>> > > > > > > >>>>>>
>>>>>> > > > > > > >>>>>> class CollectionLongAccumulator[T]
>>>>>> > > > > > > >>>>>>     extends AccumulatorV2[T, java.util.Map[T,
>>>>>> Long]]
>>>>>> > > > > > > >>>>>>
>>>>>> > > > > > > >>>>>> When the job begins we register an instance of
>>>>>> this class:
>>>>>> > > > > > > >>>>>>
>>>>>> > > > > > > >>>>>> spark.sparkContext.register(myAccumulator,
>>>>>> "MyAccumulator")
>>>>>> > > > > > > >>>>>>
>>>>>> > > > > > > >>>>>> Is this working under Structured Streaming?
>>>>>> > > > > > > >>>>>>
>>>>>> > > > > > > >>>>>> I will keep looking for alternate approaches but
>>>>>> any help
>>>>>> > > > would be
>>>>>> > > > > > > >>>>>> greatly appreciated. Thanks.
>>>>>> > > > > > > >>>>>>
>>>>>> > > > > > > >>>>>>
>>>>>> > > > > > > >>>>>>
>>>>>> > > > > > > >>>>>> On Thu, May 14, 2020 at 2:36 PM Something
>>>>>> Something <
>>>>>> > > > > > > >>>>>> mailinglist...@gmail.com<mailto:
>>>>>> mailinglist...@gmail.com>>
>>>>>> > > > wrote:
>>>>>> > > > > > > >>>>>>
>>>>>> > > > > > > >>>>>> In my structured streaming job I am updating Spark
>>>>>> > > > Accumulators in
>>>>>> > > > > > > >>>>>> the updateAcrossEvents method but they are always
>>>>>> 0 when I
>>>>>> > > > try to
>>>>>> > > > > > print
>>>>>> > > > > > > >>>>>> them in my StreamingListener. Here's the code:
>>>>>> > > > > > > >>>>>>
>>>>>> > > > > > > >>>>>>
>>>>>> > > > .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
>>>>>> > > > > > > >>>>>>         updateAcrossEvents
>>>>>> > > > > > > >>>>>>       )
>>>>>> > > > > > > >>>>>>
>>>>>> > > > > > > >>>>>>
>>>>>> > > > > > > >>>>>> The accumulators get incremented in
>>>>>> 'updateAcrossEvents'.
>>>>>> > > > I've a
>>>>>> > > > > > > >>>>>> StreamingListener which writes values of the
>>>>>> accumulators in
>>>>>> > > > > > > >>>>>> 'onQueryProgress' method but in this method the
>>>>>> Accumulators
>>>>>> > > > are
>>>>>> > > > > > ALWAYS
>>>>>> > > > > > > >>>>>> ZERO!
>>>>>> > > > > > > >>>>>>
>>>>>> > > > > > > >>>>>> When I added log statements in the
>>>>>> updateAcrossEvents, I
>>>>>> > > > could see
>>>>>> > > > > > > >>>>>> that these accumulators are getting incremented as
>>>>>> expected.
>>>>>> > > > > > > >>>>>>
>>>>>> > > > > > > >>>>>> This only happens when I run in the 'Cluster'
>>>>>> mode. In Local
>>>>>> > > > mode
>>>>>> > > > > > it
>>>>>> > > > > > > >>>>>> works fine which implies that the Accumulators are
>>>>>> not getting
>>>>>> > > > > > distributed
>>>>>> > > > > > > >>>>>> correctly - or something like that!
>>>>>> > > > > > > >>>>>>
>>>>>> > > > > > > >>>>>> Note: I've seen quite a few answers on the Web
>>>>>> that tell me to
>>>>>> > > > > > > >>>>>> perform an "Action". That's not a solution here.
>>>>>> This is a
>>>>>> > > > > > 'Stateful
>>>>>> > > > > > > >>>>>> Structured Streaming' job. Yes, I am also
>>>>>> 'registering' them
>>>>>> > > > in
>>>>>> > > > > > > >>>>>> SparkContext.
>>>>>> > > > > > > >>>>>>
>>>>>> > > > > > > >>>>>>
>>>>>> > > > > > > >>>>>>
>>>>>> > > > > > > >>>>>>
>>>>>> > > > > >
>>>>>> > > >
>>>>>> >
>>>>>> >
>>>>>> ---------------------------------------------------------------------
>>>>>> > To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>>>>> >
>>>>>>
>>>>>
