It’s in the constructor. The accumulators object is passed in there and kept
as a field of StateUpdateTask, so the call method can reference it.
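
Roughly like this (a sketch; the AppConfig and Accumulators holder types and
the field names are illustrative, not the actual code):

    public class StateUpdateTask implements
            MapGroupsWithStateFunction<String, InputEventModel, ModelStateInfo, ModelUpdate> {

        private final long stateTimeoutMs;
        private final AppConfig appConfig;
        private final Accumulators accumulators;  // application-specific holder

        public StateUpdateTask(long stateTimeoutMs, AppConfig appConfig,
                               Accumulators accumulators) {
            this.stateTimeoutMs = stateTimeoutMs;
            this.appConfig = appConfig;
            this.accumulators = accumulators;
        }

        @Override
        public ModelUpdate call(String productId, Iterator<InputEventModel> eventsIterator,
                                GroupState<ModelStateInfo> state) {
            // update the state here, then bump a counter, e.g.
            // accumulators.eventsProcessed().add(1L);
            return null;  // placeholder
        }
    }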

On Sat, May 30, 2020 at 4:15 AM Something Something <
mailinglist...@gmail.com> wrote:

> I mean... I don't see any reference to 'accumulator' in your Class
> *definition*. How can you access it in the class if it's not part of the
> class definition:
>
> public class StateUpdateTask implements MapGroupsWithStateFunction<*String,
> InputEventModel, ModelStateInfo, ModelUpdate*> {  *--> I was expecting
> to see 'accumulator' here in the definition.*
>
>     @Override
>     public ModelUpdate call(String productId, Iterator<InputEventModel>
> eventsIterator, GroupState<ModelStateInfo> state) {
>     }
> }
>
> On Fri, May 29, 2020 at 1:08 PM Srinivas V <srini....@gmail.com> wrote:
>
>>
>> Yes, accumulators are updated in the call method of StateUpdateTask, for
>> example when the state times out or when the data is pushed to the next
>> Kafka topic.
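>>
>> Something like this inside call (a sketch; the accumulator names are
>> illustrative):
>>
>>     if (state.hasTimedOut()) {
>>         accumulators.timedOutStates().add(1L);
>>         state.remove();
>>     } else {
>>         // process eventsIterator, update the state, publish to Kafka, ...
>>         accumulators.eventsProcessed().add(1L);
>>     }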
>>
>> On Fri, May 29, 2020 at 11:55 PM Something Something <
>> mailinglist...@gmail.com> wrote:
>>
>>> Thanks! I will take a look at the link. Just one question: you seem to be
>>> passing 'accumulators' in the constructor, but where do you use it in the
>>> StateUpdateTask class? I am still missing that connection. Sorry if my
>>> question is dumb; I must be missing something. Thanks for your help so far.
>>> It's been useful.
>>>
>>>
>>> On Fri, May 29, 2020 at 6:51 AM Srinivas V <srini....@gmail.com> wrote:
>>>
>>>> Yes, it is an application-specific class. This is how Java Spark
>>>> Functions work.
>>>> You can refer to this code in the documentation:
>>>> https://github.com/apache/spark/blob/master/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredSessionization.java
>>>>
>>>> public class StateUpdateTask implements
>>>>         MapGroupsWithStateFunction<String, InputEventModel, ModelStateInfo,
>>>>         ModelUpdate> {
>>>>
>>>>     @Override
>>>>     public ModelUpdate call(String productId,
>>>>             Iterator<InputEventModel> eventsIterator,
>>>>             GroupState<ModelStateInfo> state) {
>>>>     }
>>>> }
>>>>
>>>> On Thu, May 28, 2020 at 10:59 PM Something Something <
>>>> mailinglist...@gmail.com> wrote:
>>>>
>>>>> I am assuming StateUpdateTask is your application-specific class. Does
>>>>> it have an 'updateState' method or something? I googled but couldn't find
>>>>> any documentation about doing it this way. Can you please direct me to
>>>>> some documentation? Thanks.
>>>>>
>>>>> On Thu, May 28, 2020 at 4:43 AM Srinivas V <srini....@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Yes, I am using stateful Structured Streaming, similar to what you do.
>>>>>> This is in Java. I do it this way:
>>>>>>     Dataset<ModelUpdate> productUpdates = watermarkedDS
>>>>>>         .groupByKey(
>>>>>>             (MapFunction<InputEventModel, String>) event -> event.getId(),
>>>>>>             Encoders.STRING())
>>>>>>         .mapGroupsWithState(
>>>>>>             new StateUpdateTask(
>>>>>>                 Long.parseLong(appConfig.getSparkStructuredStreamingConfig().STATE_TIMEOUT),
>>>>>>                 appConfig, accumulators),
>>>>>>             Encoders.bean(ModelStateInfo.class),
>>>>>>             Encoders.bean(ModelUpdate.class),
>>>>>>             GroupStateTimeout.ProcessingTimeTimeout());
>>>>>>
>>>>>> StateUpdateTask contains the update method.
>>>>>>
>>>>>> On Thu, May 28, 2020 at 4:41 AM Something Something <
>>>>>> mailinglist...@gmail.com> wrote:
>>>>>>
>>>>>>> Yes, that's exactly how I am creating them.
>>>>>>>
>>>>>>> Question... Are you using 'Stateful Structured Streaming', in which
>>>>>>> you have something like this?
>>>>>>>
>>>>>>> .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
>>>>>>>         updateAcrossEvents
>>>>>>>       )
>>>>>>>
>>>>>>> And updating the Accumulator inside 'updateAcrossEvents'? We're 
>>>>>>> experiencing this only under 'Stateful Structured Streaming'. In other 
>>>>>>> streaming applications it works as expected.
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Wed, May 27, 2020 at 9:01 AM Srinivas V <srini....@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Yes, I am talking about application-specific Accumulators. Actually
>>>>>>>> I am getting the values printed in my driver log as well as sent to
>>>>>>>> Grafana. Not sure where and when I saw 0 before. My deploy mode is
>>>>>>>> “client” on a YARN cluster (not local Mac), where I submit from the
>>>>>>>> master node. It should work the same for cluster mode as well.
>>>>>>>> Create accumulators like this:
>>>>>>>> LongAccumulator accumulator = sparkContext.longAccumulator(name);
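>>>>>>>>
>>>>>>>> For instance (a sketch; the Accumulators holder class here is
>>>>>>>> hypothetical and application-specific):
>>>>>>>>
>>>>>>>>     LongAccumulator recordsProcessed =
>>>>>>>>         sparkContext.longAccumulator("recordsProcessed");
>>>>>>>>     // keep the references in a serializable holder and pass it to
>>>>>>>>     // the StateUpdateTask constructor
>>>>>>>>     Accumulators accumulators = new Accumulators(recordsProcessed);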
>>>>>>>>
>>>>>>>>
>>>>>>>> On Tue, May 26, 2020 at 8:42 PM Something Something <
>>>>>>>> mailinglist...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hmm... how would they go to Grafana if they are not getting
>>>>>>>>> computed in your code? I am talking about the application-specific
>>>>>>>>> Accumulators. The other standard counters such as
>>>>>>>>> 'event.progress.inputRowsPerSecond' are getting populated correctly!
>>>>>>>>>
>>>>>>>>> On Mon, May 25, 2020 at 8:39 PM Srinivas V <srini....@gmail.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Hello,
>>>>>>>>>> Even for me it comes as 0 when I print it in onQueryProgress. I use
>>>>>>>>>> LongAccumulator as well. Yes, it prints on my local machine but not
>>>>>>>>>> on the cluster.
>>>>>>>>>> But one consolation is that when I send metrics to Grafana, the
>>>>>>>>>> values are coming through there.
>>>>>>>>>>
>>>>>>>>>> On Tue, May 26, 2020 at 3:10 AM Something Something <
>>>>>>>>>> mailinglist...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> No, this is not working even if I use LongAccumulator.
>>>>>>>>>>>
>>>>>>>>>>> On Fri, May 15, 2020 at 9:54 PM ZHANG Wei <zwb...@msn.com>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> There is a restriction in the AccumulatorV2 API [1]: the OUT type
>>>>>>>>>>>> should be atomic or thread-safe. I'm wondering whether the
>>>>>>>>>>>> implementation backed by `java.util.Map[T, Long]` can meet that or
>>>>>>>>>>>> not. Is there any chance to replace CollectionLongAccumulator with
>>>>>>>>>>>> CollectionAccumulator [2] or LongAccumulator [3] and test whether
>>>>>>>>>>>> the StreamingListener and the other code are able to work?
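>>>>>>>>>>>>
>>>>>>>>>>>> For example, something like this (a sketch in Java, assuming a
>>>>>>>>>>>> SparkSession named `spark`):
>>>>>>>>>>>>
>>>>>>>>>>>>     LongAccumulator count =
>>>>>>>>>>>>         spark.sparkContext().longAccumulator("MyCount");
>>>>>>>>>>>>     CollectionAccumulator<String> events =
>>>>>>>>>>>>         spark.sparkContext().collectionAccumulator("MyEvents");
>>>>>>>>>>>>     // both are registered with the context automatically when
>>>>>>>>>>>>     // created through these helpers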
>>>>>>>>>>>>
>>>>>>>>>>>> ---
>>>>>>>>>>>> Cheers,
>>>>>>>>>>>> -z
>>>>>>>>>>>> [1]
>>>>>>>>>>>> http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.AccumulatorV2
>>>>>>>>>>>> [2]
>>>>>>>>>>>> http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.CollectionAccumulator
>>>>>>>>>>>> [3]
>>>>>>>>>>>> http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.LongAccumulator
>>>>>>>>>>>>
>>>>>>>>>>>> ________________________________________
>>>>>>>>>>>> From: Something Something <mailinglist...@gmail.com>
>>>>>>>>>>>> Sent: Saturday, May 16, 2020 0:38
>>>>>>>>>>>> To: spark-user
>>>>>>>>>>>> Subject: Re: Using Spark Accumulators with Structured Streaming
>>>>>>>>>>>>
>>>>>>>>>>>> Can someone from the Spark development team tell me if this
>>>>>>>>>>>> functionality is supported and tested? I've spent a lot of time on
>>>>>>>>>>>> this but can't get it to work. Just to add more context: we have
>>>>>>>>>>>> our own Accumulator class that extends AccumulatorV2. In this
>>>>>>>>>>>> class we keep track of one or more accumulators. Here's the
>>>>>>>>>>>> definition:
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> class CollectionLongAccumulator[T]
>>>>>>>>>>>>     extends AccumulatorV2[T, java.util.Map[T, Long]]
>>>>>>>>>>>>
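>>>>>>>>>>>> Roughly, the methods look like this Java analogue (a simplified
>>>>>>>>>>>> sketch, not the exact code):
>>>>>>>>>>>>
>>>>>>>>>>>>     public class CollectionLongAccumulator<T>
>>>>>>>>>>>>             extends AccumulatorV2<T, Map<T, Long>> {
>>>>>>>>>>>>
>>>>>>>>>>>>         // backing map: a plain java.util.HashMap
>>>>>>>>>>>>         private final Map<T, Long> map = new HashMap<>();
>>>>>>>>>>>>
>>>>>>>>>>>>         @Override public boolean isZero() { return map.isEmpty(); }
>>>>>>>>>>>>
>>>>>>>>>>>>         @Override public AccumulatorV2<T, Map<T, Long>> copy() {
>>>>>>>>>>>>             CollectionLongAccumulator<T> c = new CollectionLongAccumulator<>();
>>>>>>>>>>>>             c.map.putAll(map);
>>>>>>>>>>>>             return c;
>>>>>>>>>>>>         }
>>>>>>>>>>>>
>>>>>>>>>>>>         @Override public void reset() { map.clear(); }
>>>>>>>>>>>>
>>>>>>>>>>>>         // count one occurrence of the given key
>>>>>>>>>>>>         @Override public void add(T key) { map.merge(key, 1L, Long::sum); }
>>>>>>>>>>>>
>>>>>>>>>>>>         @Override public void merge(AccumulatorV2<T, Map<T, Long>> other) {
>>>>>>>>>>>>             other.value().forEach((k, v) -> map.merge(k, v, Long::sum));
>>>>>>>>>>>>         }
>>>>>>>>>>>>
>>>>>>>>>>>>         @Override public Map<T, Long> value() { return map; }
>>>>>>>>>>>>     }
>>>>>>>>>>>>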
>>>>>>>>>>>> When the job begins we register an instance of this class:
>>>>>>>>>>>>
>>>>>>>>>>>> spark.sparkContext.register(myAccumulator, "MyAccumulator")
>>>>>>>>>>>>
>>>>>>>>>>>> Is this working under Structured Streaming?
>>>>>>>>>>>>
>>>>>>>>>>>> I will keep looking for alternate approaches but any help would
>>>>>>>>>>>> be greatly appreciated. Thanks.
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On Thu, May 14, 2020 at 2:36 PM Something Something <
>>>>>>>>>>>> mailinglist...@gmail.com>
>>>>>>>>>>>> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>> In my Structured Streaming job I am updating Spark Accumulators
>>>>>>>>>>>> in the updateAcrossEvents method, but they are always 0 when I try
>>>>>>>>>>>> to print them in my StreamingListener. Here's the code:
>>>>>>>>>>>>
>>>>>>>>>>>> .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
>>>>>>>>>>>>         updateAcrossEvents
>>>>>>>>>>>>       )
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> The accumulators get incremented in 'updateAcrossEvents'. I have
>>>>>>>>>>>> a StreamingListener which writes the values of the accumulators in
>>>>>>>>>>>> its 'onQueryProgress' method, but in this method the Accumulators
>>>>>>>>>>>> are ALWAYS ZERO!
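>>>>>>>>>>>>
>>>>>>>>>>>> The listener wiring is roughly like this (a simplified sketch in
>>>>>>>>>>>> Java; `myAccumulator` is the registered instance):
>>>>>>>>>>>>
>>>>>>>>>>>>     spark.streams().addListener(new StreamingQueryListener() {
>>>>>>>>>>>>         @Override public void onQueryStarted(QueryStartedEvent e) {}
>>>>>>>>>>>>         @Override public void onQueryTerminated(QueryTerminatedEvent e) {}
>>>>>>>>>>>>         @Override public void onQueryProgress(QueryProgressEvent e) {
>>>>>>>>>>>>             // this always prints zero when running on the cluster
>>>>>>>>>>>>             System.out.println("myAccumulator = " + myAccumulator.value());
>>>>>>>>>>>>         }
>>>>>>>>>>>>     });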
>>>>>>>>>>>>
>>>>>>>>>>>> When I added log statements in the updateAcrossEvents, I could
>>>>>>>>>>>> see that these accumulators are getting incremented as expected.
>>>>>>>>>>>>
>>>>>>>>>>>> This only happens when I run in 'cluster' mode. In local mode it
>>>>>>>>>>>> works fine, which implies that the Accumulators are not getting
>>>>>>>>>>>> distributed correctly - or something like that!
>>>>>>>>>>>>
>>>>>>>>>>>> Note: I've seen quite a few answers on the Web that tell me to
>>>>>>>>>>>> perform an "Action". That's not a solution here. This is a 
>>>>>>>>>>>> 'Stateful
>>>>>>>>>>>> Structured Streaming' job. Yes, I am also 'registering' them in
>>>>>>>>>>>> SparkContext.
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
