I mean... I don't see any reference to 'accumulator' in your Class
*definition*. How can you access it in the class if it's not in your
definition of class:

public class StateUpdateTask implements MapGroupsWithStateFunction<*String,
InputEventModel, ModelStateInfo, ModelUpdate*> {.  *--> I was expecting to
see 'accumulator' here in the definition.*

    @Override
    public ModelUpdate call(String productId, Iterator<InputEventModel>
eventsIterator, GroupState<ModelStateInfo> state) {
    }
}

On Fri, May 29, 2020 at 1:08 PM Srinivas V <srini....@gmail.com> wrote:

>
> Yes, accumulators are updated in the call method of StateUpdateTask. Like
> when state times out or when the data is pushed to next Kafka topic etc.
>
> On Fri, May 29, 2020 at 11:55 PM Something Something <
> mailinglist...@gmail.com> wrote:
>
>> Thanks! I will take a look at the link. Just one question, you seem to be
>> passing 'accumulators' in the constructor but where do you use it in the
>> StateUpdateTask class? I am still missing that connection. Sorry, if my
>> question is dumb. I must be missing something. Thanks for your help so far.
>> It's been useful.
>>
>>
>> On Fri, May 29, 2020 at 6:51 AM Srinivas V <srini....@gmail.com> wrote:
>>
>>> Yes it is application specific class. This is how java Spark Functions
>>> work.
>>> You can refer to this code in the documentation:
>>> https://github.com/apache/spark/blob/master/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredSessionization.java
>>>
>>> public class StateUpdateTask implements
>>> MapGroupsWithStateFunction<String, InputEventModel, ModelStateInfo,
>>> ModelUpdate> {
>>>
>>>     @Override
>>>     public ModelUpdate call(String productId, Iterator<InputEventModel>
>>> eventsIterator, GroupState<ModelStateInfo> state) {
>>>     }
>>> }
>>>
>>> On Thu, May 28, 2020 at 10:59 PM Something Something <
>>> mailinglist...@gmail.com> wrote:
>>>
>>>> I am assuming StateUpdateTask is your application specific class. Does
>>>> it have 'updateState' method or something? I googled but couldn't find any
>>>> documentation about doing it this way. Can you please direct me to some
>>>> documentation. Thanks.
>>>>
>>>> On Thu, May 28, 2020 at 4:43 AM Srinivas V <srini....@gmail.com> wrote:
>>>>
>>>>> yes, I am using stateful structured streaming. Yes similar to what you
>>>>> do. This is in Java
>>>>> I do it this way:
>>>>>     Dataset<ModelUpdate> productUpdates = watermarkedDS
>>>>>                 .groupByKey(
>>>>>                         (MapFunction<InputEventModel, String>) event
>>>>> -> event.getId(), Encoders.STRING())
>>>>>                 .mapGroupsWithState(
>>>>>                         new
>>>>> StateUpdateTask(Long.parseLong(appConfig.getSparkStructuredStreamingConfig().STATE_TIMEOUT),
>>>>> appConfig, accumulators),
>>>>>                         Encoders.bean(ModelStateInfo.class),
>>>>>                         Encoders.bean(ModelUpdate.class),
>>>>>                         GroupStateTimeout.ProcessingTimeTimeout());
>>>>>
>>>>> StateUpdateTask contains the update method.
>>>>>
>>>>> On Thu, May 28, 2020 at 4:41 AM Something Something <
>>>>> mailinglist...@gmail.com> wrote:
>>>>>
>>>>>> Yes, that's exactly how I am creating them.
>>>>>>
>>>>>> Question... Are you using 'Stateful Structured Streaming' in which
>>>>>> you've something like this?
>>>>>>
>>>>>> .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
>>>>>>         updateAcrossEvents
>>>>>>       )
>>>>>>
>>>>>> And updating the Accumulator inside 'updateAcrossEvents'? We're 
>>>>>> experiencing this only under 'Stateful Structured Streaming'. In other 
>>>>>> streaming applications it works as expected.
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Wed, May 27, 2020 at 9:01 AM Srinivas V <srini....@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Yes, I am talking about Application specific Accumulators. Actually
>>>>>>> I am getting the values printed in my driver log as well as sent to
>>>>>>> Grafana. Not sure where and when I saw 0 before. My deploy mode is 
>>>>>>> “client”
>>>>>>> on a yarn cluster(not local Mac) where I submit from master node. It 
>>>>>>> should
>>>>>>> work the same for cluster mode as well.
>>>>>>> Create accumulators like this:
>>>>>>> AccumulatorV2 accumulator = sparkContext.longAccumulator(name);
>>>>>>>
>>>>>>>
>>>>>>> On Tue, May 26, 2020 at 8:42 PM Something Something <
>>>>>>> mailinglist...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hmm... how would they go to Graphana if they are not getting
>>>>>>>> computed in your code? I am talking about the Application Specific
>>>>>>>> Accumulators. The other standard counters such as
>>>>>>>> 'event.progress.inputRowsPerSecond' are getting populated correctly!
>>>>>>>>
>>>>>>>> On Mon, May 25, 2020 at 8:39 PM Srinivas V <srini....@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hello,
>>>>>>>>> Even for me it comes as 0 when I print in OnQueryProgress. I use
>>>>>>>>> LongAccumulator as well. Yes, it prints on my local but not on 
>>>>>>>>> cluster.
>>>>>>>>> But one consolation is that when I send metrics to Graphana, the
>>>>>>>>> values are coming there.
>>>>>>>>>
>>>>>>>>> On Tue, May 26, 2020 at 3:10 AM Something Something <
>>>>>>>>> mailinglist...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> No this is not working even if I use LongAccumulator.
>>>>>>>>>>
>>>>>>>>>> On Fri, May 15, 2020 at 9:54 PM ZHANG Wei <zwb...@msn.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> There is a restriction in AccumulatorV2 API [1], the OUT type
>>>>>>>>>>> should be atomic or thread safe. I'm wondering if the 
>>>>>>>>>>> implementation for
>>>>>>>>>>> `java.util.Map[T, Long]` can meet it or not. Is there any chance to 
>>>>>>>>>>> replace
>>>>>>>>>>> CollectionLongAccumulator by CollectionAccumulator[2] or 
>>>>>>>>>>> LongAccumulator[3]
>>>>>>>>>>> and test if the StreamingListener and other codes are able to work?
>>>>>>>>>>>
>>>>>>>>>>> ---
>>>>>>>>>>> Cheers,
>>>>>>>>>>> -z
>>>>>>>>>>> [1]
>>>>>>>>>>> http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.AccumulatorV2
>>>>>>>>>>> [2]
>>>>>>>>>>> http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.CollectionAccumulator
>>>>>>>>>>> [3]
>>>>>>>>>>> http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.LongAccumulator
>>>>>>>>>>>
>>>>>>>>>>> ________________________________________
>>>>>>>>>>> From: Something Something <mailinglist...@gmail.com>
>>>>>>>>>>> Sent: Saturday, May 16, 2020 0:38
>>>>>>>>>>> To: spark-user
>>>>>>>>>>> Subject: Re: Using Spark Accumulators with Structured Streaming
>>>>>>>>>>>
>>>>>>>>>>> Can someone from Spark Development team tell me if this
>>>>>>>>>>> functionality is supported and tested? I've spent a lot of time on 
>>>>>>>>>>> this but
>>>>>>>>>>> can't get it to work. Just to add more context, we've our own 
>>>>>>>>>>> Accumulator
>>>>>>>>>>> class that extends from AccumulatorV2. In this class we keep track 
>>>>>>>>>>> of one
>>>>>>>>>>> or more accumulators. Here's the definition:
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> class CollectionLongAccumulator[T]
>>>>>>>>>>>     extends AccumulatorV2[T, java.util.Map[T, Long]]
>>>>>>>>>>>
>>>>>>>>>>> When the job begins we register an instance of this class:
>>>>>>>>>>>
>>>>>>>>>>> spark.sparkContext.register(myAccumulator, "MyAccumulator")
>>>>>>>>>>>
>>>>>>>>>>> Is this working under Structured Streaming?
>>>>>>>>>>>
>>>>>>>>>>> I will keep looking for alternate approaches but any help would
>>>>>>>>>>> be greatly appreciated. Thanks.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Thu, May 14, 2020 at 2:36 PM Something Something <
>>>>>>>>>>> mailinglist...@gmail.com<mailto:mailinglist...@gmail.com>>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>> In my structured streaming job I am updating Spark Accumulators
>>>>>>>>>>> in the updateAcrossEvents method but they are always 0 when I try 
>>>>>>>>>>> to print
>>>>>>>>>>> them in my StreamingListener. Here's the code:
>>>>>>>>>>>
>>>>>>>>>>> .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
>>>>>>>>>>>         updateAcrossEvents
>>>>>>>>>>>       )
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> The accumulators get incremented in 'updateAcrossEvents'. I've a
>>>>>>>>>>> StreamingListener which writes values of the accumulators in
>>>>>>>>>>> 'onQueryProgress' method but in this method the Accumulators are 
>>>>>>>>>>> ALWAYS
>>>>>>>>>>> ZERO!
>>>>>>>>>>>
>>>>>>>>>>> When I added log statements in the updateAcrossEvents, I could
>>>>>>>>>>> see that these accumulators are getting incremented as expected.
>>>>>>>>>>>
>>>>>>>>>>> This only happens when I run in the 'Cluster' mode. In Local
>>>>>>>>>>> mode it works fine which implies that the Accumulators are not 
>>>>>>>>>>> getting
>>>>>>>>>>> distributed correctly - or something like that!
>>>>>>>>>>>
>>>>>>>>>>> Note: I've seen quite a few answers on the Web that tell me to
>>>>>>>>>>> perform an "Action". That's not a solution here. This is a 'Stateful
>>>>>>>>>>> Structured Streaming' job. Yes, I am also 'registering' them in
>>>>>>>>>>> SparkContext.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>

Reply via email to