No, this is not working even if I use LongAccumulator.

On Fri, May 15, 2020 at 9:54 PM ZHANG Wei <zwb...@msn.com> wrote:
> There is a restriction in the AccumulatorV2 API [1]: the OUT type should be
> atomic or thread-safe. I'm wondering whether the implementation for
> `java.util.Map[T, Long]` can meet that or not. Is there any chance to replace
> CollectionLongAccumulator with CollectionAccumulator [2] or LongAccumulator [3]
> and test whether the StreamingListener and the other code are able to work?
>
> ---
> Cheers,
> -z
>
> [1] http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.AccumulatorV2
> [2] http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.CollectionAccumulator
> [3] http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.LongAccumulator
>
> ________________________________________
> From: Something Something <mailinglist...@gmail.com>
> Sent: Saturday, May 16, 2020 0:38
> To: spark-user
> Subject: Re: Using Spark Accumulators with Structured Streaming
>
> Can someone from the Spark development team tell me if this functionality is
> supported and tested? I've spent a lot of time on this but can't get it to
> work.
>
> Just to add more context: we have our own Accumulator class that extends
> AccumulatorV2. In this class we keep track of one or more accumulators.
> Here's the definition:
>
> class CollectionLongAccumulator[T]
>   extends AccumulatorV2[T, java.util.Map[T, Long]]
>
> When the job begins we register an instance of this class:
>
> spark.sparkContext.register(myAccumulator, "MyAccumulator")
>
> Does this work under Structured Streaming?
>
> I will keep looking for alternate approaches, but any help would be greatly
> appreciated. Thanks.
>
> On Thu, May 14, 2020 at 2:36 PM Something Something
> <mailinglist...@gmail.com<mailto:mailinglist...@gmail.com>> wrote:
>
> In my Structured Streaming job I am updating Spark Accumulators in the
> updateAcrossEvents method, but they are always 0 when I try to print them
> in my StreamingListener.
> Here's the code:
>
> .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
>   updateAcrossEvents
> )
>
> The accumulators get incremented in 'updateAcrossEvents'. I have a
> StreamingListener which writes the values of the accumulators in its
> 'onQueryProgress' method, but in that method the accumulators are ALWAYS
> ZERO!
>
> When I added log statements in updateAcrossEvents, I could see that these
> accumulators are getting incremented as expected.
>
> This only happens when I run in 'Cluster' mode. In Local mode it works
> fine, which implies that the accumulators are not getting distributed
> correctly - or something like that!
>
> Note: I've seen quite a few answers on the Web that tell me to perform an
> "Action". That's not a solution here. This is a 'Stateful Structured
> Streaming' job. Yes, I am also 'registering' them in the SparkContext.
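[Editor's note] On ZHANG Wei's point about the OUT type needing to be atomic or thread-safe: one way to satisfy that contract is to back the custom accumulator with a ConcurrentHashMap. The sketch below is only an illustration of what a thread-safe variant of the poster's CollectionLongAccumulator could look like, not their actual implementation; the class name is made up.

```scala
import java.util.concurrent.ConcurrentHashMap

import org.apache.spark.util.AccumulatorV2

// Illustrative sketch: a per-key event counter whose OUT type
// (a ConcurrentHashMap) is thread-safe, as AccumulatorV2 requires.
class ConcurrentCollectionLongAccumulator[T]
    extends AccumulatorV2[T, java.util.Map[T, Long]] {

  private val map = new ConcurrentHashMap[T, Long]()

  // The accumulator is "zero" when no key has been counted yet.
  override def isZero: Boolean = map.isEmpty

  // Spark copies accumulators when shipping them to tasks.
  override def copy(): AccumulatorV2[T, java.util.Map[T, Long]] = {
    val acc = new ConcurrentCollectionLongAccumulator[T]
    acc.map.putAll(map)
    acc
  }

  override def reset(): Unit = map.clear()

  // Called on executors: atomically bump the count for this key.
  override def add(v: T): Unit =
    map.merge(v, 1L, (a, b) => a + b)

  // Called on the driver: fold another task's partial counts into ours.
  override def merge(other: AccumulatorV2[T, java.util.Map[T, Long]]): Unit =
    other.value.forEach((k, v) => map.merge(k, v, (a, b) => a + b))

  override def value: java.util.Map[T, Long] = map
}
```

As usual, an instance would be registered once on the driver with `spark.sparkContext.register(acc, "MyAccumulator")` before the query starts.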
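[Editor's note] For readers following along, the overall shape being discussed — a built-in LongAccumulator registered on the driver and incremented inside the mapGroupsWithState update function — can be sketched as below. This is a minimal, hypothetical reconstruction under stated assumptions (a `rate` source, placeholder Event/KeyState types, and a simplified updateAcrossEvents), not the poster's actual job. Note that the update function must capture the very same registered accumulator instance in its closure, not construct a new one per task.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout}

object AccumulatorSketch {
  // Placeholder types standing in for the poster's real event and state.
  case class Event(key: String)
  case class KeyState(count: Long)

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("acc-sketch").getOrCreate()
    import spark.implicits._

    // Registered once on the driver; executor-side add() calls are
    // merged back into this instance as tasks complete.
    val eventCount = spark.sparkContext.longAccumulator("MyAccumulator")

    // Simplified stand-in for the poster's updateAcrossEvents.
    def updateAcrossEvents(
        key: String,
        events: Iterator[Event],
        state: GroupState[KeyState]): KeyState = {
      val n = events.size
      eventCount.add(n) // incremented on executors
      val updated = KeyState(state.getOption.map(_.count).getOrElse(0L) + n)
      state.update(updated)
      updated
    }

    // A synthetic source so the sketch is self-contained.
    val events = spark.readStream.format("rate").load()
      .select($"value".cast("string").as("key")).as[Event]

    events.groupByKey(_.key)
      .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
        updateAcrossEvents
      )
      .writeStream
      .outputMode("update") // required by mapGroupsWithState
      .format("console")
      .start()
      .awaitTermination()
  }
}
```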