No, this is not working even if I use LongAccumulator.

On Fri, May 15, 2020 at 9:54 PM ZHANG Wei <zwb...@msn.com> wrote:

> There is a restriction in the AccumulatorV2 API [1]: the OUT type should be
> atomic or thread safe. I'm wondering whether the implementation backed by
> `java.util.Map[T, Long]` meets that requirement. Is there any chance to replace
> CollectionLongAccumulator with CollectionAccumulator [2] or LongAccumulator [3]
> and test whether the StreamingListener and the rest of the code still work? A
> minimal test is sketched after the links below.
>
> ---
> Cheers,
> -z
> [1]
> http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.AccumulatorV2
> [2]
> http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.CollectionAccumulator
> [3]
> http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.LongAccumulator
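>
> A minimal sketch of such a test, purely for illustration (the accumulator name
> and the places it is touched are placeholders, not your code):
>
>     // built-in LongAccumulator, registered automatically under a name
>     val testAcc = spark.sparkContext.longAccumulator("TestAccumulator")
>
>     // inside updateAcrossEvents, for every event processed:
>     testAcc.add(1)
>
>     // inside the listener's onQueryProgress:
>     println(s"TestAccumulator = ${testAcc.value}")
>
> If testAcc.value also stays at zero in cluster mode, the custom OUT type is
> probably not the culprit.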
>
> ________________________________________
> From: Something Something <mailinglist...@gmail.com>
> Sent: Saturday, May 16, 2020 0:38
> To: spark-user
> Subject: Re: Using Spark Accumulators with Structured Streaming
>
> Can someone from the Spark development team tell me whether this functionality
> is supported and tested? I've spent a lot of time on this but can't get it to
> work. Just to add more context: we have our own accumulator class that extends
> AccumulatorV2, and in this class we keep track of one or more accumulators.
> Here's the definition:
>
>
> class CollectionLongAccumulator[T]
>     extends AccumulatorV2[T, java.util.Map[T, Long]]
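>
> For context, here is roughly how such a class might be fleshed out (a hedged
> sketch only; the ConcurrentHashMap backing and the method bodies are
> assumptions for illustration, not our exact code):
>
>     import java.util.concurrent.ConcurrentHashMap
>     import scala.collection.JavaConverters._
>     import org.apache.spark.util.AccumulatorV2
>
>     class CollectionLongAccumulator[T]
>         extends AccumulatorV2[T, java.util.Map[T, Long]] {
>
>       private val counts = new ConcurrentHashMap[T, Long]()
>
>       override def isZero: Boolean = counts.isEmpty
>
>       override def copy(): CollectionLongAccumulator[T] = {
>         val acc = new CollectionLongAccumulator[T]
>         acc.counts.putAll(counts)
>         acc
>       }
>
>       override def reset(): Unit = counts.clear()
>
>       // Count one occurrence of the given key (runs in the tasks).
>       override def add(v: T): Unit = synchronized {
>         counts.put(v, counts.getOrDefault(v, 0L) + 1L)
>       }
>
>       // Fold another task's partial counts into this one (runs on the driver).
>       override def merge(other: AccumulatorV2[T, java.util.Map[T, Long]]): Unit =
>         synchronized {
>           other.value.asScala.foreach { case (k, v) =>
>             counts.put(k, counts.getOrDefault(k, 0L) + v)
>           }
>         }
>
>       override def value: java.util.Map[T, Long] = counts
>     }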
>
> When the job begins we register an instance of this class:
>
> spark.sparkContext.register(myAccumulator, "MyAccumulator")
>
> Is this working under Structured Streaming?
>
> I will keep looking for alternative approaches, but any help would be greatly
> appreciated. Thanks.
>
>
>
> On Thu, May 14, 2020 at 2:36 PM Something Something <mailinglist...@gmail.com> wrote:
>
> In my Structured Streaming job I am updating Spark accumulators in the
> updateAcrossEvents method, but they are always 0 when I try to print them in
> my StreamingListener. Here's the code:
>
> .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
>         updateAcrossEvents
>       )
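>
> The update function itself has roughly this shape (the Event and SessionState
> types, the field names, and the state logic below are placeholders for
> illustration; only the accumulator call is the relevant part):
>
>     import org.apache.spark.sql.streaming.GroupState
>
>     // Placeholder types for illustration only.
>     case class Event(id: String, eventType: String)
>     case class SessionState(count: Long)
>
>     def updateAcrossEvents(
>         key: String,
>         events: Iterator[Event],
>         state: GroupState[SessionState]): SessionState = {
>       var count = state.getOption.map(_.count).getOrElse(0L)
>       events.foreach { event =>
>         // myAccumulator is the driver-registered accumulator; this add()
>         // runs on the executors as each group's events are processed.
>         myAccumulator.add(event.eventType)
>         count += 1
>       }
>       val newState = SessionState(count)
>       state.update(newState)
>       newState
>     }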
>
>
> The accumulators get incremented in 'updateAcrossEvents'. I have a
> StreamingListener that writes the values of the accumulators in its
> 'onQueryProgress' method, but in that method the accumulators are ALWAYS
> ZERO!
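>
> For reference, the listener side looks roughly like this (a sketch assuming
> the Structured Streaming StreamingQueryListener interface; the class name is
> a placeholder):
>
>     import org.apache.spark.sql.streaming.StreamingQueryListener
>     import org.apache.spark.sql.streaming.StreamingQueryListener._
>
>     class AccumulatorLoggingListener extends StreamingQueryListener {
>       override def onQueryStarted(event: QueryStartedEvent): Unit = {}
>       override def onQueryTerminated(event: QueryTerminatedEvent): Unit = {}
>       override def onQueryProgress(event: QueryProgressEvent): Unit = {
>         // Reads the driver-side view of the accumulator after each micro-batch.
>         println(s"MyAccumulator = ${myAccumulator.value}")
>       }
>     }
>
>     spark.streams.addListener(new AccumulatorLoggingListener)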
>
> When I added log statements inside updateAcrossEvents, I could see that these
> accumulators are getting incremented as expected.
>
> This only happens when I run in 'cluster' mode. In local mode it works fine,
> which suggests the accumulator updates from the executors are not making it
> back to the driver - or something like that!
>
> Note: I've seen quite a few answers on the Web that tell me to perform an
> "Action". That's not a solution here. This is a 'Stateful Structured
> Streaming' job. Yes, I am also 'registering' them in SparkContext.
>
