There is a restriction in the AccumulatorV2 API [1]: the OUT type should be atomic or thread-safe, because it will be read from other threads. I'm wondering whether an implementation based on `java.util.Map[T, Long]` can meet that requirement. Is there any chance you could replace CollectionLongAccumulator with CollectionAccumulator [2] or LongAccumulator [3] and test whether the StreamingListener and the rest of the code still work?
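
As a quick smoke test, something like the following could help rule the custom class in or out (a sketch only; the accumulator name and the call site are placeholders, since that code isn't shown in the thread):

// Built-in LongAccumulator is thread-safe by construction.
val testAcc = spark.sparkContext.longAccumulator("SmokeTestAccumulator")

// Inside updateAcrossEvents, next to the existing accumulator update:
testAcc.add(1L)

If testAcc shows non-zero values in onQueryProgress while the custom accumulator stays at zero, the problem is likely in CollectionLongAccumulator itself.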

---
Cheers,
-z
[1] http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.AccumulatorV2
[2] http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.CollectionAccumulator
[3] http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.LongAccumulator

________________________________________
From: Something Something <mailinglist...@gmail.com>
Sent: Saturday, May 16, 2020 0:38
To: spark-user
Subject: Re: Using Spark Accumulators with Structured Streaming

Can someone from the Spark development team tell me whether this functionality is supported and tested? I've spent a lot of time on this but can't get it to work. To add more context: we have our own Accumulator class that extends AccumulatorV2. In this class we keep track of one or more accumulators. Here's the definition:


class CollectionLongAccumulator[T]
    extends AccumulatorV2[T, java.util.Map[T, Long]]
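
For reference, a minimal sketch of how the remaining AccumulatorV2 methods could be implemented (hypothetical, since the full class is not shown in this thread). It backs the map with a ConcurrentHashMap so the exposed value is thread-safe, as the AccumulatorV2 contract requires:

import java.util.concurrent.ConcurrentHashMap
import java.util.function.BiFunction
import org.apache.spark.util.AccumulatorV2

class CollectionLongAccumulator[T]
    extends AccumulatorV2[T, java.util.Map[T, Long]] {

  // Thread-safe backing map: the driver's listener thread may read `value`
  // while task-completion threads are still merging in results.
  private val counts = new ConcurrentHashMap[T, Long]()
  private val sum: BiFunction[Long, Long, Long] = (a, b) => a + b

  override def isZero: Boolean = counts.isEmpty

  override def copy(): CollectionLongAccumulator[T] = {
    val acc = new CollectionLongAccumulator[T]
    acc.counts.putAll(counts)
    acc
  }

  override def reset(): Unit = counts.clear()

  // Runs on the executors, once per tracked value.
  override def add(v: T): Unit = counts.merge(v, 1L, sum)

  // Runs on the driver as task-side copies come back.
  override def merge(other: AccumulatorV2[T, java.util.Map[T, Long]]): Unit = {
    val it = other.value.entrySet().iterator()
    while (it.hasNext) {
      val e = it.next()
      counts.merge(e.getKey, e.getValue, sum)
    }
  }

  override def value: java.util.Map[T, Long] = counts
}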

When the job begins we register an instance of this class:

spark.sparkContext.register(myAccumulator, "MyAccumulator")

Does this work under Structured Streaming?

I will keep looking for alternate approaches but any help would be greatly 
appreciated. Thanks.



On Thu, May 14, 2020 at 2:36 PM Something Something <mailinglist...@gmail.com> wrote:

In my structured streaming job I am updating Spark Accumulators in the updateAcrossEvents method, but they are always 0 when I try to print them in my StreamingListener. Here's the code:

.mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
        updateAcrossEvents
      )
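
For reference, updateAcrossEvents would have roughly the shape below; the Event and SessionState types and the accumulator wiring are placeholders, since the real ones are not shown in the thread. The point is that the accumulator update happens inside the state function, i.e. on the executors:

import org.apache.spark.sql.streaming.GroupState

// Placeholder types; the real ones are not shown in the thread.
case class Event(id: String)
case class SessionState(count: Long = 0L)

// Assumed to be the instance registered with spark.sparkContext.register(...).
// It is captured by the closure, shipped to the executors, and Spark merges
// the task-side copies back into this driver-side instance.
val myAccumulator = new CollectionLongAccumulator[String]

def updateAcrossEvents(
    key: String,
    events: Iterator[Event],
    state: GroupState[SessionState]): SessionState = {
  var s = state.getOption.getOrElse(SessionState())
  events.foreach { e =>
    myAccumulator.add(e.id) // executor-side increment
    s = s.copy(count = s.count + 1)
  }
  state.update(s)
  s
}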


The accumulators get incremented in 'updateAcrossEvents'. I have a StreamingListener that writes out the values of the accumulators in its 'onQueryProgress' method, but there the Accumulators are ALWAYS ZERO!
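
For reference, a driver-side listener along these lines would read the accumulator after each micro-batch (a sketch: the AccumulatorPrinter name and wiring are placeholders, and onQueryProgress here is the StreamingQueryListener callback; the listener must hold the same instance that was registered, not a copy):

import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._

// Prints the driver-side accumulator value after every micro-batch.
class AccumulatorPrinter(acc: CollectionLongAccumulator[String])
    extends StreamingQueryListener {
  override def onQueryStarted(event: QueryStartedEvent): Unit = ()
  override def onQueryProgress(event: QueryProgressEvent): Unit =
    println(s"Batch ${event.progress.batchId}: ${acc.value}")
  override def onQueryTerminated(event: QueryTerminatedEvent): Unit = ()
}

// Registered on the driver:
// spark.streams.addListener(new AccumulatorPrinter(myAccumulator))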

When I added log statements to updateAcrossEvents, I could see that these accumulators are getting incremented as expected.

This only happens when I run in 'Cluster' mode. In Local mode it works fine, which implies that the Accumulators are not getting distributed correctly - or something like that!

Note: I've seen quite a few answers on the Web that tell me to perform an 
"Action". That's not a solution here. This is a 'Stateful Structured Streaming' 
job. Yes, I am also 'registering' them in SparkContext.




---------------------------------------------------------------------
To unsubscribe e-mail: user-unsubscr...@spark.apache.org
