Here is the code:
// accumulators is a class-level variable in the driver.

sparkSession.streams().addListener(new StreamingQueryListener() {
    @Override
    public void onQueryStarted(QueryStartedEvent queryStarted) {
        logger.info("Query started: " + queryStarted.id());
    }

    @Override
    public void onQueryTerminated(QueryTerminatedEvent queryTerminated) {
        logger.info("Query terminated: " + queryTerminated.id());
    }

    @Override
    public void onQueryProgress(QueryProgressEvent queryProgress) {
        accumulators.eventsReceived(queryProgress.progress().numInputRows());
        long eventsReceived = 0;
        long eventsExpired = 0;
        long eventSentSuccess = 0;
        try {
            eventsReceived = accumulators.getLong(InstrumentationCounters.EVENTS_RECEIVED);
            eventsExpired = accumulators.getLong(InstrumentationCounters.EVENTS_STATE_EXPIRED);
            eventSentSuccess = accumulators.getLong(InstrumentationCounters.EVENTS_SENT);
        } catch (MissingKeyException e) {
            logger.error("Accumulator key not found due to Exception {}", e.getMessage());
        }
        logger.info("Events Received:{}", eventsReceived);
        logger.info("Events State Expired:{}", eventsExpired);
        logger.info("Events Sent Success:{}", eventSentSuccess);
        logger.info("Query made progress - batchId: {} numInputRows:{} inputRowsPerSecond:{} processedRowsPerSecond:{} durationMs:{}",
                queryProgress.progress().batchId(),
                queryProgress.progress().numInputRows(),
                queryProgress.progress().inputRowsPerSecond(),
                queryProgress.progress().processedRowsPerSecond(),
                queryProgress.progress().durationMs());
    }
});
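
Since the thread-safety restriction on the accumulator's OUT type comes up further down in this thread, here is a rough, Spark-free sketch of what a thread-safe keyed counter could look like. It is only an illustration: `KeyedLongCounter` and its methods are hypothetical names, it does not extend AccumulatorV2, and it is not the `accumulators` class used above. It mirrors the add/merge/read shape of an accumulator using `ConcurrentHashMap` and `LongAdder` from the JDK.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.LongAdder;

// Hypothetical stand-in for a keyed counter; it does NOT extend Spark's AccumulatorV2.
final class KeyedLongCounter<K> {
    private final ConcurrentHashMap<K, LongAdder> counts = new ConcurrentHashMap<>();

    // Increment the counter for one key; safe to call from several threads.
    void add(K key, long delta) {
        counts.computeIfAbsent(key, k -> new LongAdder()).add(delta);
    }

    // Fold another counter's totals into this one (the role merge() plays for accumulators).
    void merge(KeyedLongCounter<K> other) {
        other.counts.forEach((k, v) -> add(k, v.sum()));
    }

    // Read one key; returns 0 for a key that was never incremented.
    long getLong(K key) {
        LongAdder adder = counts.get(key);
        return adder == null ? 0L : adder.sum();
    }
}

final class KeyedLongCounterDemo {
    public static void main(String[] args) throws InterruptedException {
        KeyedLongCounter<String> driverSide = new KeyedLongCounter<>();
        KeyedLongCounter<String> taskSide = new KeyedLongCounter<>();

        // Two threads update the same key concurrently.
        Runnable work = () -> {
            for (int i = 0; i < 1000; i++) {
                taskSide.add("EVENTS_RECEIVED", 1);
            }
        };
        Thread t1 = new Thread(work);
        Thread t2 = new Thread(work);
        t1.start();
        t2.start();
        t1.join();
        t2.join();

        // Combine the task-side totals into the driver-side copy, then read.
        driverSide.merge(taskSide);
        System.out.println(driverSide.getLong("EVENTS_RECEIVED")); // prints 2000
    }
}
```

Returning 0 for an unknown key is a design choice of this sketch; the listener above instead handles a MissingKeyException.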


On Thu, May 28, 2020 at 7:04 PM ZHANG Wei <wezh...@outlook.com> wrote:

> May I ask how the accumulator is accessed in the method
> `onQueryProgress()`?
>
> AFAICT, the accumulator is incremented correctly. One way to verify that
> on a cluster is:
> ```
>     // Add the following while loop before invoking awaitTermination
>     while (true) {
>       println("My acc: " + myAcc.value)
>       Thread.sleep(5 * 1000)
>     }
>
>     //query.awaitTermination()
> ```
>
> The updated accumulator value can then be found in the driver's stdout.
>
> --
> Cheers,
> -z
>
> On Thu, 28 May 2020 17:12:48 +0530
> Srinivas V <srini....@gmail.com> wrote:
>
> > Yes, I am using stateful structured streaming, similar to what you do.
> > This is in Java. I do it this way:
> >     Dataset<ModelUpdate> productUpdates = watermarkedDS
> >                 .groupByKey(
> >                         (MapFunction<InputEventModel, String>) event -> event.getId(),
> >                         Encoders.STRING())
> >                 .mapGroupsWithState(
> >                         new StateUpdateTask(
> >                                 Long.parseLong(appConfig.getSparkStructuredStreamingConfig().STATE_TIMEOUT),
> >                                 appConfig, accumulators),
> >                         Encoders.bean(ModelStateInfo.class),
> >                         Encoders.bean(ModelUpdate.class),
> >                         GroupStateTimeout.ProcessingTimeTimeout());
> >
> > StateUpdateTask contains the update method.
> >
> > On Thu, May 28, 2020 at 4:41 AM Something Something <
> > mailinglist...@gmail.com> wrote:
> >
> > > Yes, that's exactly how I am creating them.
> > >
> > > Question... Are you using 'Stateful Structured Streaming', in which you
> > > have something like this?
> > >
> > > .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
> > >         updateAcrossEvents
> > >       )
> > >
> > > And updating the Accumulator inside 'updateAcrossEvents'? We're
> > > experiencing this only under 'Stateful Structured Streaming'. In other
> > > streaming applications it works as expected.
> > >
> > >
> > >
> > > On Wed, May 27, 2020 at 9:01 AM Srinivas V <srini....@gmail.com> wrote:
> > >
> > >> Yes, I am talking about application-specific Accumulators. Actually I am
> > >> getting the values printed in my driver log as well as sent to Grafana.
> > >> Not sure where and when I saw 0 before. My deploy mode is “client” on a
> > >> YARN cluster (not a local Mac), where I submit from the master node. It
> > >> should work the same for cluster mode as well.
> > >> Create accumulators like this:
> > >> AccumulatorV2 accumulator = sparkContext.longAccumulator(name);
> > >>
> > >>
> > >> On Tue, May 26, 2020 at 8:42 PM Something Something <
> > >> mailinglist...@gmail.com> wrote:
> > >>
> > >>> Hmm... how would they go to Grafana if they are not getting computed in
> > >>> your code? I am talking about the application-specific Accumulators. The
> > >>> other standard counters such as 'event.progress.inputRowsPerSecond' are
> > >>> getting populated correctly!
> > >>>
> > >>> On Mon, May 25, 2020 at 8:39 PM Srinivas V <srini....@gmail.com> wrote:
> > >>>
> > >>>> Hello,
> > >>>> Even for me it comes as 0 when I print in onQueryProgress. I use
> > >>>> LongAccumulator as well. Yes, it prints on my local machine but not on
> > >>>> the cluster. But one consolation is that when I send metrics to Grafana,
> > >>>> the values do show up there.
> > >>>>
> > >>>> On Tue, May 26, 2020 at 3:10 AM Something Something <
> > >>>> mailinglist...@gmail.com> wrote:
> > >>>>
> > >>>>> No, this is not working even if I use LongAccumulator.
> > >>>>>
> > >>>>> On Fri, May 15, 2020 at 9:54 PM ZHANG Wei <wezh...@outlook.com> wrote:
> > >>>>>
> > >>>>>> There is a restriction in the AccumulatorV2 API [1]: the OUT type
> > >>>>>> should be atomic or thread safe. I'm wondering whether the
> > >>>>>> implementation for `java.util.Map[T, Long]` meets it. Is there any
> > >>>>>> chance to replace CollectionLongAccumulator with
> > >>>>>> CollectionAccumulator [2] or LongAccumulator [3] and test whether the
> > >>>>>> StreamingListener and the other code work?
> > >>>>>>
> > >>>>>> ---
> > >>>>>> Cheers,
> > >>>>>> -z
> > >>>>>> [1] http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.AccumulatorV2
> > >>>>>> [2] http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.CollectionAccumulator
> > >>>>>> [3] http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.LongAccumulator
> > >>>>>>
> > >>>>>> ________________________________________
> > >>>>>> From: Something Something <mailinglist...@gmail.com>
> > >>>>>> Sent: Saturday, May 16, 2020 0:38
> > >>>>>> To: spark-user
> > >>>>>> Subject: Re: Using Spark Accumulators with Structured Streaming
> > >>>>>>
> > >>>>>> Can someone from the Spark development team tell me if this
> > >>>>>> functionality is supported and tested? I've spent a lot of time on
> > >>>>>> this but can't get it to work. Just to add more context, we have our
> > >>>>>> own Accumulator class that extends AccumulatorV2. In this class we
> > >>>>>> keep track of one or more accumulators. Here's the definition:
> > >>>>>>
> > >>>>>>
> > >>>>>> class CollectionLongAccumulator[T]
> > >>>>>>     extends AccumulatorV2[T, java.util.Map[T, Long]]
> > >>>>>>
> > >>>>>> When the job begins we register an instance of this class:
> > >>>>>>
> > >>>>>> spark.sparkContext.register(myAccumulator, "MyAccumulator")
> > >>>>>>
> > >>>>>> Is this working under Structured Streaming?
> > >>>>>>
> > >>>>>> I will keep looking for alternate approaches but any help would be
> > >>>>>> greatly appreciated. Thanks.
> > >>>>>>
> > >>>>>>
> > >>>>>>
> > >>>>>> On Thu, May 14, 2020 at 2:36 PM Something Something <
> > >>>>>> mailinglist...@gmail.com> wrote:
> > >>>>>>
> > >>>>>> In my structured streaming job I am updating Spark Accumulators in
> > >>>>>> the updateAcrossEvents method, but they are always 0 when I try to
> > >>>>>> print them in my StreamingListener. Here's the code:
> > >>>>>>
> > >>>>>> .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
> > >>>>>>         updateAcrossEvents
> > >>>>>>       )
> > >>>>>>
> > >>>>>>
> > >>>>>> The accumulators get incremented in 'updateAcrossEvents'. I have a
> > >>>>>> StreamingListener which writes the values of the accumulators in its
> > >>>>>> 'onQueryProgress' method, but in this method the Accumulators are
> > >>>>>> ALWAYS ZERO!
> > >>>>>>
> > >>>>>> When I added log statements in the updateAcrossEvents, I could see
> > >>>>>> that these accumulators are getting incremented as expected.
> > >>>>>>
> > >>>>>> This only happens when I run in 'Cluster' mode. In Local mode it
> > >>>>>> works fine, which implies that the Accumulators are not getting
> > >>>>>> distributed correctly - or something like that!
> > >>>>>>
> > >>>>>> Note: I've seen quite a few answers on the Web that tell me to
> > >>>>>> perform an "Action". That's not a solution here. This is a
> > >>>>>> 'Stateful Structured Streaming' job. Yes, I am also 'registering'
> > >>>>>> them in SparkContext.
> > >>>>>>
> > >>>>>>
> > >>>>>>
> > >>>>>>
>
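
A note on the polling loop suggested earlier in the thread: with `while (true)` the commented-out awaitTermination call is never reached. One possible variant is to poll with a bounded wait, so the loop ends once the work finishes. The sketch below is Spark-free and only illustrates the pattern: a `CountDownLatch` stands in for the streaming query and a `LongAdder` for the accumulator, and `PollingSketch` and `pollUntilDone` are hypothetical names. In a Spark job the analogous call would presumably be `StreamingQuery.awaitTermination(timeoutMs)`, which returns whether the query has terminated, but that is not exercised here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.LongAdder;

final class PollingSketch {
    // Samples `counter` every `intervalMs` until `done` is released.
    // Returns every sample taken; the last entry is read after termination.
    static List<Long> pollUntilDone(LongAdder counter, CountDownLatch done, long intervalMs) {
        List<Long> samples = new ArrayList<>();
        try {
            // await(timeout) returns false on timeout and true once the latch hits zero.
            while (!done.await(intervalMs, TimeUnit.MILLISECONDS)) {
                samples.add(counter.sum());
            }
        } catch (InterruptedException e) {
            // Preserve the interrupt flag and stop polling.
            Thread.currentThread().interrupt();
        }
        samples.add(counter.sum()); // final reading
        return samples;
    }

    public static void main(String[] args) throws InterruptedException {
        LongAdder counter = new LongAdder();       // stand-in for the accumulator
        CountDownLatch done = new CountDownLatch(1); // stand-in for query termination

        Thread worker = new Thread(() -> {
            for (int i = 0; i < 100; i++) {
                counter.increment();
            }
            done.countDown(); // signal that the work has finished
        });
        worker.start();

        List<Long> samples = pollUntilDone(counter, done, 50);
        worker.join();
        System.out.println("final value: " + samples.get(samples.size() - 1)); // final value: 100
    }
}
```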
