yes, I am using stateful structured streaming. Yes similar to what you do.
This is in Java
I do it this way:
    Dataset<ModelUpdate> productUpdates = watermarkedDS
                .groupByKey(
                        (MapFunction<InputEventModel, String>) event ->
event.getId(), Encoders.STRING())
                .mapGroupsWithState(
                        new
StateUpdateTask(Long.parseLong(appConfig.getSparkStructuredStreamingConfig().STATE_TIMEOUT),
appConfig, accumulators),
                        Encoders.bean(ModelStateInfo.class),
                        Encoders.bean(ModelUpdate.class),
                        GroupStateTimeout.ProcessingTimeTimeout());

StateUpdateTask contains the update method.

On Thu, May 28, 2020 at 4:41 AM Something Something <
mailinglist...@gmail.com> wrote:

> Yes, that's exactly how I am creating them.
>
> Question... Are you using 'Stateful Structured Streaming' in which you've
> something like this?
>
> .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
>         updateAcrossEvents
>       )
>
> And updating the Accumulator inside 'updateAcrossEvents'? We're experiencing 
> this only under 'Stateful Structured Streaming'. In other streaming 
> applications it works as expected.
>
>
>
> On Wed, May 27, 2020 at 9:01 AM Srinivas V <srini....@gmail.com> wrote:
>
>> Yes, I am talking about Application specific Accumulators. Actually I am
>> getting the values printed in my driver log as well as sent to Grafana. Not
>> sure where and when I saw 0 before. My deploy mode is “client” on a yarn
>> cluster(not local Mac) where I submit from master node. It should work the
>> same for cluster mode as well.
>> Create accumulators like this:
>> AccumulatorV2 accumulator = sparkContext.longAccumulator(name);
>>
>>
>> On Tue, May 26, 2020 at 8:42 PM Something Something <
>> mailinglist...@gmail.com> wrote:
>>
>>> Hmm... how would they go to Graphana if they are not getting computed in
>>> your code? I am talking about the Application Specific Accumulators. The
>>> other standard counters such as 'event.progress.inputRowsPerSecond' are
>>> getting populated correctly!
>>>
>>> On Mon, May 25, 2020 at 8:39 PM Srinivas V <srini....@gmail.com> wrote:
>>>
>>>> Hello,
>>>> Even for me it comes as 0 when I print in OnQueryProgress. I use
>>>> LongAccumulator as well. Yes, it prints on my local but not on cluster.
>>>> But one consolation is that when I send metrics to Graphana, the values
>>>> are coming there.
>>>>
>>>> On Tue, May 26, 2020 at 3:10 AM Something Something <
>>>> mailinglist...@gmail.com> wrote:
>>>>
>>>>> No this is not working even if I use LongAccumulator.
>>>>>
>>>>> On Fri, May 15, 2020 at 9:54 PM ZHANG Wei <zwb...@msn.com> wrote:
>>>>>
>>>>>> There is a restriction in AccumulatorV2 API [1], the OUT type should
>>>>>> be atomic or thread safe. I'm wondering if the implementation for
>>>>>> `java.util.Map[T, Long]` can meet it or not. Is there any chance to 
>>>>>> replace
>>>>>> CollectionLongAccumulator by CollectionAccumulator[2] or 
>>>>>> LongAccumulator[3]
>>>>>> and test if the StreamingListener and other codes are able to work?
>>>>>>
>>>>>> ---
>>>>>> Cheers,
>>>>>> -z
>>>>>> [1]
>>>>>> http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.AccumulatorV2
>>>>>> [2]
>>>>>> http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.CollectionAccumulator
>>>>>> [3]
>>>>>> http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.LongAccumulator
>>>>>>
>>>>>> ________________________________________
>>>>>> From: Something Something <mailinglist...@gmail.com>
>>>>>> Sent: Saturday, May 16, 2020 0:38
>>>>>> To: spark-user
>>>>>> Subject: Re: Using Spark Accumulators with Structured Streaming
>>>>>>
>>>>>> Can someone from Spark Development team tell me if this functionality
>>>>>> is supported and tested? I've spent a lot of time on this but can't get 
>>>>>> it
>>>>>> to work. Just to add more context, we've our own Accumulator class that
>>>>>> extends from AccumulatorV2. In this class we keep track of one or more
>>>>>> accumulators. Here's the definition:
>>>>>>
>>>>>>
>>>>>> class CollectionLongAccumulator[T]
>>>>>>     extends AccumulatorV2[T, java.util.Map[T, Long]]
>>>>>>
>>>>>> When the job begins we register an instance of this class:
>>>>>>
>>>>>> spark.sparkContext.register(myAccumulator, "MyAccumulator")
>>>>>>
>>>>>> Is this working under Structured Streaming?
>>>>>>
>>>>>> I will keep looking for alternate approaches but any help would be
>>>>>> greatly appreciated. Thanks.
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Thu, May 14, 2020 at 2:36 PM Something Something <
>>>>>> mailinglist...@gmail.com<mailto:mailinglist...@gmail.com>> wrote:
>>>>>>
>>>>>> In my structured streaming job I am updating Spark Accumulators in
>>>>>> the updateAcrossEvents method but they are always 0 when I try to print
>>>>>> them in my StreamingListener. Here's the code:
>>>>>>
>>>>>> .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
>>>>>>         updateAcrossEvents
>>>>>>       )
>>>>>>
>>>>>>
>>>>>> The accumulators get incremented in 'updateAcrossEvents'. I've a
>>>>>> StreamingListener which writes values of the accumulators in
>>>>>> 'onQueryProgress' method but in this method the Accumulators are ALWAYS
>>>>>> ZERO!
>>>>>>
>>>>>> When I added log statements in the updateAcrossEvents, I could see
>>>>>> that these accumulators are getting incremented as expected.
>>>>>>
>>>>>> This only happens when I run in the 'Cluster' mode. In Local mode it
>>>>>> works fine which implies that the Accumulators are not getting 
>>>>>> distributed
>>>>>> correctly - or something like that!
>>>>>>
>>>>>> Note: I've seen quite a few answers on the Web that tell me to
>>>>>> perform an "Action". That's not a solution here. This is a 'Stateful
>>>>>> Structured Streaming' job. Yes, I am also 'registering' them in
>>>>>> SparkContext.
>>>>>>
>>>>>>
>>>>>>
>>>>>>

Reply via email to