What is scary is that this interface is marked as "experimental":

```java
@Experimental
@InterfaceStability.Evolving
public interface MapGroupsWithStateFunction<K, V, S, R> extends Serializable {
  R call(K key, Iterator<V> values, GroupState<S> state) throws Exception;
}
```
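For reference, a minimal Scala sketch of implementing this interface as a named class, with the accumulator passed in through the constructor; the class and member names here are illustrative, not from the thread:

```scala
import org.apache.spark.api.java.function.MapGroupsWithStateFunction
import org.apache.spark.sql.streaming.GroupState
import org.apache.spark.util.LongAccumulator

// A named class (rather than an anonymous closure), so the accumulator
// reference travels with the serialized function to the executors.
class CountingStateFunc(acc: LongAccumulator)
    extends MapGroupsWithStateFunction[String, String, java.lang.Long, java.lang.Long] {

  override def call(key: String,
                    values: java.util.Iterator[String],
                    state: GroupState[java.lang.Long]): java.lang.Long = {
    acc.add(1) // incremented on executors, merged back on the driver
    var count = if (state.exists) state.get.longValue else 0L
    while (values.hasNext) { values.next(); count += 1 }
    val newState = java.lang.Long.valueOf(count)
    state.update(newState)
    newState
  }
}
```

A named class makes the accumulator an explicit, serializable dependency; this is the same pattern as the `StateUpdateTask` examples in the thread below.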
On Mon, Jun 8, 2020 at 11:54 AM Something Something <mailinglist...@gmail.com> wrote:

Right, this is exactly how I have it right now. The problem is that in cluster mode 'myAcc' does NOT get distributed. Try it out in cluster mode and you will see what I mean.

I think the way Zhang is using it will work. Will try and revert.

On Mon, Jun 8, 2020 at 10:58 AM Srinivas V <srini....@gmail.com> wrote:

You don't need to have a separate class. I created one because my case has a lot of code and logic in it. For a quick test you can use Zhang's Scala code from this chain. Pasting it below for your reference:

```scala
spark.streams.addListener(new StreamingQueryListener {
  override def onQueryProgress(event: StreamingQueryListener.QueryProgressEvent): Unit = {
    println(event.progress.id + " is on progress")
    println(s"My accu is ${myAcc.value} on query progress")
  }
  ...
})

def mappingFunc(key: Long, values: Iterator[String], state: GroupState[Long]): ... = {
  myAcc.add(1)
  println(s">>> key: $key => state: ${state}")
  ...
}

val wordCounts = words
  .groupByKey(v => ...)
  .mapGroupsWithState(timeoutConf = GroupStateTimeout.ProcessingTimeTimeout)(func = mappingFunc)

val query = wordCounts.writeStream
  .outputMode(OutputMode.Update)
```

On Mon, Jun 8, 2020 at 11:14 AM Something Something <mailinglist...@gmail.com> wrote:

Great. I guess the trick is to use a separate class such as 'StateUpdateTask'. I will try that. My challenge is to convert this into Scala. Will try it out and revert. Thanks for the tips.

On Wed, Jun 3, 2020 at 11:56 PM ZHANG Wei <wezh...@outlook.com> wrote:

The following Java code works in my cluster environment:

```java
.mapGroupsWithState((MapGroupsWithStateFunction<String, String, Long, LeadingCharCount>) (key, values, state) -> {
        myAcc.add(1);
        <...>
        state.update(newState);
        return new LeadingCharCount(key, newState);
    },
    Encoders.LONG(),
    Encoders.bean(LeadingCharCount.class),
    GroupStateTimeout.ProcessingTimeTimeout())
```

It also works fine with my `StateUpdateTask`:

```java
.mapGroupsWithState(
    new StateUpdateTask(myAcc),
    Encoders.LONG(),
    Encoders.bean(LeadingCharCount.class),
    GroupStateTimeout.ProcessingTimeTimeout());

public class StateUpdateTask
        implements MapGroupsWithStateFunction<String, String, Long, LeadingCharCount> {
    private LongAccumulator myAccInTask;

    public StateUpdateTask(LongAccumulator acc) {
        this.myAccInTask = acc;
    }

    @Override
    public LeadingCharCount call(String key, Iterator<String> values,
            GroupState<Long> state) throws Exception {
        myAccInTask.add(1);
        <...>
        state.update(newState);
        return new LeadingCharCount(key, newState);
    }
}
```

--
Cheers,
-z
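For context: `myAcc` in the snippets above is assumed to be a named `LongAccumulator` created on the driver before the query starts. A minimal sketch of that setup, with illustrative names:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.util.LongAccumulator

val spark = SparkSession.builder().appName("acc-demo").getOrCreate()

// Accumulators created through the SparkContext helpers are registered
// automatically under the given name and show up in the Spark UI.
val myAcc: LongAccumulator = spark.sparkContext.longAccumulator("myAcc")
```

A hand-rolled `AccumulatorV2` subclass, as discussed later in this thread, must instead be registered explicitly via `SparkContext.register`.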
On Tue, 2 Jun 2020 10:28:36 +0800, ZHANG Wei <wezh...@outlook.com> wrote:

Yes, verified on the cluster with 5 executors.

--
Cheers,
-z

On Fri, 29 May 2020 11:16:12 -0700, Something Something <mailinglist...@gmail.com> wrote:

Did you try this on the cluster? Note: this works just fine in 'local' mode.

On Thu, May 28, 2020 at 9:12 PM ZHANG Wei <wezh...@outlook.com> wrote:

I can't reproduce the issue with my simple code:

```scala
spark.streams.addListener(new StreamingQueryListener {
  override def onQueryProgress(event: StreamingQueryListener.QueryProgressEvent): Unit = {
    println(event.progress.id + " is on progress")
    println(s"My accu is ${myAcc.value} on query progress")
  }
  ...
})

def mappingFunc(key: Long, values: Iterator[String], state: GroupState[Long]): ... = {
  myAcc.add(1)
  println(s">>> key: $key => state: ${state}")
  ...
}

val wordCounts = words
  .groupByKey(v => ...)
  .mapGroupsWithState(timeoutConf = GroupStateTimeout.ProcessingTimeTimeout)(func = mappingFunc)

val query = wordCounts.writeStream
  .outputMode(OutputMode.Update)
  ...
```

I'm wondering whether any errors can be found in the driver logs? Micro-batch exceptions won't terminate the running streaming job.

For the following code, we have to make sure that `StateUpdateTask` is started:

```java
.mapGroupsWithState(
    new StateUpdateTask(Long.parseLong(appConfig.getSparkStructuredStreamingConfig().STATE_TIMEOUT),
        appConfig, accumulators),
    Encoders.bean(ModelStateInfo.class),
    Encoders.bean(ModelUpdate.class),
    GroupStateTimeout.ProcessingTimeTimeout());
```

--
Cheers,
-z
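As an aside on checking for swallowed errors: a minimal sketch, assuming a running `StreamingQuery` handle named `query` as in the snippets above, is to inspect `StreamingQuery.exception` from the driver:

```scala
// StreamingQuery.exception returns Some(...) once the query has failed.
query.exception.foreach { e =>
  println(s"Query failed: ${e.getMessage}")
}
```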
On Thu, 28 May 2020 19:59:31 +0530, Srinivas V <srini....@gmail.com> wrote:

Giving the code below (accumulators is a class-level variable in the driver):

```java
sparkSession.streams().addListener(new StreamingQueryListener() {
    @Override
    public void onQueryStarted(QueryStartedEvent queryStarted) {
        logger.info("Query started: " + queryStarted.id());
    }

    @Override
    public void onQueryTerminated(QueryTerminatedEvent queryTerminated) {
        logger.info("Query terminated: " + queryTerminated.id());
    }

    @Override
    public void onQueryProgress(QueryProgressEvent queryProgress) {
        accumulators.eventsReceived(queryProgress.progress().numInputRows());
        long eventsReceived = 0;
        long eventsExpired = 0;
        long eventSentSuccess = 0;
        try {
            eventsReceived = accumulators.getLong(InstrumentationCounters.EVENTS_RECEIVED);
            eventsExpired = accumulators.getLong(InstrumentationCounters.EVENTS_STATE_EXPIRED);
            eventSentSuccess = accumulators.getLong(InstrumentationCounters.EVENTS_SENT);
        } catch (MissingKeyException e) {
            logger.error("Accumulator key not found due to Exception {}", e.getMessage());
        }
        logger.info("Events Received:{}", eventsReceived);
        logger.info("Events State Expired:{}", eventsExpired);
        logger.info("Events Sent Success:{}", eventSentSuccess);
        logger.info("Query made progress - batchId: {} numInputRows:{} inputRowsPerSecond:{} processedRowsPerSecond:{} durationMs:{}",
                queryProgress.progress().batchId(),
                queryProgress.progress().numInputRows(),
                queryProgress.progress().inputRowsPerSecond(),
                queryProgress.progress().processedRowsPerSecond(),
                queryProgress.progress().durationMs());
    }
});
```

On Thu, May 28, 2020 at 7:04 PM ZHANG Wei <wezh...@outlook.com> wrote:

May I ask how the accumulator is accessed in the `onQueryProgress()` method?

AFAICT, the accumulator is incremented correctly. There is a way to verify that in a cluster, like this:

```scala
// Add the following while loop before invoking awaitTermination
while (true) {
  println("My acc: " + myAcc.value)
  Thread.sleep(5 * 1000)
}

//query.awaitTermination()
```

The updated accumulator value can then be found in the driver stdout.

--
Cheers,
-z
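A slightly more self-contained variant of that check, as a sketch assuming the `myAcc` and `query` values defined earlier in the thread, polls only while the query is active:

```scala
import org.apache.spark.sql.streaming.StreamingQuery
import org.apache.spark.util.LongAccumulator

// Poll the driver-side merged value until the query stops.
def pollAccumulator(myAcc: LongAccumulator, query: StreamingQuery): Unit = {
  while (query.isActive) {
    println(s"My acc: ${myAcc.value}")
    Thread.sleep(5 * 1000)
  }
}
```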
On Thu, 28 May 2020 17:12:48 +0530, Srinivas V <srini....@gmail.com> wrote:

Yes, I am using stateful Structured Streaming, similar to what you do. This is in Java; I do it this way:

```java
Dataset<ModelUpdate> productUpdates = watermarkedDS
        .groupByKey(
                (MapFunction<InputEventModel, String>) event -> event.getId(),
                Encoders.STRING())
        .mapGroupsWithState(
                new StateUpdateTask(
                        Long.parseLong(appConfig.getSparkStructuredStreamingConfig().STATE_TIMEOUT),
                        appConfig, accumulators),
                Encoders.bean(ModelStateInfo.class),
                Encoders.bean(ModelUpdate.class),
                GroupStateTimeout.ProcessingTimeTimeout());
```

StateUpdateTask contains the update method.

On Thu, May 28, 2020 at 4:41 AM Something Something <mailinglist...@gmail.com> wrote:

Yes, that's exactly how I am creating them.

Question... Are you using 'Stateful Structured Streaming', where you have something like this?

```scala
.mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
    updateAcrossEvents
)
```

And are you updating the Accumulator inside 'updateAcrossEvents'? We're experiencing this only under 'Stateful Structured Streaming'. In other streaming applications it works as expected.

On Wed, May 27, 2020 at 9:01 AM Srinivas V <srini....@gmail.com> wrote:

Yes, I am talking about application-specific Accumulators. Actually, I am getting the values printed in my driver log as well as sent to Grafana. Not sure where and when I saw 0 before. My deploy mode is "client" on a YARN cluster (not local Mac), where I submit from the master node. It should work the same for cluster mode as well.

I create accumulators like this:

```java
AccumulatorV2 accumulator = sparkContext.longAccumulator(name);
```

On Tue, May 26, 2020 at 8:42 PM Something Something <mailinglist...@gmail.com> wrote:

Hmm... how would they go to Grafana if they are not getting computed in your code? I am talking about the application-specific Accumulators. The other standard counters, such as 'event.progress.inputRowsPerSecond', are getting populated correctly!

On Mon, May 25, 2020 at 8:39 PM Srinivas V <srini....@gmail.com> wrote:

Hello,
Even for me it comes as 0 when I print it in onQueryProgress. I use LongAccumulator as well. Yes, it prints on my local machine but not on the cluster.
But one consolation is that when I send metrics to Grafana, the values do come through there.

On Tue, May 26, 2020 at 3:10 AM Something Something <mailinglist...@gmail.com> wrote:

No, this is not working, even if I use LongAccumulator.

On Fri, May 15, 2020 at 9:54 PM ZHANG Wei <wezh...@outlook.com> wrote:

There is a restriction in the AccumulatorV2 API [1]: the OUT type should be atomic or thread-safe. I'm wondering whether the implementation for `java.util.Map[T, Long]` can meet it or not. Is there any chance to replace CollectionLongAccumulator with CollectionAccumulator [2] or LongAccumulator [3] and test whether the StreamingListener and the other code work?

---
Cheers,
-z
[1] http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.AccumulatorV2
[2] http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.CollectionAccumulator
[3] http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.LongAccumulator
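To make the thread-safety point concrete: a sketch of what a per-key counting accumulator with a thread-safe OUT type might look like; this is illustrative, not the `CollectionLongAccumulator` from this thread:

```scala
import java.util.concurrent.ConcurrentHashMap
import org.apache.spark.util.AccumulatorV2

// Per-key event counter whose OUT type is a thread-safe map.
class ConcurrentCountAccumulator[T]
    extends AccumulatorV2[T, java.util.Map[T, java.lang.Long]] {

  private val counts = new ConcurrentHashMap[T, java.lang.Long]()

  override def isZero: Boolean = counts.isEmpty

  override def copy(): ConcurrentCountAccumulator[T] = {
    val acc = new ConcurrentCountAccumulator[T]
    acc.counts.putAll(counts)
    acc
  }

  override def reset(): Unit = counts.clear()

  override def add(v: T): Unit =
    counts.merge(v, java.lang.Long.valueOf(1L),
      (a, b) => java.lang.Long.valueOf(a.longValue + b.longValue))

  override def merge(other: AccumulatorV2[T, java.util.Map[T, java.lang.Long]]): Unit =
    other.value.forEach { (k, n) =>
      counts.merge(k, n, (a, b) => java.lang.Long.valueOf(a.longValue + b.longValue))
    }

  override def value: java.util.Map[T, java.lang.Long] = counts
}
```

An instance would still need to be registered before the query starts, e.g. `spark.sparkContext.register(new ConcurrentCountAccumulator[String], "MyAccumulator")`, as in the original post below.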
________________________________________
From: Something Something <mailinglist...@gmail.com>
Sent: Saturday, May 16, 2020 0:38
To: spark-user
Subject: Re: Using Spark Accumulators with Structured Streaming

Can someone from the Spark development team tell me if this functionality is supported and tested? I've spent a lot of time on this but can't get it to work. Just to add more context: we have our own Accumulator class that extends AccumulatorV2. In this class we keep track of one or more accumulators. Here's the definition:

```scala
class CollectionLongAccumulator[T]
    extends AccumulatorV2[T, java.util.Map[T, Long]]
```

When the job begins, we register an instance of this class:

```scala
spark.sparkContext.register(myAccumulator, "MyAccumulator")
```

Is this working under Structured Streaming?

I will keep looking for alternate approaches, but any help would be greatly appreciated. Thanks.

On Thu, May 14, 2020 at 2:36 PM Something Something <mailinglist...@gmail.com> wrote:

In my Structured Streaming job I am updating Spark Accumulators in the updateAcrossEvents method, but they are always 0 when I try to print them in my StreamingListener. Here's the code:

```scala
.mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
    updateAcrossEvents
)
```

The accumulators get incremented in 'updateAcrossEvents'. I have a StreamingListener that writes the values of the accumulators in its 'onQueryProgress' method, but in that method the accumulators are ALWAYS ZERO!

When I added log statements in updateAcrossEvents, I could see that these accumulators are getting incremented as expected.

This only happens when I run in 'cluster' mode. In 'local' mode it works fine, which implies that the accumulators are not getting distributed correctly, or something like that!

Note: I've seen quite a few answers on the web that tell me to perform an "Action". That's not a solution here. This is a 'Stateful Structured Streaming' job. Yes, I am also 'registering' them in SparkContext.

---------------------------------------------------------------------
To unsubscribe e-mail: user-unsubscr...@spark.apache.org