Great. I guess the trick is to use a separate class such as
'StateUpdateTask'. I will try that. My challenge is to convert this into
Scala. Will try it out & revert. Thanks for the tips.

On Wed, Jun 3, 2020 at 11:56 PM ZHANG Wei <wezh...@outlook.com> wrote:

> The following Java codes can work in my cluster environment:
> ```
>     .mapGroupsWithState((MapGroupsWithStateFunction<String, String, Long,
> LeadingCharCount>) (key, values, state) -> {
>                 myAcc.add(1);
>                 <...>
>                 state.update(newState);
>                 return new LeadingCharCount(key, newState);
>             },
>             Encoders.LONG(),
>             Encoders.bean(LeadingCharCount.class),
>             GroupStateTimeout.ProcessingTimeTimeout())
> ```
>
> Also works fine with my `StateUpdateTask`:
> ```
>     .mapGroupsWithState(
>             new StateUpdateTask(myAcc),
>             Encoders.LONG(),
>             Encoders.bean(LeadingCharCount.class),
>             GroupStateTimeout.ProcessingTimeTimeout());
>
> public class StateUpdateTask
>             implements MapGroupsWithStateFunction<String, String, Long,
> LeadingCharCount> {
>         private LongAccumulator myAccInTask;
>
>         public StateUpdateTask(LongAccumulator acc) {
>             this.myAccInTask = acc;
>         }
>
>         @Override
>         public LeadingCharCount call(String key, Iterator<String> values,
> GroupState<Long> state) throws Exception {
>             myAccInTask.add(1);
>             <...>
>             state.update(newState);
>             return new LeadingCharCount(key, newState);
>         }
> }
> ```
>
> --
> Cheers,
> -z
>
> On Tue, 2 Jun 2020 10:28:36 +0800
> ZHANG Wei <wezh...@outlook.com> wrote:
>
> > Yes, verified on the cluster with 5 executors.
> >
> > --
> > Cheers,
> > -z
> >
> > On Fri, 29 May 2020 11:16:12 -0700
> > Something Something <mailinglist...@gmail.com> wrote:
> >
> > > Did you try this on the Cluster? Note: This works just fine under
> 'Local'
> > > mode.
> > >
> > > On Thu, May 28, 2020 at 9:12 PM ZHANG Wei <wezh...@outlook.com> wrote:
> > >
> > > > I can't reproduce the issue with my simple code:
> > > > ```scala
> > > >     spark.streams.addListener(new StreamingQueryListener {
> > > >       override def onQueryProgress(event:
> > > > StreamingQueryListener.QueryProgressEvent): Unit = {
> > > >         println(event.progress.id + " is on progress")
> > > >         println(s"My accu is ${myAcc.value} on query progress")
> > > >       }
> > > >         ...
> > > >     })
> > > >
> > > >     def mappingFunc(key: Long, values: Iterator[String], state:
> > > > GroupState[Long]): ... = {
> > > >       myAcc.add(1)
> > > >       println(s">>> key: $key => state: ${state}")
> > > >         ...
> > > >     }
> > > >
> > > >     val wordCounts = words
> > > >       .groupByKey(v => ...)
> > > >       .mapGroupsWithState(timeoutConf =
> > > > GroupStateTimeout.ProcessingTimeTimeout)(func = mappingFunc)
> > > >
> > > >     val query = wordCounts.writeStream
> > > >       .outputMode(OutputMode.Update)
> > > >         ...
> > > > ```
> > > >
> > > > I'm wondering if there were any errors can be found from driver
> logs? The
> > > > micro-batch
> > > > exceptions won't terminate the streaming job running.
> > > >
> > > > For the following code, we have to make sure that `StateUpdateTask`
> is
> > > > started:
> > > > >                 .mapGroupsWithState(
> > > > >                         new
> > > >
> StateUpdateTask(Long.parseLong(appConfig.getSparkStructuredStreamingConfig().STATE_TIMEOUT),
> > > > > appConfig, accumulators),
> > > > >                         Encoders.bean(ModelStateInfo.class),
> > > > >                         Encoders.bean(ModelUpdate.class),
> > > > >                         GroupStateTimeout.ProcessingTimeTimeout());
> > > >
> > > > --
> > > > Cheers,
> > > > -z
> > > >
> > > > On Thu, 28 May 2020 19:59:31 +0530
> > > > Srinivas V <srini....@gmail.com> wrote:
> > > >
> > > > > Giving the code below:
> > > > > //accumulators is a class level variable in driver.
> > > > >
> > > > >  sparkSession.streams().addListener(new StreamingQueryListener() {
> > > > >             @Override
> > > > >             public void onQueryStarted(QueryStartedEvent
> queryStarted) {
> > > > >                 logger.info("Query started: " +
> queryStarted.id());
> > > > >             }
> > > > >             @Override
> > > > >             public void onQueryTerminated(QueryTerminatedEvent
> > > > > queryTerminated) {
> > > > >                 logger.info("Query terminated: " +
> > > > queryTerminated.id());
> > > > >             }
> > > > >             @Override
> > > > >             public void onQueryProgress(QueryProgressEvent
> > > > queryProgress) {
> > > > >
> > > > >
> accumulators.eventsReceived(queryProgress.progress().numInputRows());
> > > > >                 long eventsReceived = 0;
> > > > >                 long eventsExpired = 0;
> > > > >                 long eventSentSuccess = 0;
> > > > >                 try {
> > > > >                     eventsReceived =
> > > > > accumulators.getLong(InstrumentationCounters.EVENTS_RECEIVED);
> > > > >                     eventsExpired =
> > > > > accumulators.getLong(InstrumentationCounters.EVENTS_STATE_EXPIRED);
> > > > >                     eventSentSuccess =
> > > > > accumulators.getLong(InstrumentationCounters.EVENTS_SENT);
> > > > >                 } catch (MissingKeyException e) {
> > > > >                     logger.error("Accumulator key not found due to
> > > > > Exception {}", e.getMessage());
> > > > >                 }
> > > > >                 logger.info("Events Received:{}", eventsReceived);
> > > > >                 logger.info("Events State Expired:{}",
> eventsExpired);
> > > > >                 logger.info("Events Sent Success:{}",
> eventSentSuccess);
> > > > >                 logger.info("Query made progress - batchId: {}
> > > > > numInputRows:{} inputRowsPerSecond:{} processedRowsPerSecond:{}
> > > > > durationMs:{}" ,
> > > > >                         queryProgress.progress().batchId(),
> > > > > queryProgress.progress().numInputRows(),
> > > > > queryProgress.progress().inputRowsPerSecond(),
> > > > >
> > > >  queryProgress.progress().processedRowsPerSecond(),
> > > > > queryProgress.progress().durationMs());
> > > > >
> > > > >
> > > > > On Thu, May 28, 2020 at 7:04 PM ZHANG Wei <wezh...@outlook.com>
> wrote:
> > > > >
> > > > > > May I get how the accumulator is accessed in the method
> > > > > > `onQueryProgress()`?
> > > > > >
> > > > > > AFAICT, the accumulator is incremented well. There is a way to
> verify
> > > > that
> > > > > > in cluster like this:
> > > > > > ```
> > > > > >     // Add the following while loop before invoking
> awaitTermination
> > > > > >     while (true) {
> > > > > >       println("My acc: " + myAcc.value)
> > > > > >       Thread.sleep(5 * 1000)
> > > > > >     }
> > > > > >
> > > > > >     //query.awaitTermination()
> > > > > > ```
> > > > > >
> > > > > > And the accumulator value updated can be found from driver
> stdout.
> > > > > >
> > > > > > --
> > > > > > Cheers,
> > > > > > -z
> > > > > >
> > > > > > On Thu, 28 May 2020 17:12:48 +0530
> > > > > > Srinivas V <srini....@gmail.com> wrote:
> > > > > >
> > > > > > > yes, I am using stateful structured streaming. Yes similar to
> what
> > > > you
> > > > > > do.
> > > > > > > This is in Java
> > > > > > > I do it this way:
> > > > > > >     Dataset<ModelUpdate> productUpdates = watermarkedDS
> > > > > > >                 .groupByKey(
> > > > > > >                         (MapFunction<InputEventModel, String>)
> event
> > > > ->
> > > > > > > event.getId(), Encoders.STRING())
> > > > > > >                 .mapGroupsWithState(
> > > > > > >                         new
> > > > > > >
> > > > > >
> > > >
> StateUpdateTask(Long.parseLong(appConfig.getSparkStructuredStreamingConfig().STATE_TIMEOUT),
> > > > > > > appConfig, accumulators),
> > > > > > >                         Encoders.bean(ModelStateInfo.class),
> > > > > > >                         Encoders.bean(ModelUpdate.class),
> > > > > > >
>  GroupStateTimeout.ProcessingTimeTimeout());
> > > > > > >
> > > > > > > StateUpdateTask contains the update method.
> > > > > > >
> > > > > > > On Thu, May 28, 2020 at 4:41 AM Something Something <
> > > > > > > mailinglist...@gmail.com> wrote:
> > > > > > >
> > > > > > > > Yes, that's exactly how I am creating them.
> > > > > > > >
> > > > > > > > Question... Are you using 'Stateful Structured Streaming' in
> which
> > > > > > you've
> > > > > > > > something like this?
> > > > > > > >
> > > > > > > >
> .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
> > > > > > > >         updateAcrossEvents
> > > > > > > >       )
> > > > > > > >
> > > > > > > > And updating the Accumulator inside 'updateAcrossEvents'?
> We're
> > > > > > experiencing this only under 'Stateful Structured Streaming'. In
> other
> > > > > > streaming applications it works as expected.
> > > > > > > >
> > > > > > > >
> > > > > > > >
> > > > > > > > On Wed, May 27, 2020 at 9:01 AM Srinivas V <
> srini....@gmail.com>
> > > > > > wrote:
> > > > > > > >
> > > > > > > >> Yes, I am talking about Application specific Accumulators.
> > > > Actually I
> > > > > > am
> > > > > > > >> getting the values printed in my driver log as well as sent
> to
> > > > > > Grafana. Not
> > > > > > > >> sure where and when I saw 0 before. My deploy mode is
> “client” on
> > > > a
> > > > > > yarn
> > > > > > > >> cluster(not local Mac) where I submit from master node. It
> should
> > > > > > work the
> > > > > > > >> same for cluster mode as well.
> > > > > > > >> Create accumulators like this:
> > > > > > > >> AccumulatorV2 accumulator =
> sparkContext.longAccumulator(name);
> > > > > > > >>
> > > > > > > >>
> > > > > > > >> On Tue, May 26, 2020 at 8:42 PM Something Something <
> > > > > > > >> mailinglist...@gmail.com> wrote:
> > > > > > > >>
> > > > > > > >>> Hmm... how would they go to Graphana if they are not
> getting
> > > > > > computed in
> > > > > > > >>> your code? I am talking about the Application Specific
> > > > Accumulators.
> > > > > > The
> > > > > > > >>> other standard counters such as
> > > > 'event.progress.inputRowsPerSecond'
> > > > > > are
> > > > > > > >>> getting populated correctly!
> > > > > > > >>>
> > > > > > > >>> On Mon, May 25, 2020 at 8:39 PM Srinivas V <
> srini....@gmail.com>
> > > > > > wrote:
> > > > > > > >>>
> > > > > > > >>>> Hello,
> > > > > > > >>>> Even for me it comes as 0 when I print in
> OnQueryProgress. I use
> > > > > > > >>>> LongAccumulator as well. Yes, it prints on my local but
> not on
> > > > > > cluster.
> > > > > > > >>>> But one consolation is that when I send metrics to
> Graphana, the
> > > > > > values
> > > > > > > >>>> are coming there.
> > > > > > > >>>>
> > > > > > > >>>> On Tue, May 26, 2020 at 3:10 AM Something Something <
> > > > > > > >>>> mailinglist...@gmail.com> wrote:
> > > > > > > >>>>
> > > > > > > >>>>> No this is not working even if I use LongAccumulator.
> > > > > > > >>>>>
> > > > > > > >>>>> On Fri, May 15, 2020 at 9:54 PM ZHANG Wei <
> wezh...@outlook.com
> > > > >
> > > > > > wrote:
> > > > > > > >>>>>
> > > > > > > >>>>>> There is a restriction in AccumulatorV2 API [1], the
> OUT type
> > > > > > should
> > > > > > > >>>>>> be atomic or thread safe. I'm wondering if the
> implementation
> > > > for
> > > > > > > >>>>>> `java.util.Map[T, Long]` can meet it or not. Is there
> any
> > > > chance
> > > > > > to replace
> > > > > > > >>>>>> CollectionLongAccumulator by CollectionAccumulator[2] or
> > > > > > LongAccumulator[3]
> > > > > > > >>>>>> and test if the StreamingListener and other codes are
> able to
> > > > > > work?
> > > > > > > >>>>>>
> > > > > > > >>>>>> ---
> > > > > > > >>>>>> Cheers,
> > > > > > > >>>>>> -z
> > > > > > > >>>>>> [1]
> > > > > > > >>>>>>
> > > > > >
> > > >
> https://nam05.safelinks.protection.outlook.com/?url=http%3A%2F%2Fspark.apache.org%2Fdocs%2Flatest%2Fapi%2Fscala%2Findex.html%23org.apache.spark.util.AccumulatorV2&amp;data=02%7C01%7C%7Cf802f480bbab46ae07b308d803fc661f%7C84df9e7fe9f640afb435aaaaaaaaaaaa%7C1%7C0%7C637263729860033353&amp;sdata=NPpiZC%2Bnx9rec6G35QvMDV1D3FgvD%2FnIct6OJ06I728%3D&amp;reserved=0
> > > > > > > >>>>>> [2]
> > > > > > > >>>>>>
> > > > > >
> > > >
> https://nam05.safelinks.protection.outlook.com/?url=http%3A%2F%2Fspark.apache.org%2Fdocs%2Flatest%2Fapi%2Fscala%2Findex.html%23org.apache.spark.util.CollectionAccumulator&amp;data=02%7C01%7C%7Cf802f480bbab46ae07b308d803fc661f%7C84df9e7fe9f640afb435aaaaaaaaaaaa%7C1%7C0%7C637263729860038343&amp;sdata=wMskE72per9Js0V7UHJ0qi4UzCEEYh%2Fk53fuP2e92mA%3D&amp;reserved=0
> > > > > > > >>>>>> [3]
> > > > > > > >>>>>>
> > > > > >
> > > >
> https://nam05.safelinks.protection.outlook.com/?url=http%3A%2F%2Fspark.apache.org%2Fdocs%2Flatest%2Fapi%2Fscala%2Findex.html%23org.apache.spark.util.LongAccumulator&amp;data=02%7C01%7C%7Cf802f480bbab46ae07b308d803fc661f%7C84df9e7fe9f640afb435aaaaaaaaaaaa%7C1%7C0%7C637263729860038343&amp;sdata=INgHzc0rc6jj7UapB%2FRLfCiGNWEBSKWfgmuJ2dUZ3eM%3D&amp;reserved=0
> > > > > > > >>>>>>
> > > > > > > >>>>>> ________________________________________
> > > > > > > >>>>>> From: Something Something <mailinglist...@gmail.com>
> > > > > > > >>>>>> Sent: Saturday, May 16, 2020 0:38
> > > > > > > >>>>>> To: spark-user
> > > > > > > >>>>>> Subject: Re: Using Spark Accumulators with Structured
> > > > Streaming
> > > > > > > >>>>>>
> > > > > > > >>>>>> Can someone from Spark Development team tell me if this
> > > > > > functionality
> > > > > > > >>>>>> is supported and tested? I've spent a lot of time on
> this but
> > > > > > can't get it
> > > > > > > >>>>>> to work. Just to add more context, we've our own
> Accumulator
> > > > > > class that
> > > > > > > >>>>>> extends from AccumulatorV2. In this class we keep track
> of
> > > > one or
> > > > > > more
> > > > > > > >>>>>> accumulators. Here's the definition:
> > > > > > > >>>>>>
> > > > > > > >>>>>>
> > > > > > > >>>>>> class CollectionLongAccumulator[T]
> > > > > > > >>>>>>     extends AccumulatorV2[T, java.util.Map[T, Long]]
> > > > > > > >>>>>>
> > > > > > > >>>>>> When the job begins we register an instance of this
> class:
> > > > > > > >>>>>>
> > > > > > > >>>>>> spark.sparkContext.register(myAccumulator,
> "MyAccumulator")
> > > > > > > >>>>>>
> > > > > > > >>>>>> Is this working under Structured Streaming?
> > > > > > > >>>>>>
> > > > > > > >>>>>> I will keep looking for alternate approaches but any
> help
> > > > would be
> > > > > > > >>>>>> greatly appreciated. Thanks.
> > > > > > > >>>>>>
> > > > > > > >>>>>>
> > > > > > > >>>>>>
> > > > > > > >>>>>> On Thu, May 14, 2020 at 2:36 PM Something Something <
> > > > > > > >>>>>> mailinglist...@gmail.com<mailto:
> mailinglist...@gmail.com>>
> > > > wrote:
> > > > > > > >>>>>>
> > > > > > > >>>>>> In my structured streaming job I am updating Spark
> > > > Accumulators in
> > > > > > > >>>>>> the updateAcrossEvents method but they are always 0
> when I
> > > > try to
> > > > > > print
> > > > > > > >>>>>> them in my StreamingListener. Here's the code:
> > > > > > > >>>>>>
> > > > > > > >>>>>>
> > > > .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
> > > > > > > >>>>>>         updateAcrossEvents
> > > > > > > >>>>>>       )
> > > > > > > >>>>>>
> > > > > > > >>>>>>
> > > > > > > >>>>>> The accumulators get incremented in
> 'updateAcrossEvents'.
> > > > I've a
> > > > > > > >>>>>> StreamingListener which writes values of the
> accumulators in
> > > > > > > >>>>>> 'onQueryProgress' method but in this method the
> Accumulators
> > > > are
> > > > > > ALWAYS
> > > > > > > >>>>>> ZERO!
> > > > > > > >>>>>>
> > > > > > > >>>>>> When I added log statements in the updateAcrossEvents, I
> > > > could see
> > > > > > > >>>>>> that these accumulators are getting incremented as
> expected.
> > > > > > > >>>>>>
> > > > > > > >>>>>> This only happens when I run in the 'Cluster' mode. In
> Local
> > > > mode
> > > > > > it
> > > > > > > >>>>>> works fine which implies that the Accumulators are not
> getting
> > > > > > distributed
> > > > > > > >>>>>> correctly - or something like that!
> > > > > > > >>>>>>
> > > > > > > >>>>>> Note: I've seen quite a few answers on the Web that
> tell me to
> > > > > > > >>>>>> perform an "Action". That's not a solution here. This
> is a
> > > > > > 'Stateful
> > > > > > > >>>>>> Structured Streaming' job. Yes, I am also 'registering'
> them
> > > > in
> > > > > > > >>>>>> SparkContext.
> > > > > > > >>>>>>
> > > > > > > >>>>>>
> > > > > > > >>>>>>
> > > > > > > >>>>>>
> > > > > >
> > > >
> >
> > ---------------------------------------------------------------------
> > To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> >
>

Reply via email to