Yes, I am using stateful Structured Streaming, and yes, similar to what you do. In Java I do it this way:

    Dataset<ModelUpdate> productUpdates = watermarkedDS
        .groupByKey(
            (MapFunction<InputEventModel, String>) event -> event.getId(),
            Encoders.STRING())
        .mapGroupsWithState(
            new StateUpdateTask(
                Long.parseLong(appConfig.getSparkStructuredStreamingConfig().STATE_TIMEOUT),
                appConfig,
                accumulators),
            Encoders.bean(ModelStateInfo.class),
            Encoders.bean(ModelUpdate.class),
            GroupStateTimeout.ProcessingTimeTimeout());
StateUpdateTask contains the update method.

On Thu, May 28, 2020 at 4:41 AM Something Something <mailinglist...@gmail.com> wrote:

> Yes, that's exactly how I am creating them.
>
> Question: are you using stateful Structured Streaming, with something
> like this?
>
>     .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
>       updateAcrossEvents
>     )
>
> And updating the accumulator inside 'updateAcrossEvents'? We're seeing
> this problem only under stateful Structured Streaming; in our other
> streaming applications accumulators work as expected.
>
> On Wed, May 27, 2020 at 9:01 AM Srinivas V <srini....@gmail.com> wrote:
>
>> Yes, I am talking about application-specific accumulators. I am actually
>> getting the values printed in my driver log as well as sent to Grafana;
>> I'm not sure where and when I saw 0 before. My deploy mode is "client" on
>> a YARN cluster (not local), where I submit from the master node. It
>> should work the same for cluster mode as well.
>>
>> I create accumulators like this:
>>
>>     AccumulatorV2 accumulator = sparkContext.longAccumulator(name);
>>
>> On Tue, May 26, 2020 at 8:42 PM Something Something <mailinglist...@gmail.com> wrote:
>>
>>> Hmm... how would they get to Grafana if they are not being computed in
>>> your code? I am talking about the application-specific accumulators.
>>> The standard counters such as 'event.progress.inputRowsPerSecond' are
>>> getting populated correctly!
>>>
>>> On Mon, May 25, 2020 at 8:39 PM Srinivas V <srini....@gmail.com> wrote:
>>>
>>>> Hello,
>>>> For me it also comes out as 0 when I print it in onQueryProgress. I use
>>>> LongAccumulator as well. Yes, it prints correctly on my local machine
>>>> but not on the cluster. One consolation is that when I send metrics to
>>>> Grafana, the values do show up there.
>>>>
>>>> On Tue, May 26, 2020 at 3:10 AM Something Something <mailinglist...@gmail.com> wrote:
>>>>
>>>>> No, this is not working even if I use LongAccumulator.
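For reference, a minimal sketch of what a StateUpdateTask along the lines mentioned above might look like, with the accumulator incremented inside the state-update function. This is a sketch only (it needs spark-sql on the classpath and will not compile standalone); all names besides the Spark API types (MapGroupsWithStateFunction, GroupState, LongAccumulator) are assumptions, not taken from the thread:

```java
// Sketch, not a definitive implementation: the accumulator is created and
// registered on the driver, captured in this serializable function, and
// incremented on the executors inside call().
public class StateUpdateTask
    implements MapGroupsWithStateFunction<String, InputEventModel, ModelStateInfo, ModelUpdate> {

  private final long stateTimeoutMs;
  private final LongAccumulator eventCounter;  // registered on the driver

  public StateUpdateTask(long stateTimeoutMs, LongAccumulator eventCounter) {
    this.stateTimeoutMs = stateTimeoutMs;
    this.eventCounter = eventCounter;
  }

  @Override
  public ModelUpdate call(String key, Iterator<InputEventModel> events,
                          GroupState<ModelStateInfo> state) {
    ModelStateInfo info = state.exists() ? state.get() : new ModelStateInfo();
    while (events.hasNext()) {
      events.next();
      eventCounter.add(1);  // executor-side increment
    }
    state.update(info);
    state.setTimeoutDuration(stateTimeoutMs);
    return new ModelUpdate();
  }
}
```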
>>>>> On Fri, May 15, 2020 at 9:54 PM ZHANG Wei <zwb...@msn.com> wrote:
>>>>>
>>>>>> There is a restriction in the AccumulatorV2 API [1]: the OUT type
>>>>>> should be atomic or thread-safe. I'm wondering whether the
>>>>>> implementation for `java.util.Map[T, Long]` can meet that
>>>>>> requirement. Is there any chance to replace CollectionLongAccumulator
>>>>>> with CollectionAccumulator [2] or LongAccumulator [3] and test
>>>>>> whether the StreamingListener and the rest of the code work?
>>>>>>
>>>>>> ---
>>>>>> Cheers,
>>>>>> -z
>>>>>> [1] http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.AccumulatorV2
>>>>>> [2] http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.CollectionAccumulator
>>>>>> [3] http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.LongAccumulator
>>>>>>
>>>>>> ________________________________________
>>>>>> From: Something Something <mailinglist...@gmail.com>
>>>>>> Sent: Saturday, May 16, 2020 0:38
>>>>>> To: spark-user
>>>>>> Subject: Re: Using Spark Accumulators with Structured Streaming
>>>>>>
>>>>>> Can someone from the Spark development team tell me whether this
>>>>>> functionality is supported and tested? I've spent a lot of time on
>>>>>> this but can't get it to work. To add more context: we have our own
>>>>>> Accumulator class that extends AccumulatorV2. In this class we keep
>>>>>> track of one or more accumulators. Here's the definition:
>>>>>>
>>>>>>     class CollectionLongAccumulator[T]
>>>>>>         extends AccumulatorV2[T, java.util.Map[T, Long]]
>>>>>>
>>>>>> When the job begins, we register an instance of this class:
>>>>>>
>>>>>>     spark.sparkContext.register(myAccumulator, "MyAccumulator")
>>>>>>
>>>>>> Does this work under Structured Streaming?
>>>>>>
>>>>>> I will keep looking for alternate approaches, but any help would be
>>>>>> greatly appreciated. Thanks.
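The thread-safety restriction ZHANG Wei points to can be illustrated without Spark at all: whatever backs a map-of-longs accumulator must tolerate concurrent `add` calls from multiple tasks, which a ConcurrentHashMap with `merge` guarantees. A minimal sketch under that assumption (the class name is illustrative, not from the thread, and this does not extend Spark's AccumulatorV2):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Stand-in for the add/merge contract a CollectionLongAccumulator-style
// class must satisfy: per-key increments stay correct under concurrency.
public class SafeCounterMap {
  private final ConcurrentHashMap<String, Long> counts = new ConcurrentHashMap<>();

  // Analogous to AccumulatorV2.add(v): bump one key atomically.
  public void add(String key) {
    counts.merge(key, 1L, Long::sum);
  }

  // Analogous to AccumulatorV2.merge(other): fold in a partial result.
  public void merge(SafeCounterMap other) {
    other.counts.forEach((k, v) -> counts.merge(k, v, Long::sum));
  }

  public Map<String, Long> value() {
    return counts;
  }

  public static void main(String[] args) throws InterruptedException {
    SafeCounterMap acc = new SafeCounterMap();
    Thread[] workers = new Thread[4];
    for (int i = 0; i < workers.length; i++) {
      workers[i] = new Thread(() -> {
        for (int j = 0; j < 10_000; j++) acc.add("events");
      });
      workers[i].start();
    }
    for (Thread t : workers) t.join();
    // ConcurrentHashMap.merge is atomic per key, so no increments are lost.
    System.out.println(acc.value().get("events"));  // prints 40000
  }
}
```

With a plain HashMap in place of ConcurrentHashMap, the same four threads would routinely lose updates, which is exactly why the OUT type must be atomic or thread-safe.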
>>>>>> On Thu, May 14, 2020 at 2:36 PM Something Something <mailinglist...@gmail.com> wrote:
>>>>>>
>>>>>> In my Structured Streaming job I am updating Spark accumulators in
>>>>>> the updateAcrossEvents method, but they are always 0 when I try to
>>>>>> print them in my StreamingListener. Here's the code:
>>>>>>
>>>>>>     .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
>>>>>>       updateAcrossEvents
>>>>>>     )
>>>>>>
>>>>>> The accumulators get incremented in 'updateAcrossEvents'. I have a
>>>>>> StreamingListener that writes the values of the accumulators in its
>>>>>> 'onQueryProgress' method, but in that method the accumulators are
>>>>>> ALWAYS ZERO!
>>>>>>
>>>>>> When I added log statements in updateAcrossEvents, I could see that
>>>>>> the accumulators are getting incremented as expected.
>>>>>>
>>>>>> This only happens when I run in cluster mode. In local mode it works
>>>>>> fine, which implies that the accumulator updates are not being
>>>>>> propagated back to the driver correctly - or something like that!
>>>>>>
>>>>>> Note: I've seen quite a few answers on the web that tell me to
>>>>>> perform an "Action". That's not a solution here; this is a stateful
>>>>>> Structured Streaming job. Yes, I am also registering them in the
>>>>>> SparkContext.
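For reference, the listener side of the pattern discussed in this thread might be sketched as below. This is a sketch only (it needs spark-sql on the classpath and will not compile standalone); 'eventCounter' and the class name are assumptions, and the accumulator is presumed created and registered on the driver before the query starts:

```java
// Sketch: a listener that reads a driver-registered accumulator each time
// progress is reported. onQueryProgress runs on the driver, and value()
// reflects executor-side increments only after the tasks that made them
// have completed and reported back.
public class AccumulatorListener extends StreamingQueryListener {
  private final LongAccumulator eventCounter;

  public AccumulatorListener(LongAccumulator eventCounter) {
    this.eventCounter = eventCounter;
  }

  @Override
  public void onQueryStarted(QueryStartedEvent event) { }

  @Override
  public void onQueryProgress(QueryProgressEvent event) {
    System.out.println("eventCounter = " + eventCounter.value());
  }

  @Override
  public void onQueryTerminated(QueryTerminatedEvent event) { }
}

// Registered on the driver, e.g.:
// spark.streams().addListener(new AccumulatorListener(eventCounter));
```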