It's in the constructor.

On Sat, May 30, 2020 at 4:15 AM Something Something <mailinglist...@gmail.com> wrote:
I mean... I don't see any reference to 'accumulator' in your class
*definition*. How can you access it in the class if it's not in your
definition of the class:

    public class StateUpdateTask implements MapGroupsWithStateFunction<String,
            InputEventModel, ModelStateInfo, ModelUpdate> {
        // --> I was expecting to see 'accumulator' here in the definition.

        @Override
        public ModelUpdate call(String productId, Iterator<InputEventModel> eventsIterator,
                GroupState<ModelStateInfo> state) {
        }
    }

On Fri, May 29, 2020 at 1:08 PM Srinivas V <srini....@gmail.com> wrote:

Yes, accumulators are updated in the call method of StateUpdateTask, for
example when the state times out or when the data is pushed to the next
Kafka topic.

On Fri, May 29, 2020 at 11:55 PM Something Something <mailinglist...@gmail.com> wrote:

Thanks! I will take a look at the link. Just one question: you seem to be
passing 'accumulators' in the constructor, but where do you use it in the
StateUpdateTask class? I am still missing that connection. Sorry if my
question is dumb; I must be missing something. Thanks for your help so far,
it's been useful.

On Fri, May 29, 2020 at 6:51 AM Srinivas V <srini....@gmail.com> wrote:

Yes, it is an application-specific class. This is how Java Spark Functions
work.
You can refer to this code in the documentation:
https://github.com/apache/spark/blob/master/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredSessionization.java

    public class StateUpdateTask implements MapGroupsWithStateFunction<String,
            InputEventModel, ModelStateInfo, ModelUpdate> {

        @Override
        public ModelUpdate call(String productId, Iterator<InputEventModel> eventsIterator,
                GroupState<ModelStateInfo> state) {
        }
    }

On Thu, May 28, 2020 at 10:59 PM Something Something <mailinglist...@gmail.com> wrote:

I am assuming StateUpdateTask is your application-specific class. Does it
have an 'updateState' method or something? I googled but couldn't find any
documentation about doing it this way. Can you please direct me to some
documentation? Thanks.

On Thu, May 28, 2020 at 4:43 AM Srinivas V <srini....@gmail.com> wrote:

Yes, I am using stateful Structured Streaming, similar to what you do. This
is in Java; I do it this way:

    Dataset<ModelUpdate> productUpdates = watermarkedDS
        .groupByKey(
            (MapFunction<InputEventModel, String>) event -> event.getId(),
            Encoders.STRING())
        .mapGroupsWithState(
            new StateUpdateTask(
                Long.parseLong(appConfig.getSparkStructuredStreamingConfig().STATE_TIMEOUT),
                appConfig, accumulators),
            Encoders.bean(ModelStateInfo.class),
            Encoders.bean(ModelUpdate.class),
            GroupStateTimeout.ProcessingTimeTimeout());

StateUpdateTask contains the update method.

On Thu, May 28, 2020 at 4:41 AM Something Something <mailinglist...@gmail.com> wrote:

Yes, that's exactly how I am creating them.

Question... Are you using 'Stateful Structured Streaming' in which you have
something like this?
    .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
        updateAcrossEvents
    )

And updating the Accumulator inside 'updateAcrossEvents'? We're experiencing
this only under 'Stateful Structured Streaming'. In other streaming
applications it works as expected.

On Wed, May 27, 2020 at 9:01 AM Srinivas V <srini....@gmail.com> wrote:

Yes, I am talking about application-specific Accumulators. Actually I am
getting the values printed in my driver log as well as sent to Grafana. Not
sure where and when I saw 0 before. My deploy mode is "client" on a YARN
cluster (not a local Mac), where I submit from the master node. It should
work the same for cluster mode as well.

Create accumulators like this:

    AccumulatorV2 accumulator = sparkContext.longAccumulator(name);

On Tue, May 26, 2020 at 8:42 PM Something Something <mailinglist...@gmail.com> wrote:

Hmm... how would they go to Grafana if they are not getting computed in your
code? I am talking about the application-specific Accumulators. The other
standard counters, such as 'event.progress.inputRowsPerSecond', are getting
populated correctly!

On Mon, May 25, 2020 at 8:39 PM Srinivas V <srini....@gmail.com> wrote:

Hello,
Even for me it comes as 0 when I print it in onQueryProgress. I use
LongAccumulator as well. Yes, it prints on my local machine but not on the
cluster. But one consolation is that when I send metrics to Grafana, the
values are coming through there.

On Tue, May 26, 2020 at 3:10 AM Something Something <mailinglist...@gmail.com> wrote:

No, this is not working even if I use LongAccumulator.
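Pulling the pieces of the thread together: the pattern being described is
that the accumulator is created and registered on the driver, handed to the
task through its constructor, and incremented inside call(). Below is a
minimal plain-Java sketch of that wiring. Note the Spark types
(LongAccumulator, MapGroupsWithStateFunction) are replaced by hypothetical
stand-ins here, since the real ones need a running Spark session; only the
constructor-capture pattern is the point.

```java
import java.io.Serializable;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

// Stand-in for Spark's LongAccumulator: a thread-safe counter the driver owns.
class FakeLongAccumulator implements Serializable {
    private final AtomicLong count = new AtomicLong();
    public void add(long v) { count.addAndGet(v); }
    public long value() { return count.get(); }
}

// Stand-in for a MapGroupsWithStateFunction-style task: the accumulator
// arrives via the constructor (mirroring the STATE_TIMEOUT/appConfig/
// accumulators arguments in the thread), not via the call() signature.
class StateUpdateTaskSketch implements Serializable {
    private final long stateTimeoutMs; // mirrors the thread's STATE_TIMEOUT arg
    private final FakeLongAccumulator accumulator;

    StateUpdateTaskSketch(long stateTimeoutMs, FakeLongAccumulator accumulator) {
        this.stateTimeoutMs = stateTimeoutMs;
        this.accumulator = accumulator;
    }

    // Analogue of call(productId, eventsIterator, state): count each event seen.
    public String call(String productId, Iterator<String> events) {
        while (events.hasNext()) {
            events.next();
            accumulator.add(1);
        }
        return productId;
    }
}

public class AccumulatorWiringSketch {
    public static void main(String[] args) {
        FakeLongAccumulator acc = new FakeLongAccumulator();
        StateUpdateTaskSketch task = new StateUpdateTaskSketch(30_000L, acc);
        task.call("p1", List.of("e1", "e2", "e3").iterator());
        System.out.println(acc.value()); // 3
    }
}
```

In real Spark code the instance would also have to survive serialization to
the executors, which is exactly where driver-side reads can come back as 0
if the accumulator was not registered with the SparkContext first.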
On Fri, May 15, 2020 at 9:54 PM ZHANG Wei <zwb...@msn.com> wrote:

There is a restriction in the AccumulatorV2 API [1]: the OUT type should be
atomic or thread-safe. I'm wondering whether the implementation for
`java.util.Map[T, Long]` can meet that or not. Is there any chance to
replace CollectionLongAccumulator with CollectionAccumulator [2] or
LongAccumulator [3] and test whether the StreamingListener and other code
are able to work?

---
Cheers,
-z
[1] http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.AccumulatorV2
[2] http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.CollectionAccumulator
[3] http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.LongAccumulator

________________________________________
From: Something Something <mailinglist...@gmail.com>
Sent: Saturday, May 16, 2020 0:38
To: spark-user
Subject: Re: Using Spark Accumulators with Structured Streaming

Can someone from the Spark development team tell me if this functionality
is supported and tested? I've spent a lot of time on this but can't get it
to work. Just to add more context: we've written our own Accumulator class
that extends AccumulatorV2. In this class we keep track of one or more
accumulators.
Here's the definition:

    class CollectionLongAccumulator[T]
        extends AccumulatorV2[T, java.util.Map[T, Long]]

When the job begins, we register an instance of this class:

    spark.sparkContext.register(myAccumulator, "MyAccumulator")

Is this working under Structured Streaming?

I will keep looking for alternate approaches, but any help would be greatly
appreciated. Thanks.

On Thu, May 14, 2020 at 2:36 PM Something Something <mailinglist...@gmail.com> wrote:

In my Structured Streaming job I am updating Spark Accumulators in the
updateAcrossEvents method, but they are always 0 when I try to print them
in my StreamingListener. Here's the code:

    .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
        updateAcrossEvents
    )

The accumulators get incremented in 'updateAcrossEvents'. I have a
StreamingListener which writes the values of the accumulators in its
'onQueryProgress' method, but in this method the Accumulators are ALWAYS
ZERO!

When I added log statements in updateAcrossEvents, I could see that these
accumulators are getting incremented as expected.

This only happens when I run in 'cluster' mode. In local mode it works
fine, which implies that the Accumulators are not getting distributed
correctly, or something like that!

Note: I've seen quite a few answers on the Web that tell me to perform an
"Action". That's not a solution here.
This is a 'Stateful Structured Streaming' job. Yes, I am also 'registering'
them in SparkContext.
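ZHANG Wei's thread-safety point is worth making concrete. An accumulator
whose OUT type is `java.util.Map[T, Long]` is only safe if its per-key
updates and merges are atomic; the counting core could be backed by
ConcurrentHashMap.merge, roughly as below. This is a plain-Java sketch of
just that logic under stated assumptions: the actual AccumulatorV2
subclassing (copy, reset, isZero) and SparkContext registration are
deliberately omitted, and the class name is hypothetical.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Sketch of the per-key counting core a CollectionLongAccumulator-style
// accumulator needs. ConcurrentHashMap.merge is atomic per key, which is
// the kind of thread safety the AccumulatorV2 OUT type is expected to have.
public class CountingMapSketch<T> {
    private final ConcurrentHashMap<T, Long> counts = new ConcurrentHashMap<>();

    // What the executor-side task would call for each event.
    public void add(T key) {
        counts.merge(key, 1L, Long::sum);
    }

    // How partial results from different tasks would be combined.
    public void merge(CountingMapSketch<T> other) {
        other.counts.forEach((k, v) -> counts.merge(k, v, Long::sum));
    }

    public Map<T, Long> value() {
        return counts;
    }

    public static void main(String[] args) {
        CountingMapSketch<String> a = new CountingMapSketch<>();
        CountingMapSketch<String> b = new CountingMapSketch<>();
        a.add("timeout");
        a.add("timeout");
        b.add("pushed");
        a.merge(b);
        System.out.println(a.value().get("timeout")); // 2
        System.out.println(a.value().get("pushed"));  // 1
    }
}
```

If a plain HashMap were used instead, concurrent add() calls from multiple
task threads could lose increments silently, which would show up as
undercounted (or zero) values exactly like the symptom discussed in this
thread.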