I am assuming StateUpdateTask is your application-specific class. Does it have an 'updateState' method or something similar? I googled but couldn't find any documentation about doing it this way. Could you please point me to some documentation? Thanks.
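Just so I understand the shape of it: is StateUpdateTask roughly something like the sketch below? This is purely my guess at the class, assuming it implements Spark's MapGroupsWithStateFunction and that the 'update'/'updateState' logic lives in its call() method. InputEventModel, ModelStateInfo and ModelUpdate are the classes from your snippet; the constructor arguments, field names and the 'eventsProcessed' accumulator key are made up by me for illustration (and I've ignored the appConfig argument).

    import java.util.Iterator;
    import java.util.Map;

    import org.apache.spark.api.java.function.MapGroupsWithStateFunction;
    import org.apache.spark.sql.streaming.GroupState;
    import org.apache.spark.util.LongAccumulator;

    // Hypothetical sketch of StateUpdateTask; only the Spark interfaces are real,
    // the rest is guessed from your mapGroupsWithState(...) call.
    public class StateUpdateTask
        implements MapGroupsWithStateFunction<String, InputEventModel, ModelStateInfo, ModelUpdate> {

      private final long stateTimeoutMs;
      private final Map<String, LongAccumulator> accumulators;

      public StateUpdateTask(long stateTimeoutMs, Map<String, LongAccumulator> accumulators) {
        this.stateTimeoutMs = stateTimeoutMs;
        this.accumulators = accumulators;
      }

      @Override
      public ModelUpdate call(String key, Iterator<InputEventModel> events,
                              GroupState<ModelStateInfo> state) throws Exception {
        // Start from the existing per-key state, or a fresh one for a new key.
        ModelStateInfo info = state.exists() ? state.get() : new ModelStateInfo();
        while (events.hasNext()) {
          InputEventModel event = events.next();
          // ... fold the event into 'info' here ...
          accumulators.get("eventsProcessed").add(1L); // accumulator updated on the executors
        }
        state.update(info);
        state.setTimeoutDuration(stateTimeoutMs); // works with GroupStateTimeout.ProcessingTimeTimeout()
        return new ModelUpdate(); // built from 'info' in the real code, presumably
      }
    }

(Passing the accumulators in through the constructor like this would mean they get updated on the executors inside call(), which matches what we are doing in updateAcrossEvents.)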
On Thu, May 28, 2020 at 4:43 AM Srinivas V <srini....@gmail.com> wrote:

> Yes, I am using stateful structured streaming. Yes, similar to what you do.
> This is in Java; I do it this way:
>
>     Dataset<ModelUpdate> productUpdates = watermarkedDS
>         .groupByKey(
>             (MapFunction<InputEventModel, String>) event -> event.getId(),
>             Encoders.STRING())
>         .mapGroupsWithState(
>             new StateUpdateTask(
>                 Long.parseLong(appConfig.getSparkStructuredStreamingConfig().STATE_TIMEOUT),
>                 appConfig, accumulators),
>             Encoders.bean(ModelStateInfo.class),
>             Encoders.bean(ModelUpdate.class),
>             GroupStateTimeout.ProcessingTimeTimeout());
>
> StateUpdateTask contains the update method.
>
> On Thu, May 28, 2020 at 4:41 AM Something Something <mailinglist...@gmail.com> wrote:
>
>> Yes, that's exactly how I am creating them.
>>
>> Question: are you using 'Stateful Structured Streaming', i.e. something like this?
>>
>>     .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
>>         updateAcrossEvents
>>     )
>>
>> And updating the Accumulator inside 'updateAcrossEvents'? We're experiencing
>> this only under 'Stateful Structured Streaming'. In other streaming
>> applications it works as expected.
>>
>> On Wed, May 27, 2020 at 9:01 AM Srinivas V <srini....@gmail.com> wrote:
>>
>>> Yes, I am talking about application-specific Accumulators. I am actually
>>> getting the values printed in my driver log as well as sent to Grafana; not
>>> sure where and when I saw 0 before. My deploy mode is “client” on a YARN
>>> cluster (not local Mac), where I submit from the master node. It should work
>>> the same for cluster mode as well.
>>> Create accumulators like this:
>>>
>>>     AccumulatorV2 accumulator = sparkContext.longAccumulator(name);
>>>
>>> On Tue, May 26, 2020 at 8:42 PM Something Something <mailinglist...@gmail.com> wrote:
>>>
>>>> Hmm... how would they get to Grafana if they are not getting computed
>>>> in your code? I am talking about the application-specific Accumulators. The
>>>> other standard counters, such as 'event.progress.inputRowsPerSecond', are
>>>> getting populated correctly!
>>>>
>>>> On Mon, May 25, 2020 at 8:39 PM Srinivas V <srini....@gmail.com> wrote:
>>>>
>>>>> Hello,
>>>>> For me too it comes back as 0 when I print it in onQueryProgress. I use
>>>>> LongAccumulator as well. Yes, it prints correctly on my local machine but not
>>>>> on the cluster. One consolation is that when I send metrics to Grafana, the
>>>>> values do show up there.
>>>>>
>>>>> On Tue, May 26, 2020 at 3:10 AM Something Something <mailinglist...@gmail.com> wrote:
>>>>>
>>>>>> No, this is not working even if I use LongAccumulator.
>>>>>>
>>>>>> On Fri, May 15, 2020 at 9:54 PM ZHANG Wei <zwb...@msn.com> wrote:
>>>>>>
>>>>>>> There is a restriction in the AccumulatorV2 API [1]: the OUT type should
>>>>>>> be atomic or thread-safe. I'm wondering whether the implementation for
>>>>>>> `java.util.Map[T, Long]` can meet it or not. Is there any chance to replace
>>>>>>> CollectionLongAccumulator with CollectionAccumulator [2] or LongAccumulator [3]
>>>>>>> and test whether the StreamingListener and the other code are able to work?
>>>>>>>
>>>>>>> ---
>>>>>>> Cheers,
>>>>>>> -z
>>>>>>>
>>>>>>> [1] http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.AccumulatorV2
>>>>>>> [2] http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.CollectionAccumulator
>>>>>>> [3] http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.LongAccumulator
>>>>>>>
>>>>>>> ________________________________________
>>>>>>> From: Something Something <mailinglist...@gmail.com>
>>>>>>> Sent: Saturday, May 16, 2020 0:38
>>>>>>> To: spark-user
>>>>>>> Subject: Re: Using Spark Accumulators with Structured Streaming
>>>>>>>
>>>>>>> Can someone from the Spark development team tell me whether this
>>>>>>> functionality is supported and tested? I've spent a lot of time on this but
>>>>>>> can't get it to work. Just to add more context: we have our own Accumulator
>>>>>>> class that extends AccumulatorV2. In this class we keep track of one
>>>>>>> or more accumulators. Here's the definition:
>>>>>>>
>>>>>>>     class CollectionLongAccumulator[T]
>>>>>>>         extends AccumulatorV2[T, java.util.Map[T, Long]]
>>>>>>>
>>>>>>> When the job begins we register an instance of this class:
>>>>>>>
>>>>>>>     spark.sparkContext.register(myAccumulator, "MyAccumulator")
>>>>>>>
>>>>>>> Does this work under Structured Streaming?
>>>>>>>
>>>>>>> I will keep looking for alternate approaches, but any help would be
>>>>>>> greatly appreciated. Thanks.
>>>>>>>
>>>>>>> On Thu, May 14, 2020 at 2:36 PM Something Something <mailinglist...@gmail.com> wrote:
>>>>>>>
>>>>>>> In my structured streaming job I am updating Spark Accumulators in
>>>>>>> the updateAcrossEvents method, but they are always 0 when I try to print
>>>>>>> them in my StreamingListener. Here's the code:
>>>>>>>
>>>>>>>     .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
>>>>>>>         updateAcrossEvents
>>>>>>>     )
>>>>>>>
>>>>>>> The accumulators get incremented in 'updateAcrossEvents'. I have a
>>>>>>> StreamingListener which writes the values of the accumulators in its
>>>>>>> 'onQueryProgress' method, but in that method the Accumulators are ALWAYS
>>>>>>> ZERO!
>>>>>>>
>>>>>>> When I added log statements in updateAcrossEvents, I could see
>>>>>>> that these accumulators are getting incremented as expected.
>>>>>>>
>>>>>>> This only happens when I run in 'cluster' mode. In local mode it
>>>>>>> works fine, which implies that the Accumulators are not getting distributed
>>>>>>> correctly - or something like that!
>>>>>>>
>>>>>>> Note: I've seen quite a few answers on the Web that tell me to
>>>>>>> perform an "Action". That's not a solution here. This is a 'Stateful
>>>>>>> Structured Streaming' job. Yes, I am also 'registering' them in
>>>>>>> SparkContext.
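P.S. For completeness, here is a stripped-down Java sketch of the driver-side plumbing we have been talking about: a named LongAccumulator created (and thereby registered) through the SparkContext, read from a StreamingQueryListener in onQueryProgress. The class name and accumulator name are mine, and this is only meant to show the shape of the code, not to claim it avoids the zeros we see in cluster mode.

    import org.apache.spark.sql.SparkSession;
    import org.apache.spark.sql.streaming.StreamingQueryListener;
    import org.apache.spark.util.LongAccumulator;

    public class AccumulatorListenerSketch {
      public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
            .appName("accumulator-listener-sketch")
            .getOrCreate();

        // Named accumulators created through the SparkContext are registered automatically.
        LongAccumulator eventsProcessed =
            spark.sparkContext().longAccumulator("eventsProcessed");

        // Structured Streaming listener; onQueryProgress fires on the driver after each micro-batch.
        spark.streams().addListener(new StreamingQueryListener() {
          @Override
          public void onQueryStarted(QueryStartedEvent event) { }

          @Override
          public void onQueryProgress(QueryProgressEvent event) {
            System.out.println("eventsProcessed = " + eventsProcessed.value());
          }

          @Override
          public void onQueryTerminated(QueryTerminatedEvent event) { }
        });

        // ... build the streaming Dataset, wire in mapGroupsWithState(new StateUpdateTask(...), ...)
        // so the accumulator is incremented on the executors, then start the query ...
      }
    }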