Thanks! I will take a look at the link. Just one question: you seem to be
passing 'accumulators' in the constructor, but where do you use it in the
StateUpdateTask class? I am still missing that connection. Sorry if my
question is dumb. I must be missing something. Thanks for your help so far.
It's been useful.
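
For reference, the usual wiring for a constructor-passed accumulator is to
keep it in a field and update it from inside call(). The following is only a
minimal sketch of that pattern, not the actual StateUpdateTask from this
thread. To stay runnable without Spark it makes two assumptions:
java.util.concurrent.atomic.LongAdder stands in for Spark's LongAccumulator,
and a hypothetical GroupUpdateFunction interface stands in for
MapGroupsWithStateFunction (GroupState is left out entirely).

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.concurrent.atomic.LongAdder;

// Hypothetical stand-in for Spark's MapGroupsWithStateFunction, reduced to
// the parts needed here (no GroupState, no Spark dependency).
interface GroupUpdateFunction<K, V, R> {
    R call(K key, Iterator<V> values);
}

// Stand-in for the StateUpdateTask discussed in the thread. The counter passed
// to the constructor is kept in a field so that call() can reach it later.
class StateUpdateTask implements GroupUpdateFunction<String, String, Long> {
    private final LongAdder eventCounter; // assumption: stands in for a LongAccumulator

    StateUpdateTask(LongAdder eventCounter) {
        this.eventCounter = eventCounter;
    }

    @Override
    public Long call(String productId, Iterator<String> events) {
        long seen = 0;
        while (events.hasNext()) {
            events.next();
            eventCounter.increment(); // with Spark this would be accumulator.add(1)
            seen++;
        }
        return seen; // number of events handled for this key
    }
}

class AccumulatorWiringDemo {
    public static void main(String[] args) {
        LongAdder counter = new LongAdder();
        StateUpdateTask task = new StateUpdateTask(counter);
        task.call("p1", Arrays.asList("a", "b", "c").iterator());
        task.call("p2", Arrays.asList("d").iterator());
        System.out.println(counter.sum()); // total across both keys
    }
}
```

The same instance that is handed to the constructor is the one read back
afterwards, which is the connection between the two places.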


On Fri, May 29, 2020 at 6:51 AM Srinivas V <srini....@gmail.com> wrote:

> Yes, it is an application-specific class. This is how Java Spark functions
> work.
> You can refer to this code in the documentation:
> https://github.com/apache/spark/blob/master/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredSessionization.java
>
> public class StateUpdateTask implements
>         MapGroupsWithStateFunction<String, InputEventModel, ModelStateInfo, ModelUpdate> {
>
>     @Override
>     public ModelUpdate call(String productId, Iterator<InputEventModel> eventsIterator,
>             GroupState<ModelStateInfo> state) {
>         // ... update the state and return a ModelUpdate
>     }
> }
>
> On Thu, May 28, 2020 at 10:59 PM Something Something <
> mailinglist...@gmail.com> wrote:
>
>> I am assuming StateUpdateTask is your application specific class. Does it
>> have 'updateState' method or something? I googled but couldn't find any
>> documentation about doing it this way. Can you please direct me to some
>> documentation. Thanks.
>>
>> On Thu, May 28, 2020 at 4:43 AM Srinivas V <srini....@gmail.com> wrote:
>>
>>> Yes, I am using stateful structured streaming, similar to what you
>>> do. This is in Java.
>>> I do it this way:
>>>     Dataset<ModelUpdate> productUpdates = watermarkedDS
>>>                 .groupByKey(
>>>                         (MapFunction<InputEventModel, String>) event -> event.getId(),
>>>                         Encoders.STRING())
>>>                 .mapGroupsWithState(
>>>                         new StateUpdateTask(
>>>                                 Long.parseLong(appConfig.getSparkStructuredStreamingConfig().STATE_TIMEOUT),
>>>                                 appConfig, accumulators),
>>>                         Encoders.bean(ModelStateInfo.class),
>>>                         Encoders.bean(ModelUpdate.class),
>>>                         GroupStateTimeout.ProcessingTimeTimeout());
>>>
>>> StateUpdateTask contains the update method.
>>>
>>> On Thu, May 28, 2020 at 4:41 AM Something Something <
>>> mailinglist...@gmail.com> wrote:
>>>
>>>> Yes, that's exactly how I am creating them.
>>>>
>>>> Question... Are you using 'Stateful Structured Streaming' in which
>>>> you've something like this?
>>>>
>>>> .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
>>>>         updateAcrossEvents
>>>>       )
>>>>
>>>> And updating the Accumulator inside 'updateAcrossEvents'? We're 
>>>> experiencing this only under 'Stateful Structured Streaming'. In other 
>>>> streaming applications it works as expected.
>>>>
>>>>
>>>>
>>>> On Wed, May 27, 2020 at 9:01 AM Srinivas V <srini....@gmail.com> wrote:
>>>>
>>>>> Yes, I am talking about application-specific accumulators. Actually, I
>>>>> am getting the values printed in my driver log as well as sent to Grafana.
>>>>> Not sure where and when I saw 0 before. My deploy mode is “client” on a
>>>>> YARN cluster (not a local Mac), where I submit from the master node. It
>>>>> should work the same for cluster mode as well.
>>>>> Create accumulators like this:
>>>>> LongAccumulator accumulator = sparkContext.longAccumulator(name);
>>>>>
>>>>>
>>>>> On Tue, May 26, 2020 at 8:42 PM Something Something <
>>>>> mailinglist...@gmail.com> wrote:
>>>>>
>>>>>> Hmm... how would they go to Grafana if they are not getting computed
>>>>>> in your code? I am talking about the application-specific accumulators.
>>>>>> The other standard counters such as 'event.progress.inputRowsPerSecond' are
>>>>>> getting populated correctly!
>>>>>>
>>>>>> On Mon, May 25, 2020 at 8:39 PM Srinivas V <srini....@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hello,
>>>>>>> Even for me it comes out as 0 when I print it in onQueryProgress. I use
>>>>>>> LongAccumulator as well. Yes, it prints on my local machine but not on
>>>>>>> the cluster. But one consolation is that when I send metrics to Grafana,
>>>>>>> the values do show up there.
>>>>>>>
>>>>>>> On Tue, May 26, 2020 at 3:10 AM Something Something <
>>>>>>> mailinglist...@gmail.com> wrote:
>>>>>>>
>>>>>>>> No, this is not working even if I use LongAccumulator.
>>>>>>>>
>>>>>>>> On Fri, May 15, 2020 at 9:54 PM ZHANG Wei <zwb...@msn.com> wrote:
>>>>>>>>
>>>>>>>>> There is a restriction in the AccumulatorV2 API [1]: the OUT type
>>>>>>>>> should be atomic or thread-safe. I'm wondering whether the implementation
>>>>>>>>> for `java.util.Map[T, Long]` meets it. Is there any chance to replace
>>>>>>>>> CollectionLongAccumulator with CollectionAccumulator [2] or
>>>>>>>>> LongAccumulator [3] and test whether the StreamingListener and the other
>>>>>>>>> code are able to work?
>>>>>>>>>
>>>>>>>>> ---
>>>>>>>>> Cheers,
>>>>>>>>> -z
>>>>>>>>> [1]
>>>>>>>>> http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.AccumulatorV2
>>>>>>>>> [2]
>>>>>>>>> http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.CollectionAccumulator
>>>>>>>>> [3]
>>>>>>>>> http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.LongAccumulator
>>>>>>>>>
>>>>>>>>> ________________________________________
>>>>>>>>> From: Something Something <mailinglist...@gmail.com>
>>>>>>>>> Sent: Saturday, May 16, 2020 0:38
>>>>>>>>> To: spark-user
>>>>>>>>> Subject: Re: Using Spark Accumulators with Structured Streaming
>>>>>>>>>
>>>>>>>>> Can someone from the Spark development team tell me if this
>>>>>>>>> functionality is supported and tested? I've spent a lot of time on this
>>>>>>>>> but can't get it to work. Just to add more context, we have our own
>>>>>>>>> Accumulator class that extends AccumulatorV2. In this class we keep
>>>>>>>>> track of one or more accumulators. Here's the definition:
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> class CollectionLongAccumulator[T]
>>>>>>>>>     extends AccumulatorV2[T, java.util.Map[T, Long]]
>>>>>>>>>
>>>>>>>>> When the job begins we register an instance of this class:
>>>>>>>>>
>>>>>>>>> spark.sparkContext.register(myAccumulator, "MyAccumulator")
>>>>>>>>>
>>>>>>>>> Is this working under Structured Streaming?
>>>>>>>>>
>>>>>>>>> I will keep looking for alternate approaches but any help would be
>>>>>>>>> greatly appreciated. Thanks.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Thu, May 14, 2020 at 2:36 PM Something Something <
>>>>>>>>> mailinglist...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>> In my structured streaming job I am updating Spark Accumulators in
>>>>>>>>> the updateAcrossEvents method, but they are always 0 when I try to
>>>>>>>>> print them in my StreamingListener. Here's the code:
>>>>>>>>>
>>>>>>>>> .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
>>>>>>>>>         updateAcrossEvents
>>>>>>>>>       )
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> The accumulators get incremented in 'updateAcrossEvents'. I have a
>>>>>>>>> StreamingListener which writes the values of the accumulators in the
>>>>>>>>> 'onQueryProgress' method, but in this method the accumulators are
>>>>>>>>> ALWAYS ZERO!
>>>>>>>>>
>>>>>>>>> When I added log statements in the updateAcrossEvents, I could see
>>>>>>>>> that these accumulators are getting incremented as expected.
>>>>>>>>>
>>>>>>>>> This only happens when I run in the 'Cluster' mode. In Local mode
>>>>>>>>> it works fine which implies that the Accumulators are not getting
>>>>>>>>> distributed correctly - or something like that!
>>>>>>>>>
>>>>>>>>> Note: I've seen quite a few answers on the Web that tell me to
>>>>>>>>> perform an "Action". That's not a solution here. This is a 'Stateful
>>>>>>>>> Structured Streaming' job. Yes, I am also 'registering' them in
>>>>>>>>> SparkContext.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
