Yes, accumulators are updated in the call method of StateUpdateTask. Like
when state times out or when the data is pushed to next Kafka topic etc.

On Fri, May 29, 2020 at 11:55 PM Something Something <
mailinglist...@gmail.com> wrote:

> Thanks! I will take a look at the link. Just one question, you seem to be
> passing 'accumulators' in the constructor but where do you use it in the
> StateUpdateTask class? I am still missing that connection. Sorry, if my
> question is dumb. I must be missing something. Thanks for your help so far.
> It's been useful.
>
>
> On Fri, May 29, 2020 at 6:51 AM Srinivas V <srini....@gmail.com> wrote:
>
>> Yes it is application specific class. This is how java Spark Functions
>> work.
>> You can refer to this code in the documentation:
>> https://github.com/apache/spark/blob/master/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredSessionization.java
>>
>> public class StateUpdateTask implements
>> MapGroupsWithStateFunction<String, InputEventModel, ModelStateInfo,
>> ModelUpdate> {
>>
>>     @Override
>>     public ModelUpdate call(String productId, Iterator<InputEventModel>
>> eventsIterator, GroupState<ModelStateInfo> state) {
>>     }
>> }
>>
>> On Thu, May 28, 2020 at 10:59 PM Something Something <
>> mailinglist...@gmail.com> wrote:
>>
>>> I am assuming StateUpdateTask is your application specific class. Does
>>> it have 'updateState' method or something? I googled but couldn't find any
>>> documentation about doing it this way. Can you please direct me to some
>>> documentation. Thanks.
>>>
>>> On Thu, May 28, 2020 at 4:43 AM Srinivas V <srini....@gmail.com> wrote:
>>>
>>>> yes, I am using stateful structured streaming. Yes similar to what you
>>>> do. This is in Java
>>>> I do it this way:
>>>>     Dataset<ModelUpdate> productUpdates = watermarkedDS
>>>>                 .groupByKey(
>>>>                         (MapFunction<InputEventModel, String>) event ->
>>>> event.getId(), Encoders.STRING())
>>>>                 .mapGroupsWithState(
>>>>                         new
>>>> StateUpdateTask(Long.parseLong(appConfig.getSparkStructuredStreamingConfig().STATE_TIMEOUT),
>>>> appConfig, accumulators),
>>>>                         Encoders.bean(ModelStateInfo.class),
>>>>                         Encoders.bean(ModelUpdate.class),
>>>>                         GroupStateTimeout.ProcessingTimeTimeout());
>>>>
>>>> StateUpdateTask contains the update method.
>>>>
>>>> On Thu, May 28, 2020 at 4:41 AM Something Something <
>>>> mailinglist...@gmail.com> wrote:
>>>>
>>>>> Yes, that's exactly how I am creating them.
>>>>>
>>>>> Question... Are you using 'Stateful Structured Streaming' in which
>>>>> you've something like this?
>>>>>
>>>>> .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
>>>>>         updateAcrossEvents
>>>>>       )
>>>>>
>>>>> And updating the Accumulator inside 'updateAcrossEvents'? We're 
>>>>> experiencing this only under 'Stateful Structured Streaming'. In other 
>>>>> streaming applications it works as expected.
>>>>>
>>>>>
>>>>>
>>>>> On Wed, May 27, 2020 at 9:01 AM Srinivas V <srini....@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Yes, I am talking about Application specific Accumulators. Actually I
>>>>>> am getting the values printed in my driver log as well as sent to 
>>>>>> Grafana.
>>>>>> Not sure where and when I saw 0 before. My deploy mode is “client” on a
>>>>>> yarn cluster(not local Mac) where I submit from master node. It should 
>>>>>> work
>>>>>> the same for cluster mode as well.
>>>>>> Create accumulators like this:
>>>>>> AccumulatorV2 accumulator = sparkContext.longAccumulator(name);
>>>>>>
>>>>>>
>>>>>> On Tue, May 26, 2020 at 8:42 PM Something Something <
>>>>>> mailinglist...@gmail.com> wrote:
>>>>>>
>>>>>>> Hmm... how would they go to Graphana if they are not getting
>>>>>>> computed in your code? I am talking about the Application Specific
>>>>>>> Accumulators. The other standard counters such as
>>>>>>> 'event.progress.inputRowsPerSecond' are getting populated correctly!
>>>>>>>
>>>>>>> On Mon, May 25, 2020 at 8:39 PM Srinivas V <srini....@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hello,
>>>>>>>> Even for me it comes as 0 when I print in OnQueryProgress. I use
>>>>>>>> LongAccumulator as well. Yes, it prints on my local but not on cluster.
>>>>>>>> But one consolation is that when I send metrics to Graphana, the
>>>>>>>> values are coming there.
>>>>>>>>
>>>>>>>> On Tue, May 26, 2020 at 3:10 AM Something Something <
>>>>>>>> mailinglist...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> No this is not working even if I use LongAccumulator.
>>>>>>>>>
>>>>>>>>> On Fri, May 15, 2020 at 9:54 PM ZHANG Wei <zwb...@msn.com> wrote:
>>>>>>>>>
>>>>>>>>>> There is a restriction in AccumulatorV2 API [1], the OUT type
>>>>>>>>>> should be atomic or thread safe. I'm wondering if the implementation 
>>>>>>>>>> for
>>>>>>>>>> `java.util.Map[T, Long]` can meet it or not. Is there any chance to 
>>>>>>>>>> replace
>>>>>>>>>> CollectionLongAccumulator by CollectionAccumulator[2] or 
>>>>>>>>>> LongAccumulator[3]
>>>>>>>>>> and test if the StreamingListener and other codes are able to work?
>>>>>>>>>>
>>>>>>>>>> ---
>>>>>>>>>> Cheers,
>>>>>>>>>> -z
>>>>>>>>>> [1]
>>>>>>>>>> http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.AccumulatorV2
>>>>>>>>>> [2]
>>>>>>>>>> http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.CollectionAccumulator
>>>>>>>>>> [3]
>>>>>>>>>> http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.LongAccumulator
>>>>>>>>>>
>>>>>>>>>> ________________________________________
>>>>>>>>>> From: Something Something <mailinglist...@gmail.com>
>>>>>>>>>> Sent: Saturday, May 16, 2020 0:38
>>>>>>>>>> To: spark-user
>>>>>>>>>> Subject: Re: Using Spark Accumulators with Structured Streaming
>>>>>>>>>>
>>>>>>>>>> Can someone from Spark Development team tell me if this
>>>>>>>>>> functionality is supported and tested? I've spent a lot of time on 
>>>>>>>>>> this but
>>>>>>>>>> can't get it to work. Just to add more context, we've our own 
>>>>>>>>>> Accumulator
>>>>>>>>>> class that extends from AccumulatorV2. In this class we keep track 
>>>>>>>>>> of one
>>>>>>>>>> or more accumulators. Here's the definition:
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> class CollectionLongAccumulator[T]
>>>>>>>>>>     extends AccumulatorV2[T, java.util.Map[T, Long]]
>>>>>>>>>>
>>>>>>>>>> When the job begins we register an instance of this class:
>>>>>>>>>>
>>>>>>>>>> spark.sparkContext.register(myAccumulator, "MyAccumulator")
>>>>>>>>>>
>>>>>>>>>> Is this working under Structured Streaming?
>>>>>>>>>>
>>>>>>>>>> I will keep looking for alternate approaches but any help would
>>>>>>>>>> be greatly appreciated. Thanks.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Thu, May 14, 2020 at 2:36 PM Something Something <
>>>>>>>>>> mailinglist...@gmail.com<mailto:mailinglist...@gmail.com>> wrote:
>>>>>>>>>>
>>>>>>>>>> In my structured streaming job I am updating Spark Accumulators
>>>>>>>>>> in the updateAcrossEvents method but they are always 0 when I try to 
>>>>>>>>>> print
>>>>>>>>>> them in my StreamingListener. Here's the code:
>>>>>>>>>>
>>>>>>>>>> .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
>>>>>>>>>>         updateAcrossEvents
>>>>>>>>>>       )
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> The accumulators get incremented in 'updateAcrossEvents'. I've a
>>>>>>>>>> StreamingListener which writes values of the accumulators in
>>>>>>>>>> 'onQueryProgress' method but in this method the Accumulators are 
>>>>>>>>>> ALWAYS
>>>>>>>>>> ZERO!
>>>>>>>>>>
>>>>>>>>>> When I added log statements in the updateAcrossEvents, I could
>>>>>>>>>> see that these accumulators are getting incremented as expected.
>>>>>>>>>>
>>>>>>>>>> This only happens when I run in the 'Cluster' mode. In Local mode
>>>>>>>>>> it works fine which implies that the Accumulators are not getting
>>>>>>>>>> distributed correctly - or something like that!
>>>>>>>>>>
>>>>>>>>>> Note: I've seen quite a few answers on the Web that tell me to
>>>>>>>>>> perform an "Action". That's not a solution here. This is a 'Stateful
>>>>>>>>>> Structured Streaming' job. Yes, I am also 'registering' them in
>>>>>>>>>> SparkContext.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>

Reply via email to