Thanks! I will take a look at the link. Just one question: you seem to be passing 'accumulators' in the constructor, but where do you use it in the StateUpdateTask class? I am still missing that connection. Sorry if my question is dumb; I must be missing something. Thanks for your help so far. It's been useful.
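For anyone reading along, the connection being asked about would look roughly like the sketch below. This is not the poster's actual code: the appConfig constructor argument is dropped for brevity, AppAccumulators and the field names are assumptions, and InputEventModel, ModelStateInfo, and ModelUpdate are the application classes quoted later in the thread. The key point is that the constructor argument is kept in a field, serialized into the task closure, and updated inside call() on the executors.

    import java.io.Serializable;
    import java.util.Iterator;
    import org.apache.spark.api.java.function.MapGroupsWithStateFunction;
    import org.apache.spark.sql.streaming.GroupState;
    import org.apache.spark.util.LongAccumulator;

    // Hypothetical serializable holder for application-specific accumulators.
    class AppAccumulators implements Serializable {
        final LongAccumulator eventCount;
        AppAccumulators(LongAccumulator eventCount) { this.eventCount = eventCount; }
    }

    public class StateUpdateTask implements
            MapGroupsWithStateFunction<String, InputEventModel, ModelStateInfo, ModelUpdate> {

        private final long stateTimeoutMs;
        private final AppAccumulators accumulators; // set once on the driver, shipped with the closure

        public StateUpdateTask(long stateTimeoutMs, AppAccumulators accumulators) {
            this.stateTimeoutMs = stateTimeoutMs;
            this.accumulators = accumulators;
        }

        @Override
        public ModelUpdate call(String productId,
                                Iterator<InputEventModel> events,
                                GroupState<ModelStateInfo> state) {
            while (events.hasNext()) {
                events.next();
                accumulators.eventCount.add(1L); // executor-side update, once per event
            }
            // ...update `state` and build the real result here...
            return new ModelUpdate();
        }
    }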
On Fri, May 29, 2020 at 6:51 AM Srinivas V <srini....@gmail.com> wrote:

> Yes, it is an application-specific class. This is how Java Spark Functions work.
> You can refer to this code in the documentation:
> https://github.com/apache/spark/blob/master/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredSessionization.java
>
> public class StateUpdateTask implements MapGroupsWithStateFunction<String,
>         InputEventModel, ModelStateInfo, ModelUpdate> {
>
>     @Override
>     public ModelUpdate call(String productId, Iterator<InputEventModel> eventsIterator,
>                             GroupState<ModelStateInfo> state) {
>         // update `state` from the events, then build and return the result
>         return null; // placeholder
>     }
> }
>
> On Thu, May 28, 2020 at 10:59 PM Something Something <mailinglist...@gmail.com> wrote:
>
>> I am assuming StateUpdateTask is your application-specific class. Does it have an 'updateState' method or something? I googled but couldn't find any documentation about doing it this way. Can you please direct me to some documentation? Thanks.
>>
>> On Thu, May 28, 2020 at 4:43 AM Srinivas V <srini....@gmail.com> wrote:
>>
>>> Yes, I am using stateful structured streaming, similar to what you do. This is in Java.
>>> I do it this way:
>>>
>>> Dataset<ModelUpdate> productUpdates = watermarkedDS
>>>     .groupByKey(
>>>         (MapFunction<InputEventModel, String>) event -> event.getId(),
>>>         Encoders.STRING())
>>>     .mapGroupsWithState(
>>>         new StateUpdateTask(
>>>             Long.parseLong(appConfig.getSparkStructuredStreamingConfig().STATE_TIMEOUT),
>>>             appConfig,
>>>             accumulators),
>>>         Encoders.bean(ModelStateInfo.class),
>>>         Encoders.bean(ModelUpdate.class),
>>>         GroupStateTimeout.ProcessingTimeTimeout());
>>>
>>> StateUpdateTask contains the update method.
>>>
>>> On Thu, May 28, 2020 at 4:41 AM Something Something <mailinglist...@gmail.com> wrote:
>>>
>>>> Yes, that's exactly how I am creating them.
>>>>
>>>> Question... Are you using 'Stateful Structured Streaming', in which you have something like this?
>>>>
>>>> .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
>>>>     updateAcrossEvents
>>>> )
>>>>
>>>> And updating the Accumulator inside 'updateAcrossEvents'? We're experiencing this only under 'Stateful Structured Streaming'. In other streaming applications it works as expected.
>>>>
>>>> On Wed, May 27, 2020 at 9:01 AM Srinivas V <srini....@gmail.com> wrote:
>>>>
>>>>> Yes, I am talking about application-specific Accumulators. Actually, I am getting the values printed in my driver log as well as sent to Grafana. Not sure where and when I saw 0 before. My deploy mode is "client" on a YARN cluster (not local Mac), where I submit from the master node. It should work the same for cluster mode as well.
>>>>> Create accumulators like this:
>>>>>
>>>>> LongAccumulator accumulator = sparkContext.longAccumulator(name);
>>>>>
>>>>> On Tue, May 26, 2020 at 8:42 PM Something Something <mailinglist...@gmail.com> wrote:
>>>>>
>>>>>> Hmm... how would they go to Grafana if they are not getting computed in your code? I am talking about the application-specific Accumulators. The other standard counters, such as 'event.progress.inputRowsPerSecond', are getting populated correctly!
>>>>>>
>>>>>> On Mon, May 25, 2020 at 8:39 PM Srinivas V <srini....@gmail.com> wrote:
>>>>>>
>>>>>>> Hello,
>>>>>>> Even for me it comes as 0 when I print it in onQueryProgress. I use LongAccumulator as well. Yes, it prints on my local machine but not on the cluster. But one consolation is that when I send metrics to Grafana, the values do show up there.
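Both posters are reading accumulator values on the driver from a query-progress callback. A minimal sketch of that wiring, modeled on the listener example in the Structured Streaming programming guide ('spark' is the SparkSession; the accumulator name and the println are placeholders, and a Grafana/Graphite push would replace the println):

    import org.apache.spark.sql.streaming.StreamingQueryListener;
    import org.apache.spark.util.LongAccumulator;

    // Driver-side setup, before the query starts.
    LongAccumulator recordCount = spark.sparkContext().longAccumulator("recordCount");

    spark.streams().addListener(new StreamingQueryListener() {
        @Override
        public void onQueryStarted(QueryStartedEvent event) { /* no-op */ }

        @Override
        public void onQueryProgress(QueryProgressEvent event) {
            // Runs on the driver after each micro-batch; executor-side
            // updates should already be merged into the driver copy here.
            System.out.println("batch " + event.progress().batchId()
                    + ": recordCount = " + recordCount.value());
        }

        @Override
        public void onQueryTerminated(QueryTerminatedEvent event) { /* no-op */ }
    });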
>>>>>>> On Tue, May 26, 2020 at 3:10 AM Something Something <mailinglist...@gmail.com> wrote:
>>>>>>>
>>>>>>>> No, this is not working even if I use LongAccumulator.
>>>>>>>>
>>>>>>>> On Fri, May 15, 2020 at 9:54 PM ZHANG Wei <zwb...@msn.com> wrote:
>>>>>>>>
>>>>>>>>> There is a restriction in the AccumulatorV2 API [1]: the OUT type should be atomic or thread-safe. I'm wondering whether the implementation for `java.util.Map[T, Long]` can meet it or not. Is there any chance to replace CollectionLongAccumulator by CollectionAccumulator [2] or LongAccumulator [3] and test whether the StreamingListener and the other code are able to work?
>>>>>>>>>
>>>>>>>>> ---
>>>>>>>>> Cheers,
>>>>>>>>> -z
>>>>>>>>> [1] http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.AccumulatorV2
>>>>>>>>> [2] http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.CollectionAccumulator
>>>>>>>>> [3] http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.LongAccumulator
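If the CollectionLongAccumulator discussed in this exchange is backed by a plain HashMap, ZHANG Wei's concern applies: multiple task threads on one executor can call add() concurrently. One way to keep a map-valued accumulator while satisfying the thread-safety restriction is to back it with a ConcurrentHashMap. A sketch only (the original class is Scala; this Java variant and its names are assumptions, not the poster's code):

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    import org.apache.spark.util.AccumulatorV2;

    public class CollectionLongAccumulator<T>
            extends AccumulatorV2<T, Map<T, Long>> {

        private final ConcurrentHashMap<T, Long> counts = new ConcurrentHashMap<>();

        @Override
        public boolean isZero() {
            return counts.isEmpty();
        }

        @Override
        public AccumulatorV2<T, Map<T, Long>> copy() {
            CollectionLongAccumulator<T> copied = new CollectionLongAccumulator<>();
            copied.counts.putAll(counts);
            return copied;
        }

        @Override
        public void reset() {
            counts.clear();
        }

        @Override
        public void add(T v) {
            // Atomic read-modify-write per key, safe across task threads.
            counts.merge(v, 1L, Long::sum);
        }

        @Override
        public void merge(AccumulatorV2<T, Map<T, Long>> other) {
            // Called when per-task copies are folded back together.
            other.value().forEach((k, v) -> counts.merge(k, v, Long::sum));
        }

        @Override
        public Map<T, Long> value() {
            return counts;
        }
    }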
>>>>>>>>> ________________________________________
>>>>>>>>> From: Something Something <mailinglist...@gmail.com>
>>>>>>>>> Sent: Saturday, May 16, 2020 0:38
>>>>>>>>> To: spark-user
>>>>>>>>> Subject: Re: Using Spark Accumulators with Structured Streaming
>>>>>>>>>
>>>>>>>>> Can someone from the Spark development team tell me if this functionality is supported and tested? I've spent a lot of time on this but can't get it to work. Just to add more context: we have our own Accumulator class that extends AccumulatorV2. In this class we keep track of one or more accumulators. Here's the definition:
>>>>>>>>>
>>>>>>>>> class CollectionLongAccumulator[T]
>>>>>>>>>     extends AccumulatorV2[T, java.util.Map[T, Long]]
>>>>>>>>>
>>>>>>>>> When the job begins we register an instance of this class:
>>>>>>>>>
>>>>>>>>> spark.sparkContext.register(myAccumulator, "MyAccumulator")
>>>>>>>>>
>>>>>>>>> Is this working under Structured Streaming?
>>>>>>>>>
>>>>>>>>> I will keep looking for alternate approaches, but any help would be greatly appreciated. Thanks.
>>>>>>>>>
>>>>>>>>> On Thu, May 14, 2020 at 2:36 PM Something Something <mailinglist...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>> In my structured streaming job I am updating Spark Accumulators in the updateAcrossEvents method, but they are always 0 when I try to print them in my StreamingListener. Here's the code:
>>>>>>>>>
>>>>>>>>> .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
>>>>>>>>>     updateAcrossEvents
>>>>>>>>> )
>>>>>>>>>
>>>>>>>>> The accumulators get incremented in 'updateAcrossEvents'. I have a StreamingListener which writes the values of the accumulators in its 'onQueryProgress' method, but in this method the Accumulators are ALWAYS ZERO!
>>>>>>>>>
>>>>>>>>> When I added log statements in updateAcrossEvents, I could see that these accumulators are getting incremented as expected.
>>>>>>>>>
>>>>>>>>> This only happens when I run in 'Cluster' mode. In Local mode it works fine, which implies that the Accumulators are not getting distributed correctly - or something like that!
>>>>>>>>>
>>>>>>>>> Note: I've seen quite a few answers on the Web that tell me to perform an "Action". That's not a solution here. This is a 'Stateful Structured Streaming' job. Yes, I am also 'registering' them in the SparkContext.
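For completeness, the registration step described above would look roughly like this on the driver (a sketch: the session builder and names are assumed, and it uses the ConcurrentHashMap-backed variant sketched earlier; custom AccumulatorV2 instances must be registered on the driver before they are used in any closure):

    import org.apache.spark.sql.SparkSession;

    SparkSession spark = SparkSession.builder()
            .appName("StatefulStreamingJob") // name assumed
            .getOrCreate();

    CollectionLongAccumulator<String> myAccumulator = new CollectionLongAccumulator<>();
    // Register before the streaming query starts; the name makes the
    // accumulator visible in the Spark UI.
    spark.sparkContext().register(myAccumulator, "MyAccumulator");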