Yes, it is an application-specific class. This is how Java Spark functions work. You can refer to this code in the documentation:
https://github.com/apache/spark/blob/master/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredSessionization.java

public class StateUpdateTask implements
        MapGroupsWithStateFunction<String, InputEventModel, ModelStateInfo, ModelUpdate> {

    @Override
    public ModelUpdate call(String productId, Iterator<InputEventModel> eventsIterator,
                            GroupState<ModelStateInfo> state) {
    }
}
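To make that skeleton concrete, here is a minimal sketch of what the call method can do (illustration only: the constructor is reduced to just the timeout, and ModelStateInfo.update(...) plus the ModelUpdate constructor are hypothetical helpers, not the real code):

import java.util.Iterator;
import org.apache.spark.api.java.function.MapGroupsWithStateFunction;
import org.apache.spark.sql.streaming.GroupState;

// Sketch only: InputEventModel, ModelStateInfo and ModelUpdate are the
// application's own classes.
public class StateUpdateTask implements
        MapGroupsWithStateFunction<String, InputEventModel, ModelStateInfo, ModelUpdate> {

    private final long stateTimeoutMs;

    public StateUpdateTask(long stateTimeoutMs) {
        this.stateTimeoutMs = stateTimeoutMs;
    }

    @Override
    public ModelUpdate call(String productId,
                            Iterator<InputEventModel> eventsIterator,
                            GroupState<ModelStateInfo> state) {
        // Load the existing state for this key, or start fresh.
        ModelStateInfo info = state.exists() ? state.get() : new ModelStateInfo();

        // Fold the new events for this key into the state.
        while (eventsIterator.hasNext()) {
            info.update(eventsIterator.next()); // hypothetical helper
        }

        state.update(info);
        state.setTimeoutDuration(stateTimeoutMs);
        return new ModelUpdate(productId, info); // hypothetical constructor
    }
}

The real class also takes appConfig and accumulators in its constructor, as shown in the quoted code below.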
On Thu, May 28, 2020 at 10:59 PM Something Something <
mailinglist...@gmail.com> wrote:

> I am assuming StateUpdateTask is your application-specific class. Does it
> have an 'updateState' method or something? I googled but couldn't find any
> documentation about doing it this way. Can you please direct me to some
> documentation? Thanks.
>
> On Thu, May 28, 2020 at 4:43 AM Srinivas V <srini....@gmail.com> wrote:
>
>> Yes, I am using stateful structured streaming, similar to what you do.
>> This is in Java. I do it this way:
>>
>> Dataset<ModelUpdate> productUpdates = watermarkedDS
>>     .groupByKey(
>>         (MapFunction<InputEventModel, String>) event -> event.getId(),
>>         Encoders.STRING())
>>     .mapGroupsWithState(
>>         new StateUpdateTask(
>>             Long.parseLong(appConfig.getSparkStructuredStreamingConfig().STATE_TIMEOUT),
>>             appConfig, accumulators),
>>         Encoders.bean(ModelStateInfo.class),
>>         Encoders.bean(ModelUpdate.class),
>>         GroupStateTimeout.ProcessingTimeTimeout());
>>
>> StateUpdateTask contains the update method.
>>
>> On Thu, May 28, 2020 at 4:41 AM Something Something <
>> mailinglist...@gmail.com> wrote:
>>
>>> Yes, that's exactly how I am creating them.
>>>
>>> Question... Are you using 'Stateful Structured Streaming', in which
>>> you have something like this?
>>>
>>> .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
>>>     updateAcrossEvents
>>> )
>>>
>>> And updating the Accumulator inside 'updateAcrossEvents'? We're
>>> experiencing this only under 'Stateful Structured Streaming'. In other
>>> streaming applications it works as expected.
>>>
>>> On Wed, May 27, 2020 at 9:01 AM Srinivas V <srini....@gmail.com> wrote:
>>>
>>>> Yes, I am talking about application-specific Accumulators. Actually, I
>>>> am getting the values printed in my driver log as well as sent to
>>>> Grafana. Not sure where and when I saw 0 before. My deploy mode is
>>>> “client” on a YARN cluster (not my local Mac), where I submit from the
>>>> master node. It should work the same for cluster mode as well.
>>>> Create accumulators like this:
>>>>
>>>> AccumulatorV2 accumulator = sparkContext.longAccumulator(name);
>>>>
>>>> On Tue, May 26, 2020 at 8:42 PM Something Something <
>>>> mailinglist...@gmail.com> wrote:
>>>>
>>>>> Hmm... how would they go to Grafana if they are not getting computed
>>>>> in your code? I am talking about the application-specific
>>>>> Accumulators. The other standard counters, such as
>>>>> 'event.progress.inputRowsPerSecond', are getting populated correctly!
>>>>>
>>>>> On Mon, May 25, 2020 at 8:39 PM Srinivas V <srini....@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hello,
>>>>>> Even for me it comes as 0 when I print it in onQueryProgress. I use
>>>>>> LongAccumulator as well. Yes, it prints on my local machine but not
>>>>>> on the cluster. But one consolation is that when I send metrics to
>>>>>> Grafana, the values do show up there.
>>>>>>
>>>>>> On Tue, May 26, 2020 at 3:10 AM Something Something <
>>>>>> mailinglist...@gmail.com> wrote:
>>>>>>
>>>>>>> No, this is not working even if I use LongAccumulator.
>>>>>>>
>>>>>>> On Fri, May 15, 2020 at 9:54 PM ZHANG Wei <zwb...@msn.com> wrote:
>>>>>>>
>>>>>>>> There is a restriction in the AccumulatorV2 API [1]: the OUT type
>>>>>>>> should be atomic or thread-safe.
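>>>>>>>> As an illustration of that requirement, here is a minimal sketch
>>>>>>>> of a map accumulator backed by a thread-safe ConcurrentHashMap (a
>>>>>>>> hypothetical class for illustration, not your
>>>>>>>> CollectionLongAccumulator):
>>>>>>>>
>>>>>>>> import java.util.Map;
>>>>>>>> import java.util.concurrent.ConcurrentHashMap;
>>>>>>>> import org.apache.spark.util.AccumulatorV2;
>>>>>>>>
>>>>>>>> // Illustrative only: counts occurrences per key; the OUT type is
>>>>>>>> // backed by a thread-safe map, as the AccumulatorV2 contract asks.
>>>>>>>> public class ConcurrentMapAccumulator<T>
>>>>>>>>         extends AccumulatorV2<T, Map<T, Long>> {
>>>>>>>>
>>>>>>>>     private final ConcurrentHashMap<T, Long> map = new ConcurrentHashMap<>();
>>>>>>>>
>>>>>>>>     @Override
>>>>>>>>     public boolean isZero() { return map.isEmpty(); }
>>>>>>>>
>>>>>>>>     @Override
>>>>>>>>     public AccumulatorV2<T, Map<T, Long>> copy() {
>>>>>>>>         ConcurrentMapAccumulator<T> acc = new ConcurrentMapAccumulator<>();
>>>>>>>>         acc.map.putAll(map);
>>>>>>>>         return acc;
>>>>>>>>     }
>>>>>>>>
>>>>>>>>     @Override
>>>>>>>>     public void reset() { map.clear(); }
>>>>>>>>
>>>>>>>>     @Override
>>>>>>>>     public void add(T v) { map.merge(v, 1L, Long::sum); }
>>>>>>>>
>>>>>>>>     @Override
>>>>>>>>     public void merge(AccumulatorV2<T, Map<T, Long>> other) {
>>>>>>>>         other.value().forEach((k, v) -> map.merge(k, v, Long::sum));
>>>>>>>>     }
>>>>>>>>
>>>>>>>>     @Override
>>>>>>>>     public Map<T, Long> value() { return map; }
>>>>>>>> }
>>>>>>>>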
>>>>>>>> I'm wondering if the implementation for `java.util.Map[T, Long]`
>>>>>>>> can meet it or not. Is there any chance to replace
>>>>>>>> CollectionLongAccumulator with CollectionAccumulator [2] or
>>>>>>>> LongAccumulator [3] and test whether the StreamingListener and the
>>>>>>>> rest of the code are able to work?
>>>>>>>>
>>>>>>>> ---
>>>>>>>> Cheers,
>>>>>>>> -z
>>>>>>>> [1] http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.AccumulatorV2
>>>>>>>> [2] http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.CollectionAccumulator
>>>>>>>> [3] http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.LongAccumulator
>>>>>>>>
>>>>>>>> ________________________________________
>>>>>>>> From: Something Something <mailinglist...@gmail.com>
>>>>>>>> Sent: Saturday, May 16, 2020 0:38
>>>>>>>> To: spark-user
>>>>>>>> Subject: Re: Using Spark Accumulators with Structured Streaming
>>>>>>>>
>>>>>>>> Can someone from the Spark development team tell me if this
>>>>>>>> functionality is supported and tested? I've spent a lot of time on
>>>>>>>> this but can't get it to work. Just to add more context: we have
>>>>>>>> our own Accumulator class that extends AccumulatorV2. In this class
>>>>>>>> we keep track of one or more accumulators. Here's the definition:
>>>>>>>>
>>>>>>>> class CollectionLongAccumulator[T]
>>>>>>>>     extends AccumulatorV2[T, java.util.Map[T, Long]]
>>>>>>>>
>>>>>>>> When the job begins, we register an instance of this class:
>>>>>>>>
>>>>>>>> spark.sparkContext.register(myAccumulator, "MyAccumulator")
>>>>>>>>
>>>>>>>> Is this working under Structured Streaming?
>>>>>>>>
>>>>>>>> I will keep looking for alternate approaches, but any help would be
>>>>>>>> greatly appreciated. Thanks.
>>>>>>>>
>>>>>>>> On Thu, May 14, 2020 at 2:36 PM Something Something <
>>>>>>>> mailinglist...@gmail.com> wrote:
>>>>>>>>
>>>>>>>> In my structured streaming job, I am updating Spark Accumulators in
>>>>>>>> the updateAcrossEvents method, but they are always 0 when I try to
>>>>>>>> print them in my StreamingListener. Here's the code:
>>>>>>>>
>>>>>>>> .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
>>>>>>>>     updateAcrossEvents
>>>>>>>> )
>>>>>>>>
>>>>>>>> The accumulators get incremented in 'updateAcrossEvents'. I have a
>>>>>>>> StreamingListener which writes the values of the accumulators in
>>>>>>>> its 'onQueryProgress' method, but in this method the Accumulators
>>>>>>>> are ALWAYS ZERO!
>>>>>>>>
>>>>>>>> When I added log statements in updateAcrossEvents, I could see that
>>>>>>>> these accumulators are getting incremented as expected.
>>>>>>>>
>>>>>>>> This only happens when I run in 'cluster' mode. In local mode it
>>>>>>>> works fine, which implies that the Accumulators are not getting
>>>>>>>> distributed correctly - or something like that!
>>>>>>>>
>>>>>>>> Note: I've seen quite a few answers on the web that tell me to
>>>>>>>> perform an "Action". That's not a solution here. This is a
>>>>>>>> 'Stateful Structured Streaming' job. Yes, I am also 'registering'
>>>>>>>> them in SparkContext.
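>>>>>>>>
>>>>>>>> For completeness, here is a stripped-down sketch of how the
>>>>>>>> listener and the accumulator are wired up on the driver (names and
>>>>>>>> the query setup are simplified placeholders; in Structured
>>>>>>>> Streaming the onQueryProgress callback lives on
>>>>>>>> StreamingQueryListener):
>>>>>>>>
>>>>>>>> import org.apache.spark.sql.SparkSession;
>>>>>>>> import org.apache.spark.sql.streaming.StreamingQueryListener;
>>>>>>>> import org.apache.spark.util.LongAccumulator;
>>>>>>>>
>>>>>>>> public class AccumulatorListenerSketch {
>>>>>>>>     public static void main(String[] args) {
>>>>>>>>         SparkSession spark = SparkSession.builder()
>>>>>>>>             .appName("AccumulatorRepro") // hypothetical app name
>>>>>>>>             .getOrCreate();
>>>>>>>>
>>>>>>>>         // Register the accumulator on the driver.
>>>>>>>>         LongAccumulator eventCounter =
>>>>>>>>             spark.sparkContext().longAccumulator("EventCounter");
>>>>>>>>
>>>>>>>>         spark.streams().addListener(new StreamingQueryListener() {
>>>>>>>>             @Override
>>>>>>>>             public void onQueryStarted(QueryStartedEvent event) { }
>>>>>>>>
>>>>>>>>             @Override
>>>>>>>>             public void onQueryProgress(QueryProgressEvent event) {
>>>>>>>>                 // This is where the value reads as 0 in cluster mode.
>>>>>>>>                 System.out.println("EventCounter = " + eventCounter.value());
>>>>>>>>             }
>>>>>>>>
>>>>>>>>             @Override
>>>>>>>>             public void onQueryTerminated(QueryTerminatedEvent event) { }
>>>>>>>>         });
>>>>>>>>
>>>>>>>>         // ... build the streaming query whose state-update function
>>>>>>>>         // increments eventCounter, then start it ...
>>>>>>>>     }
>>>>>>>> }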