Hello,
I see the same thing: the value prints as 0 in onQueryProgress, and I am
using LongAccumulator as well. It prints correctly when I run locally but
not on the cluster. One consolation is that when I send the metrics to
Grafana, the values do show up there.
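
For reference, a minimal sketch of the setup being described, assuming the callback belongs to a StreamingQueryListener (the Structured Streaming listener that defines onQueryProgress). The class name AccumulatorLoggingListener, the label parameter, and the render helper are hypothetical and are not code from this thread. It only shows where the driver-side read happens; it is not a fix for the zero values.

```scala
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener.{QueryProgressEvent, QueryStartedEvent, QueryTerminatedEvent}
import org.apache.spark.util.LongAccumulator

// Hypothetical listener: prints one accumulator's value on every progress event.
class AccumulatorLoggingListener(label: String, counter: LongAccumulator)
    extends StreamingQueryListener {

  // Builds the log line from whatever value this JVM's copy of the accumulator holds.
  def render(batchId: Long): String = s"batch=$batchId $label=${counter.value}"

  override def onQueryStarted(event: QueryStartedEvent): Unit = ()

  // Listener callbacks are invoked on the driver, so this reads the driver-side value.
  override def onQueryProgress(event: QueryProgressEvent): Unit =
    println(render(event.progress.batchId))

  override def onQueryTerminated(event: QueryTerminatedEvent): Unit = ()
}
```

Assuming `spark` is the active SparkSession, it could be wired up with `spark.streams.addListener(new AccumulatorLoggingListener("eventCount", spark.sparkContext.longAccumulator("eventCount")))`.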

On Tue, May 26, 2020 at 3:10 AM Something Something <
mailinglist...@gmail.com> wrote:

> No, this does not work even if I use LongAccumulator.
>
> On Fri, May 15, 2020 at 9:54 PM ZHANG Wei <zwb...@msn.com> wrote:
>
>> There is a restriction in the AccumulatorV2 API [1]: the OUT type should be
>> atomic or thread-safe. I wonder whether the implementation for
>> `java.util.Map[T, Long]` meets it. Is there any chance you could replace
>> CollectionLongAccumulator with CollectionAccumulator [2] or LongAccumulator [3]
>> and test whether the StreamingListener and the other code work?
>>
>> ---
>> Cheers,
>> -z
>> [1]
>> http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.AccumulatorV2
>> [2]
>> http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.CollectionAccumulator
>> [3]
>> http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.LongAccumulator
>>
>> ________________________________________
>> From: Something Something <mailinglist...@gmail.com>
>> Sent: Saturday, May 16, 2020 0:38
>> To: spark-user
>> Subject: Re: Using Spark Accumulators with Structured Streaming
>>
>> Can someone from the Spark development team tell me whether this
>> functionality is supported and tested? I've spent a lot of time on this but
>> can't get it to work. To add more context, we have our own accumulator
>> class that extends AccumulatorV2. In this class we keep track of one or
>> more accumulators. Here's the definition:
>>
>>
>> class CollectionLongAccumulator[T]
>>     extends AccumulatorV2[T, java.util.Map[T, Long]]
>>
>> When the job begins we register an instance of this class:
>>
>> spark.sparkContext.register(myAccumulator, "MyAccumulator")
>>
>> Does this work under Structured Streaming?
>>
>> I will keep looking for alternate approaches but any help would be
>> greatly appreciated. Thanks.
>>
>>
>>
>> On Thu, May 14, 2020 at 2:36 PM Something Something <
>> mailinglist...@gmail.com> wrote:
>>
>> In my structured streaming job I am updating Spark Accumulators in the
>> updateAcrossEvents method but they are always 0 when I try to print them in
>> my StreamingListener. Here's the code:
>>
>> .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
>>         updateAcrossEvents
>>       )
>>
>>
>> The accumulators get incremented in 'updateAcrossEvents'. I have a
>> StreamingListener that writes the values of the accumulators in its
>> 'onQueryProgress' method, but in this method the accumulators are ALWAYS
>> ZERO!
>>
>> When I added log statements in the updateAcrossEvents, I could see that
>> these accumulators are getting incremented as expected.
>>
>> This only happens when I run in 'Cluster' mode. In Local mode it works
>> fine, which implies that the accumulators are not getting distributed
>> correctly - or something like that!
>>
>> Note: I've seen quite a few answers on the Web that tell me to perform an
>> "Action". That's not a solution here. This is a 'Stateful Structured
>> Streaming' job. Yes, I am also 'registering' them in SparkContext.
>>
>>
>>
>>
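
The thread-safety restriction that ZHANG Wei cites for the OUT type can be illustrated with a minimal sketch of a map-based counting accumulator whose reads and writes are all synchronized. The class name SynchronizedCountAccumulator and the addCount helper are hypothetical; this is not the CollectionLongAccumulator from the thread, whose implementation is not shown, and nothing here establishes that it resolves the zero values reported above.

```scala
import java.util.{Collections, HashMap => JHashMap, Map => JMap}
import org.apache.spark.util.AccumulatorV2

// Hypothetical accumulator: counts occurrences of each key of type T.
class SynchronizedCountAccumulator[T] extends AccumulatorV2[T, JMap[T, Long]] {

  // Every access to this map goes through `this.synchronized`.
  private val counts = new JHashMap[T, Long]()

  override def isZero: Boolean = this.synchronized(counts.isEmpty)

  override def copy(): SynchronizedCountAccumulator[T] = {
    val acc = new SynchronizedCountAccumulator[T]
    this.synchronized { acc.counts.putAll(counts) }
    acc
  }

  override def reset(): Unit = this.synchronized(counts.clear())

  // Each added element increments the count for its key by one.
  override def add(key: T): Unit = addCount(key, 1L)

  // Folds another accumulator's counts into this one, key by key.
  override def merge(other: AccumulatorV2[T, JMap[T, Long]]): Unit = {
    val it = other.value.entrySet().iterator()
    while (it.hasNext) {
      val entry = it.next()
      addCount(entry.getKey, entry.getValue)
    }
  }

  // Returns a read-only snapshot so callers never see the map mid-update.
  override def value: JMap[T, Long] = this.synchronized {
    Collections.unmodifiableMap(new JHashMap[T, Long](counts))
  }

  private def addCount(key: T, delta: Long): Unit = this.synchronized {
    val current = if (counts.containsKey(key)) counts.get(key) else 0L
    counts.put(key, current + delta)
  }
}
```

Returning a copied, unmodifiable snapshot from `value` is a design choice in this sketch: a reader on another thread gets a consistent view instead of a live map that tasks may still be updating.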
