You don’t need a separate class. I created one because there is a lot of
code and logic in my case.
To test quickly, you can use Zhang’s Scala code from this thread.
Pasting it below for quick reference:

```scala
    spark.streams.addListener(new StreamingQueryListener {
      override def onQueryProgress(event: StreamingQueryListener.QueryProgressEvent): Unit = {
        println(event.progress.id + " is on progress")
        println(s"My accu is ${myAcc.value} on query progress")
      }
        ...
    })

    def mappingFunc(key: Long, values: Iterator[String], state: GroupState[Long]): ... = {
      myAcc.add(1)
      println(s">>> key: $key => state: ${state}")
        ...
    }

    val wordCounts = words
      .groupByKey(v => ...)
      .mapGroupsWithState(timeoutConf = GroupStateTimeout.ProcessingTimeTimeout)(func = mappingFunc)

    val query = wordCounts.writeStream
      .outputMode(OutputMode.Update)
        ...
```

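One more note on the earlier point in this thread that the `AccumulatorV2`
OUT type should be atomic or thread safe. Below is a minimal sketch in plain
Java (no Spark dependency) of a per-key counter backed by `ConcurrentHashMap`,
roughly the kind of state a map-valued accumulator would hold. `KeyedCounter`
and its method names are hypothetical, not Spark API, and the class
deliberately does not extend `AccumulatorV2`; it only illustrates atomic add
and merge.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical helper, not part of Spark: a thread-safe map of per-key counts.
final class KeyedCounter<T> {
    private final ConcurrentHashMap<T, Long> counts = new ConcurrentHashMap<>();

    // Atomically add `delta` to the count stored for `key`.
    public void add(T key, long delta) {
        counts.merge(key, delta, Long::sum);
    }

    // Fold another counter into this one, key by key.
    public void merge(KeyedCounter<T> other) {
        other.counts.forEach((k, v) -> counts.merge(k, v, Long::sum));
    }

    // Read-only snapshot of the current counts.
    public Map<T, Long> value() {
        return Collections.unmodifiableMap(new HashMap<>(counts));
    }

    public static void main(String[] args) throws InterruptedException {
        KeyedCounter<String> counter = new KeyedCounter<>();
        ExecutorService pool = Executors.newFixedThreadPool(4);
        // Four threads each add 1000 to the same key concurrently.
        for (int t = 0; t < 4; t++) {
            pool.submit(() -> {
                for (int i = 0; i < 1000; i++) {
                    counter.add("events", 1L);
                }
            });
        }
        pool.shutdown();
        pool.awaitTermination(10, TimeUnit.SECONDS);
        System.out.println(counter.value()); // expected: {events=4000}
    }
}
```

Wrapping this in a real `AccumulatorV2` subclass is left out here, and I have
not verified that doing so changes the behaviour discussed in this thread.
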
On Mon, Jun 8, 2020 at 11:14 AM Something Something <
mailinglist...@gmail.com> wrote:

> Great. I guess the trick is to use a separate class such as
> 'StateUpdateTask'. I will try that. My challenge is to convert this into
> Scala. Will try it out & revert. Thanks for the tips.
>
> On Wed, Jun 3, 2020 at 11:56 PM ZHANG Wei <wezh...@outlook.com> wrote:
>
>> The following Java code works in my cluster environment:
>> ```
>>     .mapGroupsWithState((MapGroupsWithStateFunction<String, String, Long, LeadingCharCount>) (key, values, state) -> {
>>                 myAcc.add(1);
>>                 <...>
>>                 state.update(newState);
>>                 return new LeadingCharCount(key, newState);
>>             },
>>             Encoders.LONG(),
>>             Encoders.bean(LeadingCharCount.class),
>>             GroupStateTimeout.ProcessingTimeTimeout())
>> ```
>>
>> Also works fine with my `StateUpdateTask`:
>> ```
>>     .mapGroupsWithState(
>>             new StateUpdateTask(myAcc),
>>             Encoders.LONG(),
>>             Encoders.bean(LeadingCharCount.class),
>>             GroupStateTimeout.ProcessingTimeTimeout());
>>
>> public class StateUpdateTask
>>             implements MapGroupsWithStateFunction<String, String, Long, LeadingCharCount> {
>>         private LongAccumulator myAccInTask;
>>
>>         public StateUpdateTask(LongAccumulator acc) {
>>             this.myAccInTask = acc;
>>         }
>>
>>         @Override
>>         public LeadingCharCount call(String key, Iterator<String> values, GroupState<Long> state) throws Exception {
>>             myAccInTask.add(1);
>>             <...>
>>             state.update(newState);
>>             return new LeadingCharCount(key, newState);
>>         }
>> }
>> ```
>>
>> --
>> Cheers,
>> -z
>>
>> On Tue, 2 Jun 2020 10:28:36 +0800
>> ZHANG Wei <wezh...@outlook.com> wrote:
>>
>> > Yes, verified on the cluster with 5 executors.
>> >
>> > --
>> > Cheers,
>> > -z
>> >
>> > On Fri, 29 May 2020 11:16:12 -0700
>> > Something Something <mailinglist...@gmail.com> wrote:
>> >
>> > > Did you try this on the Cluster? Note: This works just fine under 'Local'
>> > > mode.
>> > >
>> > > On Thu, May 28, 2020 at 9:12 PM ZHANG Wei <wezh...@outlook.com> wrote:
>> > >
>> > > > I can't reproduce the issue with my simple code:
>> > > > ```scala
>> > > >     spark.streams.addListener(new StreamingQueryListener {
>> > > >       override def onQueryProgress(event: StreamingQueryListener.QueryProgressEvent): Unit = {
>> > > >         println(event.progress.id + " is on progress")
>> > > >         println(s"My accu is ${myAcc.value} on query progress")
>> > > >       }
>> > > >         ...
>> > > >     })
>> > > >
>> > > >     def mappingFunc(key: Long, values: Iterator[String], state: GroupState[Long]): ... = {
>> > > >       myAcc.add(1)
>> > > >       println(s">>> key: $key => state: ${state}")
>> > > >         ...
>> > > >     }
>> > > >
>> > > >     val wordCounts = words
>> > > >       .groupByKey(v => ...)
>> > > >       .mapGroupsWithState(timeoutConf = GroupStateTimeout.ProcessingTimeTimeout)(func = mappingFunc)
>> > > >
>> > > >     val query = wordCounts.writeStream
>> > > >       .outputMode(OutputMode.Update)
>> > > >         ...
>> > > > ```
>> > > >
>> > > > I'm wondering whether any errors can be found in the driver logs?
>> > > > Micro-batch exceptions won't terminate the running streaming job.
>> > > >
>> > > > For the following code, we have to make sure that `StateUpdateTask` is
>> > > > started:
>> > > > >                 .mapGroupsWithState(
>> > > > >                         new StateUpdateTask(Long.parseLong(appConfig.getSparkStructuredStreamingConfig().STATE_TIMEOUT),
>> > > > >                                 appConfig, accumulators),
>> > > > >                         Encoders.bean(ModelStateInfo.class),
>> > > > >                         Encoders.bean(ModelUpdate.class),
>> > > > >                         GroupStateTimeout.ProcessingTimeTimeout());
>> > > >
>> > > > --
>> > > > Cheers,
>> > > > -z
>> > > >
>> > > > On Thu, 28 May 2020 19:59:31 +0530
>> > > > Srinivas V <srini....@gmail.com> wrote:
>> > > >
>> > > > > Giving the code below:
>> > > > > //accumulators is a class level variable in driver.
>> > > > >
>> > > > >  sparkSession.streams().addListener(new StreamingQueryListener() {
>> > > > >             @Override
>> > > > >             public void onQueryStarted(QueryStartedEvent queryStarted) {
>> > > > >                 logger.info("Query started: " + queryStarted.id());
>> > > > >             }
>> > > > >             @Override
>> > > > >             public void onQueryTerminated(QueryTerminatedEvent queryTerminated) {
>> > > > >                 logger.info("Query terminated: " + queryTerminated.id());
>> > > > >             }
>> > > > >             @Override
>> > > > >             public void onQueryProgress(QueryProgressEvent queryProgress) {
>> > > > >                 accumulators.eventsReceived(queryProgress.progress().numInputRows());
>> > > > >                 long eventsReceived = 0;
>> > > > >                 long eventsExpired = 0;
>> > > > >                 long eventSentSuccess = 0;
>> > > > >                 try {
>> > > > >                     eventsReceived = accumulators.getLong(InstrumentationCounters.EVENTS_RECEIVED);
>> > > > >                     eventsExpired = accumulators.getLong(InstrumentationCounters.EVENTS_STATE_EXPIRED);
>> > > > >                     eventSentSuccess = accumulators.getLong(InstrumentationCounters.EVENTS_SENT);
>> > > > >                 } catch (MissingKeyException e) {
>> > > > >                     logger.error("Accumulator key not found due to Exception {}", e.getMessage());
>> > > > >                 }
>> > > > >                 logger.info("Events Received:{}", eventsReceived);
>> > > > >                 logger.info("Events State Expired:{}", eventsExpired);
>> > > > >                 logger.info("Events Sent Success:{}", eventSentSuccess);
>> > > > >                 logger.info("Query made progress - batchId: {} numInputRows:{} inputRowsPerSecond:{} processedRowsPerSecond:{} durationMs:{}",
>> > > > >                         queryProgress.progress().batchId(),
>> > > > >                         queryProgress.progress().numInputRows(),
>> > > > >                         queryProgress.progress().inputRowsPerSecond(),
>> > > > >                         queryProgress.progress().processedRowsPerSecond(),
>> > > > >                         queryProgress.progress().durationMs());
>> > > > >
>> > > > >
>> > > > > On Thu, May 28, 2020 at 7:04 PM ZHANG Wei <wezh...@outlook.com> wrote:
>> > > > >
>> > > > > > May I ask how the accumulator is accessed in the method
>> > > > > > `onQueryProgress()`?
>> > > > > >
>> > > > > > AFAICT, the accumulator is incremented correctly. Here is a way to
>> > > > > > verify that on the cluster:
>> > > > > > ```
>> > > > > >     // Add the following while loop before invoking awaitTermination
>> > > > > >     while (true) {
>> > > > > >       println("My acc: " + myAcc.value)
>> > > > > >       Thread.sleep(5 * 1000)
>> > > > > >     }
>> > > > > >
>> > > > > >     //query.awaitTermination()
>> > > > > > ```
>> > > > > >
>> > > > > > The updated accumulator value can then be found in the driver stdout.
>> > > > > >
>> > > > > > --
>> > > > > > Cheers,
>> > > > > > -z
>> > > > > >
>> > > > > > On Thu, 28 May 2020 17:12:48 +0530
>> > > > > > Srinivas V <srini....@gmail.com> wrote:
>> > > > > >
>> > > > > > > Yes, I am using stateful structured streaming, similar to what you do.
>> > > > > > > This is in Java.
>> > > > > > > I do it this way:
>> > > > > > >     Dataset<ModelUpdate> productUpdates = watermarkedDS
>> > > > > > >                 .groupByKey(
>> > > > > > >                         (MapFunction<InputEventModel, String>) event -> event.getId(), Encoders.STRING())
>> > > > > > >                 .mapGroupsWithState(
>> > > > > > >                         new StateUpdateTask(Long.parseLong(appConfig.getSparkStructuredStreamingConfig().STATE_TIMEOUT),
>> > > > > > >                                 appConfig, accumulators),
>> > > > > > >                         Encoders.bean(ModelStateInfo.class),
>> > > > > > >                         Encoders.bean(ModelUpdate.class),
>> > > > > > >                         GroupStateTimeout.ProcessingTimeTimeout());
>> > > > > > >
>> > > > > > > StateUpdateTask contains the update method.
>> > > > > > >
>> > > > > > > On Thu, May 28, 2020 at 4:41 AM Something Something <
>> > > > > > > mailinglist...@gmail.com> wrote:
>> > > > > > >
>> > > > > > > > Yes, that's exactly how I am creating them.
>> > > > > > > >
>> > > > > > > > Question... Are you using 'Stateful Structured Streaming' in which
>> > > > > > > > you've something like this?
>> > > > > > > >
>> > > > > > > > .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
>> > > > > > > >         updateAcrossEvents
>> > > > > > > >       )
>> > > > > > > >
>> > > > > > > > And updating the Accumulator inside 'updateAcrossEvents'? We're
>> > > > > > > > experiencing this only under 'Stateful Structured Streaming'. In other
>> > > > > > > > streaming applications it works as expected.
>> > > > > > > >
>> > > > > > > >
>> > > > > > > >
>> > > > > > > > On Wed, May 27, 2020 at 9:01 AM Srinivas V <srini....@gmail.com> wrote:
>> > > > > > > >
>> > > > > > > >> Yes, I am talking about Application specific Accumulators. Actually I am
>> > > > > > > >> getting the values printed in my driver log as well as sent to Grafana. Not
>> > > > > > > >> sure where and when I saw 0 before. My deploy mode is “client” on a yarn
>> > > > > > > >> cluster (not local Mac) where I submit from master node. It should work the
>> > > > > > > >> same for cluster mode as well.
>> > > > > > > >> Create accumulators like this:
>> > > > > > > >> AccumulatorV2 accumulator = sparkContext.longAccumulator(name);
>> > > > > > > >>
>> > > > > > > >>
>> > > > > > > >> On Tue, May 26, 2020 at 8:42 PM Something Something <
>> > > > > > > >> mailinglist...@gmail.com> wrote:
>> > > > > > > >>
>> > > > > > > >>> Hmm... how would they go to Grafana if they are not getting computed in
>> > > > > > > >>> your code? I am talking about the Application Specific Accumulators. The
>> > > > > > > >>> other standard counters such as 'event.progress.inputRowsPerSecond' are
>> > > > > > > >>> getting populated correctly!
>> > > > > > > >>>
>> > > > > > > >>> On Mon, May 25, 2020 at 8:39 PM Srinivas V <srini....@gmail.com> wrote:
>> > > > > > > >>>
>> > > > > > > >>>> Hello,
>> > > > > > > >>>> Even for me it comes as 0 when I print in onQueryProgress. I use
>> > > > > > > >>>> LongAccumulator as well. Yes, it prints on my local but not on cluster.
>> > > > > > > >>>> But one consolation is that when I send metrics to Grafana, the values
>> > > > > > > >>>> are coming there.
>> > > > > > > >>>>
>> > > > > > > >>>> On Tue, May 26, 2020 at 3:10 AM Something Something <
>> > > > > > > >>>> mailinglist...@gmail.com> wrote:
>> > > > > > > >>>>
>> > > > > > > >>>>> No this is not working even if I use LongAccumulator.
>> > > > > > > >>>>>
>> > > > > > > >>>>> On Fri, May 15, 2020 at 9:54 PM ZHANG Wei <wezh...@outlook.com> wrote:
>> > > > > > > >>>>>
>> > > > > > > >>>>>> There is a restriction in the AccumulatorV2 API [1]: the OUT type should
>> > > > > > > >>>>>> be atomic or thread safe. I'm wondering whether the implementation for
>> > > > > > > >>>>>> `java.util.Map[T, Long]` meets it. Is there any chance to replace
>> > > > > > > >>>>>> CollectionLongAccumulator with CollectionAccumulator [2] or LongAccumulator [3]
>> > > > > > > >>>>>> and test whether the StreamingListener and other code are able to work?
>> > > > > > > >>>>>>
>> > > > > > > >>>>>> ---
>> > > > > > > >>>>>> Cheers,
>> > > > > > > >>>>>> -z
>> > > > > > > >>>>>> [1] http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.AccumulatorV2
>> > > > > > > >>>>>> [2] http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.CollectionAccumulator
>> > > > > > > >>>>>> [3] http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.LongAccumulator
>> > > > > > > >>>>>>
>> > > > > > > >>>>>> ________________________________________
>> > > > > > > >>>>>> From: Something Something <mailinglist...@gmail.com>
>> > > > > > > >>>>>> Sent: Saturday, May 16, 2020 0:38
>> > > > > > > >>>>>> To: spark-user
>> > > > > > > >>>>>> Subject: Re: Using Spark Accumulators with Structured Streaming
>> > > > > > > >>>>>>
>> > > > > > > >>>>>> Can someone from the Spark Development team tell me if this functionality
>> > > > > > > >>>>>> is supported and tested? I've spent a lot of time on this but can't get it
>> > > > > > > >>>>>> to work. Just to add more context, we've our own Accumulator class that
>> > > > > > > >>>>>> extends from AccumulatorV2. In this class we keep track of one or more
>> > > > > > > >>>>>> accumulators. Here's the definition:
>> > > > > > > >>>>>>
>> > > > > > > >>>>>>
>> > > > > > > >>>>>> class CollectionLongAccumulator[T]
>> > > > > > > >>>>>>     extends AccumulatorV2[T, java.util.Map[T, Long]]
>> > > > > > > >>>>>>
>> > > > > > > >>>>>> When the job begins we register an instance of this class:
>> > > > > > > >>>>>>
>> > > > > > > >>>>>> spark.sparkContext.register(myAccumulator, "MyAccumulator")
>> > > > > > > >>>>>>
>> > > > > > > >>>>>> Is this working under Structured Streaming?
>> > > > > > > >>>>>>
>> > > > > > > >>>>>> I will keep looking for alternate approaches but any help would be
>> > > > > > > >>>>>> greatly appreciated. Thanks.
>> > > > > > > >>>>>>
>> > > > > > > >>>>>>
>> > > > > > > >>>>>>
>> > > > > > > >>>>>> On Thu, May 14, 2020 at 2:36 PM Something Something <mailinglist...@gmail.com> wrote:
>> > > > > > > >>>>>>
>> > > > > > > >>>>>> In my structured streaming job I am updating Spark Accumulators in
>> > > > > > > >>>>>> the updateAcrossEvents method but they are always 0 when I try to print
>> > > > > > > >>>>>> them in my StreamingListener. Here's the code:
>> > > > > > > >>>>>>
>> > > > > > > >>>>>> .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
>> > > > > > > >>>>>>         updateAcrossEvents
>> > > > > > > >>>>>>       )
>> > > > > > > >>>>>>
>> > > > > > > >>>>>>
>> > > > > > > >>>>>> The accumulators get incremented in 'updateAcrossEvents'. I've a
>> > > > > > > >>>>>> StreamingListener which writes values of the accumulators in
>> > > > > > > >>>>>> 'onQueryProgress' method but in this method the Accumulators are ALWAYS
>> > > > > > > >>>>>> ZERO!
>> > > > > > > >>>>>>
>> > > > > > > >>>>>> When I added log statements in the updateAcrossEvents, I could see
>> > > > > > > >>>>>> that these accumulators are getting incremented as expected.
>> > > > > > > >>>>>>
>> > > > > > > >>>>>> This only happens when I run in the 'Cluster' mode. In Local mode it
>> > > > > > > >>>>>> works fine which implies that the Accumulators are not getting distributed
>> > > > > > > >>>>>> correctly - or something like that!
>> > > > > > > >>>>>>
>> > > > > > > >>>>>> Note: I've seen quite a few answers on the Web that tell me to
>> > > > > > > >>>>>> perform an "Action". That's not a solution here. This is a 'Stateful
>> > > > > > > >>>>>> Structured Streaming' job. Yes, I am also 'registering' them in
>> > > > > > > >>>>>> SparkContext.
>> > > > > > > >>>>>>
>> > > > > > > >>>>>>
>> > > > > > > >>>>>>
>> > > > > > > >>>>>>
