Yes, that's exactly how I am creating them.

Question... Are you using 'Stateful Structured Streaming', where you have
something like this?

.mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
        updateAcrossEvents
      )

And are you updating the Accumulator inside 'updateAcrossEvents'? We're
experiencing this only under 'Stateful Structured Streaming'. In other
streaming applications it works as expected.
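
For context, a minimal sketch of the kind of setup I mean (the types, the
'StatefulJob' object, and the 'eventCounter' name are placeholders, not our
actual code):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.GroupState
import org.apache.spark.util.LongAccumulator

// Placeholder types standing in for the real job's event and state classes.
case class Event(id: String, ts: Long)
case class SessionState(count: Long)

object StatefulJob {
  val spark: SparkSession = SparkSession.builder().getOrCreate()

  // longAccumulator(name) creates and registers the accumulator.
  val eventCounter: LongAccumulator =
    spark.sparkContext.longAccumulator("eventCounter")

  // The state-update function passed to mapGroupsWithState, e.g.
  // .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
  //   StatefulJob.updateAcrossEvents)
  def updateAcrossEvents(
      key: String,
      events: Iterator[Event],
      state: GroupState[SessionState]): SessionState = {
    var count = state.getOption.map(_.count).getOrElse(0L)
    events.foreach { _ =>
      count += 1
      eventCounter.add(1L) // updated on the executors inside the state function
    }
    val updated = SessionState(count)
    state.update(updated)
    updated
  }
}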



On Wed, May 27, 2020 at 9:01 AM Srinivas V <srini....@gmail.com> wrote:

> Yes, I am talking about application-specific Accumulators. Actually I am
> getting the values printed in my driver log as well as sent to Grafana. Not
> sure where and when I saw 0 before. My deploy mode is “client” on a YARN
> cluster (not a local Mac), where I submit from the master node. It should
> work the same for cluster mode as well.
> Create accumulators like this:
> LongAccumulator accumulator = sparkContext.longAccumulator(name);
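>
> In the listener it comes out roughly like this (a sketch with illustrative
> names; the push to Grafana is whatever metrics client you use):
>
> import org.apache.spark.sql.streaming.StreamingQueryListener
> import org.apache.spark.sql.streaming.StreamingQueryListener._
> import org.apache.spark.util.LongAccumulator
>
> // Reads the driver-side value of the accumulator on every progress event.
> class AccumulatorReportingListener(counter: LongAccumulator)
>     extends StreamingQueryListener {
>
>   override def onQueryStarted(event: QueryStartedEvent): Unit = ()
>
>   override def onQueryProgress(event: QueryProgressEvent): Unit = {
>     // counter.value is the merged value on the driver; push it to the
>     // metrics sink here.
>     println(s"${counter.name.getOrElse("accumulator")} = ${counter.value}")
>   }
>
>   override def onQueryTerminated(event: QueryTerminatedEvent): Unit = ()
> }
>
> // Registered once on the driver:
> // spark.streams.addListener(new AccumulatorReportingListener(accumulator))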
>
>
> On Tue, May 26, 2020 at 8:42 PM Something Something <
> mailinglist...@gmail.com> wrote:
>
>> Hmm... how would they go to Grafana if they are not getting computed in
>> your code? I am talking about the application-specific Accumulators. The
>> other standard counters such as 'event.progress.inputRowsPerSecond' are
>> getting populated correctly!
>>
>> On Mon, May 25, 2020 at 8:39 PM Srinivas V <srini....@gmail.com> wrote:
>>
>>> Hello,
>>> Even for me it comes as 0 when I print it in onQueryProgress. I use
>>> LongAccumulator as well. Yes, it prints on my local machine but not on the
>>> cluster. But one consolation is that when I send metrics to Grafana, the
>>> values do show up there.
>>>
>>> On Tue, May 26, 2020 at 3:10 AM Something Something <
>>> mailinglist...@gmail.com> wrote:
>>>
>>>> No, this is not working even if I use LongAccumulator.
>>>>
>>>> On Fri, May 15, 2020 at 9:54 PM ZHANG Wei <zwb...@msn.com> wrote:
>>>>
>>>>> There is a restriction in the AccumulatorV2 API [1]: the OUT type should
>>>>> be atomic or thread safe. I'm wondering whether the implementation for
>>>>> `java.util.Map[T, Long]` can meet that or not. Is there any chance to
>>>>> replace CollectionLongAccumulator with CollectionAccumulator [2] or
>>>>> LongAccumulator [3] and test whether the StreamingQueryListener and the
>>>>> rest of the code work? A rough sketch of a thread-safe variant follows
>>>>> the links below.
>>>>>
>>>>> ---
>>>>> Cheers,
>>>>> -z
>>>>> [1]
>>>>> http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.AccumulatorV2
>>>>> [2]
>>>>> http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.CollectionAccumulator
>>>>> [3]
>>>>> http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.LongAccumulator
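>>>>>
>>>>> For illustration only (untested), such a variant could synchronize all
>>>>> access to the backing map, similar in spirit to how the built-in
>>>>> CollectionAccumulator guards its internal list:
>>>>>
>>>>> import java.{util => ju}
>>>>> import org.apache.spark.util.AccumulatorV2
>>>>>
>>>>> // Sketch: every touch of `counts` is synchronized, so the OUT value
>>>>> // handed to listeners is always a consistent snapshot.
>>>>> class SynchronizedLongAccumulator[T]
>>>>>     extends AccumulatorV2[T, ju.Map[T, Long]] {
>>>>>
>>>>>   private val counts = new ju.HashMap[T, Long]()
>>>>>
>>>>>   override def isZero: Boolean = this.synchronized { counts.isEmpty }
>>>>>
>>>>>   override def copy(): SynchronizedLongAccumulator[T] = {
>>>>>     val acc = new SynchronizedLongAccumulator[T]
>>>>>     this.synchronized { acc.counts.putAll(counts) }
>>>>>     acc
>>>>>   }
>>>>>
>>>>>   override def reset(): Unit = this.synchronized { counts.clear() }
>>>>>
>>>>>   override def add(v: T): Unit = this.synchronized {
>>>>>     counts.put(v, counts.getOrDefault(v, 0L) + 1L)
>>>>>   }
>>>>>
>>>>>   override def merge(other: AccumulatorV2[T, ju.Map[T, Long]]): Unit =
>>>>>     this.synchronized {
>>>>>       val it = other.value.entrySet().iterator()
>>>>>       while (it.hasNext) {
>>>>>         val e = it.next()
>>>>>         counts.put(e.getKey, counts.getOrDefault(e.getKey, 0L) + e.getValue)
>>>>>       }
>>>>>     }
>>>>>
>>>>>   override def value: ju.Map[T, Long] = this.synchronized {
>>>>>     new ju.HashMap[T, Long](counts)
>>>>>   }
>>>>> }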
>>>>>
>>>>> ________________________________________
>>>>> From: Something Something <mailinglist...@gmail.com>
>>>>> Sent: Saturday, May 16, 2020 0:38
>>>>> To: spark-user
>>>>> Subject: Re: Using Spark Accumulators with Structured Streaming
>>>>>
>>>>> Can someone from the Spark development team tell me if this functionality
>>>>> is supported and tested? I've spent a lot of time on this but can't get it
>>>>> to work. Just to add more context: we have our own Accumulator class that
>>>>> extends AccumulatorV2. In this class we keep track of one or more
>>>>> accumulators. Here's the definition:
>>>>>
>>>>>
>>>>> class CollectionLongAccumulator[T]
>>>>>     extends AccumulatorV2[T, java.util.Map[T, Long]]
>>>>>
>>>>> When the job begins we register an instance of this class:
>>>>>
>>>>> spark.sparkContext.register(myAccumulator, "MyAccumulator")
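>>>>>
>>>>> Inside 'updateAcrossEvents' on the executors we then call something like
>>>>> this (the key is illustrative):
>>>>>
>>>>> myAccumulator.add(eventType) // bumps the per-key count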
>>>>>
>>>>> Is this working under Structured Streaming?
>>>>>
>>>>> I will keep looking for alternate approaches but any help would be
>>>>> greatly appreciated. Thanks.
>>>>>
>>>>>
>>>>>
>>>>> On Thu, May 14, 2020 at 2:36 PM Something Something <
>>>>> mailinglist...@gmail.com> wrote:
>>>>>
>>>>> In my structured streaming job I am updating Spark Accumulators in the
>>>>> updateAcrossEvents method, but they are always 0 when I try to print them
>>>>> in my StreamingQueryListener. Here's the code:
>>>>>
>>>>> .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
>>>>>         updateAcrossEvents
>>>>>       )
>>>>>
>>>>>
>>>>> The accumulators get incremented in 'updateAcrossEvents'. I have a
>>>>> StreamingQueryListener which writes the values of the accumulators in its
>>>>> 'onQueryProgress' method, but in this method the Accumulators are ALWAYS
>>>>> ZERO!
>>>>>
>>>>> When I added log statements in updateAcrossEvents, I could see that these
>>>>> accumulators are getting incremented as expected.
>>>>>
>>>>> This only happens when I run in 'cluster' mode. In local mode it works
>>>>> fine, which implies that the Accumulators are not getting distributed
>>>>> correctly - or something like that!
>>>>>
>>>>> Note: I've seen quite a few answers on the Web that tell me to perform
>>>>> an "Action". That's not a solution here. This is a 'Stateful Structured
>>>>> Streaming' job. Yes, I am also 'registering' them in SparkContext.
>>>>>
>>>>>
>>>>>
>>>>>
