I mean... I don't see any reference to 'accumulator' in your class *definition*. How can you access it in the class if it's not in your definition of the class:
public class StateUpdateTask implements
        MapGroupsWithStateFunction<String, InputEventModel, ModelStateInfo, ModelUpdate> {
    // --> I was expecting to see 'accumulator' here in the definition.

    @Override
    public ModelUpdate call(String productId, Iterator<InputEventModel> eventsIterator,
            GroupState<ModelStateInfo> state) {
    }
}
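(For reference, the connection being asked about is presumably a field initialized in the constructor. Below is a minimal sketch of that wiring, assuming a single LongAccumulator; the field names, the counter, and the timeout handling are illustrative guesses rather than Srinivas's actual code, while InputEventModel, ModelStateInfo, and ModelUpdate are the application classes quoted in this thread.)

import java.util.Iterator;
import org.apache.spark.api.java.function.MapGroupsWithStateFunction;
import org.apache.spark.sql.streaming.GroupState;
import org.apache.spark.util.LongAccumulator;

public class StateUpdateTask implements
        MapGroupsWithStateFunction<String, InputEventModel, ModelStateInfo, ModelUpdate> {

    // Created and registered on the driver, then captured here when the
    // function is serialized to the executors. Names are illustrative.
    private final long stateTimeoutMs;
    private final LongAccumulator timedOutStates;

    public StateUpdateTask(long stateTimeoutMs, LongAccumulator timedOutStates) {
        this.stateTimeoutMs = stateTimeoutMs;
        this.timedOutStates = timedOutStates;
    }

    @Override
    public ModelUpdate call(String productId, Iterator<InputEventModel> eventsIterator,
            GroupState<ModelStateInfo> state) {
        if (state.hasTimedOut()) {
            timedOutStates.add(1L); // executor-side update, merged back on the driver
            state.remove();
        } else {
            // ... fold eventsIterator into ModelStateInfo and update the state ...
            state.setTimeoutDuration(stateTimeoutMs);
        }
        return new ModelUpdate(); // assumes a bean-style no-arg constructor
    }
}

On the driver, such an instance would be constructed with an already-registered accumulator, mirroring the new StateUpdateTask(..., appConfig, accumulators) call quoted further down.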
On Fri, May 29, 2020 at 1:08 PM Srinivas V <srini....@gmail.com> wrote:

Yes, accumulators are updated in the call method of StateUpdateTask, for example when the state times out or when the data is pushed to the next Kafka topic.

On Fri, May 29, 2020 at 11:55 PM Something Something <mailinglist...@gmail.com> wrote:

Thanks! I will take a look at the link. Just one question: you seem to be passing 'accumulators' in the constructor, but where do you use it in the StateUpdateTask class? I am still missing that connection. Sorry if my question is dumb; I must be missing something. Thanks for your help so far. It's been useful.

On Fri, May 29, 2020 at 6:51 AM Srinivas V <srini....@gmail.com> wrote:

Yes, it is an application-specific class. This is how Java Spark functions work.
You can refer to this code in the documentation:
https://github.com/apache/spark/blob/master/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredSessionization.java

public class StateUpdateTask implements
        MapGroupsWithStateFunction<String, InputEventModel, ModelStateInfo, ModelUpdate> {

    @Override
    public ModelUpdate call(String productId, Iterator<InputEventModel> eventsIterator,
            GroupState<ModelStateInfo> state) {
    }
}

On Thu, May 28, 2020 at 10:59 PM Something Something <mailinglist...@gmail.com> wrote:

I am assuming StateUpdateTask is your application-specific class. Does it have an 'updateState' method or something? I googled but couldn't find any documentation about doing it this way. Can you please direct me to some documentation? Thanks.

On Thu, May 28, 2020 at 4:43 AM Srinivas V <srini....@gmail.com> wrote:

Yes, I am using stateful structured streaming, similar to what you do. This is in Java; I do it this way:

Dataset<ModelUpdate> productUpdates = watermarkedDS
    .groupByKey(
        (MapFunction<InputEventModel, String>) event -> event.getId(),
        Encoders.STRING())
    .mapGroupsWithState(
        new StateUpdateTask(
            Long.parseLong(appConfig.getSparkStructuredStreamingConfig().STATE_TIMEOUT),
            appConfig,
            accumulators),
        Encoders.bean(ModelStateInfo.class),
        Encoders.bean(ModelUpdate.class),
        GroupStateTimeout.ProcessingTimeTimeout());

StateUpdateTask contains the update method.

On Thu, May 28, 2020 at 4:41 AM Something Something <mailinglist...@gmail.com> wrote:

Yes, that's exactly how I am creating them.

Question... Are you using 'Stateful Structured Streaming' in which you have something like this?

.mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
    updateAcrossEvents
)

And updating the Accumulator inside 'updateAcrossEvents'? We're experiencing this only under 'Stateful Structured Streaming'. In other streaming applications it works as expected.

On Wed, May 27, 2020 at 9:01 AM Srinivas V <srini....@gmail.com> wrote:

Yes, I am talking about application-specific Accumulators. Actually, I am getting the values printed in my driver log as well as sent to Grafana. Not sure where and when I saw 0 before. My deploy mode is “client” on a YARN cluster (not local Mac), where I submit from the master node. It should work the same for cluster mode as well.
Create accumulators like this:

AccumulatorV2 accumulator = sparkContext.longAccumulator(name);

On Tue, May 26, 2020 at 8:42 PM Something Something <mailinglist...@gmail.com> wrote:

Hmm... how would they go to Grafana if they are not getting computed in your code? I am talking about the application-specific Accumulators. The other standard counters, such as 'event.progress.inputRowsPerSecond', are getting populated correctly!

On Mon, May 25, 2020 at 8:39 PM Srinivas V <srini....@gmail.com> wrote:

Hello,
Even for me it comes as 0 when I print it in onQueryProgress. I use LongAccumulator as well. Yes, it prints on my local machine but not on the cluster. But one consolation is that when I send metrics to Grafana, the values are coming through there.

On Tue, May 26, 2020 at 3:10 AM Something Something <mailinglist...@gmail.com> wrote:

No, this is not working even if I use LongAccumulator.

On Fri, May 15, 2020 at 9:54 PM ZHANG Wei <zwb...@msn.com> wrote:

There is a restriction in the AccumulatorV2 API [1]: the OUT type should be atomic or thread safe. I'm wondering whether the implementation for `java.util.Map[T, Long]` can meet that or not. Is there any chance to replace CollectionLongAccumulator with CollectionAccumulator [2] or LongAccumulator [3] and test whether the StreamingListener and the other code are able to work?

---
Cheers,
-z
[1] http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.AccumulatorV2
[2] http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.CollectionAccumulator
[3] http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.LongAccumulator
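(ZHANG Wei's point above, that the OUT type must be thread safe, can be sketched as follows. KeyedLongAccumulator is an illustrative stand-in for the CollectionLongAccumulator quoted below, assuming a ConcurrentHashMap-backed implementation; it is not code from this thread.)

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.spark.util.AccumulatorV2;

// Counts occurrences per key; OUT is backed by a ConcurrentHashMap so that
// concurrent reads of value() are safe, per the AccumulatorV2 contract.
public class KeyedLongAccumulator<T> extends AccumulatorV2<T, Map<T, Long>> {

    private final ConcurrentHashMap<T, Long> counts = new ConcurrentHashMap<>();

    @Override
    public boolean isZero() { return counts.isEmpty(); }

    @Override
    public AccumulatorV2<T, Map<T, Long>> copy() {
        KeyedLongAccumulator<T> copied = new KeyedLongAccumulator<>();
        copied.counts.putAll(counts);
        return copied;
    }

    @Override
    public void reset() { counts.clear(); }

    @Override
    public void add(T v) { counts.merge(v, 1L, Long::sum); }

    @Override
    public void merge(AccumulatorV2<T, Map<T, Long>> other) {
        other.value().forEach((k, n) -> counts.merge(k, n, Long::sum));
    }

    @Override
    public Map<T, Long> value() { return counts; }
}

Registering it would work the same way as quoted below: create it on the driver and call sparkContext.register(accumulator, "MyAccumulator") before starting the query.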
________________________________________
From: Something Something <mailinglist...@gmail.com>
Sent: Saturday, May 16, 2020 0:38
To: spark-user
Subject: Re: Using Spark Accumulators with Structured Streaming

Can someone from the Spark development team tell me if this functionality is supported and tested? I've spent a lot of time on this but can't get it to work. Just to add more context, we have our own Accumulator class that extends AccumulatorV2. In this class we keep track of one or more accumulators. Here's the definition:

class CollectionLongAccumulator[T]
    extends AccumulatorV2[T, java.util.Map[T, Long]]

When the job begins we register an instance of this class:

spark.sparkContext.register(myAccumulator, "MyAccumulator")

Is this working under Structured Streaming?

I will keep looking for alternate approaches, but any help would be greatly appreciated. Thanks.

On Thu, May 14, 2020 at 2:36 PM Something Something <mailinglist...@gmail.com> wrote:

In my structured streaming job I am updating Spark Accumulators in the updateAcrossEvents method, but they are always 0 when I try to print them in my StreamingListener. Here's the code:

.mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
    updateAcrossEvents
)

The accumulators get incremented in 'updateAcrossEvents'. I have a StreamingListener which writes the values of the accumulators in its 'onQueryProgress' method, but in this method the accumulators are ALWAYS ZERO!

When I added log statements in updateAcrossEvents, I could see that these accumulators are getting incremented as expected.

This only happens when I run in 'cluster' mode. In local mode it works fine, which implies that the accumulators are not getting distributed correctly, or something like that!

Note: I've seen quite a few answers on the Web that tell me to perform an "Action". That's not a solution here. This is a 'Stateful Structured Streaming' job. Yes, I am also 'registering' them in SparkContext.
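(To round this out, a minimal sketch of the driver-side listener described above, assuming the onQueryProgress hook of StreamingQueryListener, which is the Structured Streaming counterpart of the DStream-era StreamingListener; the class and field names are illustrative, not the poster's code.)

import org.apache.spark.sql.streaming.StreamingQueryListener;
import org.apache.spark.util.LongAccumulator;

// Holds a reference to the same accumulator instance that the stateful
// function updates, and reads its merged value after each micro-batch.
public class AccumulatorReportingListener extends StreamingQueryListener {

    private final LongAccumulator timedOutStates;

    public AccumulatorReportingListener(LongAccumulator timedOutStates) {
        this.timedOutStates = timedOutStates;
    }

    @Override
    public void onQueryStarted(QueryStartedEvent event) { }

    @Override
    public void onQueryProgress(QueryProgressEvent event) {
        // Task-side updates are merged into the driver-side instance as each
        // micro-batch's tasks finish, so the value read here should be current.
        System.out.println("timedOutStates = " + timedOutStates.value());
    }

    @Override
    public void onQueryTerminated(QueryTerminatedEvent event) { }
}

It would be attached on the same driver that registered the accumulator, for example with spark.streams().addListener(new AccumulatorReportingListener(accumulator)).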