Yes, it is an application-specific class. This is how Spark's Java functions work.
You can refer to this example in the Spark repository:
https://github.com/apache/spark/blob/master/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredSessionization.java

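import java.util.Iterator;
import org.apache.spark.api.java.function.MapGroupsWithStateFunction;
import org.apache.spark.sql.streaming.GroupState;

public class StateUpdateTask implements
        MapGroupsWithStateFunction<String, InputEventModel, ModelStateInfo, ModelUpdate> {

    @Override
    public ModelUpdate call(String productId, Iterator<InputEventModel> eventsIterator,
            GroupState<ModelStateInfo> state) {
        // Spark invokes call(...) for each group; the application-specific
        // update logic goes here (body omitted).
    }
}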
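
On the thread-safety point raised further down in this thread (the OUT type of an AccumulatorV2 should be atomic or thread safe), here is a minimal sketch of a per-key counter backed by ConcurrentHashMap. KeyedCounter is a hypothetical name, not a Spark class; it only mirrors the add/merge/value shape of AccumulatorV2 and does not extend it, so it runs without Spark on the classpath. It illustrates the restriction only and is not a confirmed fix for the zero values seen in cluster mode.

```java
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for a per-key counting accumulator. It mirrors the
// add/merge/value shape of AccumulatorV2 but is plain Java, not a Spark class.
class KeyedCounter<T> {
    // ConcurrentHashMap keeps concurrent updates from being lost.
    private final ConcurrentHashMap<T, Long> counts = new ConcurrentHashMap<>();

    // Atomically increment the count for one key.
    void add(T key) {
        counts.merge(key, 1L, Long::sum);
    }

    // Fold another counter's totals into this one.
    void merge(KeyedCounter<T> other) {
        other.counts.forEach((key, n) -> counts.merge(key, n, Long::sum));
    }

    // Read-only view of the current totals.
    Map<T, Long> value() {
        return Collections.unmodifiableMap(counts);
    }

    public static void main(String[] args) throws InterruptedException {
        KeyedCounter<String> counter = new KeyedCounter<>();
        ExecutorService pool = Executors.newFixedThreadPool(4);
        // Four threads each add 1000 events to the same key.
        for (int t = 0; t < 4; t++) {
            pool.submit(() -> {
                for (int i = 0; i < 1000; i++) {
                    counter.add("events");
                }
            });
        }
        pool.shutdown();
        pool.awaitTermination(10, TimeUnit.SECONDS);
        System.out.println(counter.value()); // {events=4000}
    }
}
```

With a plain HashMap in place of the ConcurrentHashMap, concurrent add calls could lose increments, which is the kind of problem the restriction is meant to rule out.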

On Thu, May 28, 2020 at 10:59 PM Something Something <
mailinglist...@gmail.com> wrote:

> I am assuming StateUpdateTask is your application-specific class. Does it
> have an 'updateState' method or something? I googled but couldn't find any
> documentation about doing it this way. Can you please direct me to some
> documentation? Thanks.
>
> On Thu, May 28, 2020 at 4:43 AM Srinivas V <srini....@gmail.com> wrote:
>
>> Yes, I am using stateful structured streaming, similar to what you do.
>> This is in Java. I do it this way:
>>     Dataset<ModelUpdate> productUpdates = watermarkedDS
>>             .groupByKey(
>>                     (MapFunction<InputEventModel, String>) event -> event.getId(),
>>                     Encoders.STRING())
>>             .mapGroupsWithState(
>>                     new StateUpdateTask(
>>                             Long.parseLong(appConfig.getSparkStructuredStreamingConfig().STATE_TIMEOUT),
>>                             appConfig, accumulators),
>>                     Encoders.bean(ModelStateInfo.class),
>>                     Encoders.bean(ModelUpdate.class),
>>                     GroupStateTimeout.ProcessingTimeTimeout());
>>
>> StateUpdateTask contains the update method.
>>
>> On Thu, May 28, 2020 at 4:41 AM Something Something <
>> mailinglist...@gmail.com> wrote:
>>
>>> Yes, that's exactly how I am creating them.
>>>
>>> Question... Are you using 'Stateful Structured Streaming' in which
>>> you've something like this?
>>>
>>> .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
>>>         updateAcrossEvents
>>>       )
>>>
>>> And updating the Accumulator inside 'updateAcrossEvents'? We're 
>>> experiencing this only under 'Stateful Structured Streaming'. In other 
>>> streaming applications it works as expected.
>>>
>>>
>>>
>>> On Wed, May 27, 2020 at 9:01 AM Srinivas V <srini....@gmail.com> wrote:
>>>
>>>> Yes, I am talking about application-specific accumulators. Actually I
>>>> am getting the values printed in my driver log as well as sent to Grafana.
>>>> Not sure where and when I saw 0 before. My deploy mode is “client” on a
>>>> YARN cluster (not local Mac), where I submit from the master node. It should
>>>> work the same for cluster mode as well.
>>>> Create accumulators like this:
>>>> AccumulatorV2 accumulator = sparkContext.longAccumulator(name);
>>>>
>>>>
>>>> On Tue, May 26, 2020 at 8:42 PM Something Something <
>>>> mailinglist...@gmail.com> wrote:
>>>>
>>>>> Hmm... how would they go to Grafana if they are not getting computed
>>>>> in your code? I am talking about the application-specific accumulators.
>>>>> The other standard counters such as 'event.progress.inputRowsPerSecond'
>>>>> are getting populated correctly!
>>>>>
>>>>> On Mon, May 25, 2020 at 8:39 PM Srinivas V <srini....@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hello,
>>>>>> Even for me it comes as 0 when I print it in onQueryProgress. I use
>>>>>> LongAccumulator as well. Yes, it prints on my local machine but not on
>>>>>> the cluster. One consolation is that when I send metrics to Grafana, the
>>>>>> values do show up there.
>>>>>>
>>>>>> On Tue, May 26, 2020 at 3:10 AM Something Something <
>>>>>> mailinglist...@gmail.com> wrote:
>>>>>>
>>>>>>> No this is not working even if I use LongAccumulator.
>>>>>>>
>>>>>>> On Fri, May 15, 2020 at 9:54 PM ZHANG Wei <zwb...@msn.com> wrote:
>>>>>>>
>>>>>>>> There is a restriction in the AccumulatorV2 API [1]: the OUT type
>>>>>>>> should be atomic or thread safe. I'm wondering whether the
>>>>>>>> implementation for `java.util.Map[T, Long]` meets it. Is there any
>>>>>>>> chance to replace CollectionLongAccumulator with CollectionAccumulator [2]
>>>>>>>> or LongAccumulator [3] and test whether the StreamingListener and the
>>>>>>>> other code work?
>>>>>>>>
>>>>>>>> ---
>>>>>>>> Cheers,
>>>>>>>> -z
>>>>>>>> [1]
>>>>>>>> http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.AccumulatorV2
>>>>>>>> [2]
>>>>>>>> http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.CollectionAccumulator
>>>>>>>> [3]
>>>>>>>> http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.LongAccumulator
>>>>>>>>
>>>>>>>> ________________________________________
>>>>>>>> From: Something Something <mailinglist...@gmail.com>
>>>>>>>> Sent: Saturday, May 16, 2020 0:38
>>>>>>>> To: spark-user
>>>>>>>> Subject: Re: Using Spark Accumulators with Structured Streaming
>>>>>>>>
>>>>>>>> Can someone from the Spark development team tell me if this
>>>>>>>> functionality is supported and tested? I've spent a lot of time on
>>>>>>>> this but can't get it to work. Just to add more context, we have our
>>>>>>>> own Accumulator class that extends AccumulatorV2. In this class we
>>>>>>>> keep track of one or more accumulators. Here's the definition:
>>>>>>>>
>>>>>>>>
>>>>>>>> class CollectionLongAccumulator[T]
>>>>>>>>     extends AccumulatorV2[T, java.util.Map[T, Long]]
>>>>>>>>
>>>>>>>> When the job begins we register an instance of this class:
>>>>>>>>
>>>>>>>> spark.sparkContext.register(myAccumulator, "MyAccumulator")
>>>>>>>>
>>>>>>>> Is this working under Structured Streaming?
>>>>>>>>
>>>>>>>> I will keep looking for alternate approaches but any help would be
>>>>>>>> greatly appreciated. Thanks.
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Thu, May 14, 2020 at 2:36 PM Something Something <
>>>>>>>> mailinglist...@gmail.com> wrote:
>>>>>>>>
>>>>>>>> In my structured streaming job I am updating Spark Accumulators in
>>>>>>>> the updateAcrossEvents method but they are always 0 when I try to print
>>>>>>>> them in my StreamingListener. Here's the code:
>>>>>>>>
>>>>>>>> .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
>>>>>>>>         updateAcrossEvents
>>>>>>>>       )
>>>>>>>>
>>>>>>>>
>>>>>>>> The accumulators get incremented in 'updateAcrossEvents'. I've a
>>>>>>>> StreamingListener which writes values of the accumulators in
>>>>>>>> 'onQueryProgress' method but in this method the Accumulators are ALWAYS
>>>>>>>> ZERO!
>>>>>>>>
>>>>>>>> When I added log statements in the updateAcrossEvents, I could see
>>>>>>>> that these accumulators are getting incremented as expected.
>>>>>>>>
>>>>>>>> This only happens when I run in the 'Cluster' mode. In Local mode
>>>>>>>> it works fine which implies that the Accumulators are not getting
>>>>>>>> distributed correctly - or something like that!
>>>>>>>>
>>>>>>>> Note: I've seen quite a few answers on the Web that tell me to
>>>>>>>> perform an "Action". That's not a solution here. This is a 'Stateful
>>>>>>>> Structured Streaming' job. Yes, I am also 'registering' them in
>>>>>>>> SparkContext.
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
