Spark Structured Streaming: “earliest” as “startingOffsets” is not working

2020-06-26 Thread Something Something
My Spark Structured Streaming job works fine when I set "startingOffsets"
to "latest". When I simply change it to "earliest" & specify a new "check
point directory", the job doesn't work. The states don't get timed out
after 10 minutes.

While debugging I noticed that my 'state' logic is indeed getting executed
but states just don't time out - as they do when I use "latest". Any reason
why?

Is this a known issue?

*Note*: I've tried this under Spark 2.3 & 2.4
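
For reference, here is a minimal sketch of the source setup being described (topic and server names are placeholders, not the actual job). Note that "startingOffsets" is only honored when the query starts with a fresh checkpoint, which is why a new checkpoint directory is specified above.

```scala
// Minimal sketch, assuming a hypothetical topic and broker address.
val input = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")   // placeholder
  .option("subscribe", "inputTopic")                      // placeholder
  .option("startingOffsets", "earliest")                  // the only change from the "latest" run
  .load()
// ... stateful logic, then writeStream with the new checkpoint directory ...
```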


Re: Using Spark Accumulators with Structured Streaming

2020-06-08 Thread Something Something
*Honestly, I don't know how to do this in Scala.* I tried something like
this...



*.mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
  new StateUpdater(myAcc))*

StateUpdater is similar to what Zhang has provided but it's NOT
compiling because I need to return a 'Dataset'.


Here's the definition of mapGroupsWithState in Scala:

def mapGroupsWithState[S: Encoder, U: Encoder](
timeoutConf: GroupStateTimeout)(
func: (K, Iterator[V], GroupState[S]) => U): Dataset[U] = {
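
Given that signature, the Scala overload expects a plain function `(K, Iterator[V], GroupState[S]) => U`, and that function returns one `U` per group; `mapGroupsWithState` itself is what produces the `Dataset[U]`. A minimal sketch (all types and names below are placeholders, not the actual job) of keeping the accumulator in a class but passing one of its methods where a function is expected:

```scala
// Hedged sketch: the types (String keys/values, Long state) are assumptions.
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout}
import org.apache.spark.util.LongAccumulator

class StateUpdater(myAcc: LongAccumulator) extends Serializable {
  // Matches (K, Iterator[V], GroupState[S]) => U, so it can be passed directly.
  def updateState(key: String, values: Iterator[String], state: GroupState[Long]): Long = {
    myAcc.add(1)                                   // accumulator update runs on the executors
    val newState = state.getOption.getOrElse(0L) + values.size
    state.update(newState)
    newState                                       // return U per group, not a Dataset
  }
}

// Usage (assuming `grouped` is a KeyValueGroupedDataset[String, String]):
// val result = grouped.mapGroupsWithState(
//   GroupStateTimeout.ProcessingTimeTimeout())(new StateUpdater(myAcc).updateState _)
```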



On Mon, Jun 8, 2020 at 12:07 PM Srinivas V  wrote:

> Ya, I had asked this question before. No one responded. By the way, what’s
> your actual name “Something something” if you don’t mind me asking?
>
> On Tue, Jun 9, 2020 at 12:27 AM Something Something <
> mailinglist...@gmail.com> wrote:
>
>> What is scary is this interface is marked as "experimental"
>>
>> @Experimental
>> @InterfaceStability.Evolving
>> public interface MapGroupsWithStateFunction<K, V, S, R> extends Serializable
>> {
>>   R call(K key, Iterator<V> values, GroupState<S> state) throws Exception;
>> }
>>
>>
>>
>>
>> On Mon, Jun 8, 2020 at 11:54 AM Something Something <
>> mailinglist...@gmail.com> wrote:
>>
>>> Right, this is exactly how I've it right now. Problem is in the cluster
>>> mode 'myAcc' does NOT get distributed. Try it out in the cluster mode & you
>>> will see what I mean.
>>>
>>> I think how Zhang is using will work. Will try & revert.
>>>
>>> On Mon, Jun 8, 2020 at 10:58 AM Srinivas V  wrote:
>>>
>>>>
>>>> You don’t need to have a separate class. I created that as it has lot
>>>> of code and logic in my case.
>>>> For you to quickly test you can use Zhang’s Scala code in this chain.
>>>> Pasting it below for your quick reference:
>>>>
>>>> ```scala
>>>> spark.streams.addListener(new StreamingQueryListener {
>>>>   override def onQueryProgress(event: 
>>>> StreamingQueryListener.QueryProgressEvent):
>>>> Unit = {
>>>> println(event.progress.id + " is on progress")
>>>> println(s"My accu is ${myAcc.value} on query progress")
>>>>   }
>>>> ...
>>>> })
>>>>
>>>> def mappingFunc(key: Long, values: Iterator[String], state:
>>>> GroupState[Long]): ... = {
>>>>   myAcc.add(1)
>>>>   println(s">>> key: $key => state: ${state}")
>>>> ...
>>>> }
>>>>
>>>> val wordCounts = words
>>>>   .groupByKey(v => ...)
>>>>   .mapGroupsWithState(timeoutConf = 
>>>> GroupStateTimeout.ProcessingTimeTimeout)(func
>>>> = mappingFunc)
>>>>
>>>> val query = wordCounts.writeStream
>>>>   .outputMode(OutputMode.Update)
>>>>
>>>>
>>>> On Mon, Jun 8, 2020 at 11:14 AM Something Something <
>>>> mailinglist...@gmail.com> wrote:
>>>>
>>>>> Great. I guess the trick is to use a separate class such as
>>>>> 'StateUpdateTask'. I will try that. My challenge is to convert this into
>>>>> Scala. Will try it out & revert. Thanks for the tips.
>>>>>
>>>>> On Wed, Jun 3, 2020 at 11:56 PM ZHANG Wei  wrote:
>>>>>
>>>>>> The following Java codes can work in my cluster environment:
>>>>>> ```
>>>>>> .mapGroupsWithState((MapGroupsWithStateFunction<String, String, Long, LeadingCharCount>) (key, values, state) -> {
>>>>>> myAcc.add(1);
>>>>>> <...>
>>>>>> state.update(newState);
>>>>>> return new LeadingCharCount(key, newState);
>>>>>> },
>>>>>> Encoders.LONG(),
>>>>>> Encoders.bean(LeadingCharCount.class),
>>>>>> GroupStateTimeout.ProcessingTimeTimeout())
>>>>>> ```
>>>>>>
>>>>>> Also works fine with my `StateUpdateTask`:
>>>>>> ```
>>>>>> .mapGroupsWithState(
>>>>>> new StateUpdateTask(myAcc),
>>>>>> Encoders.LONG(),
>>>>>> Encoders.bean(LeadingCharCount.class),
>>>>>> GroupStateTimeout.Pro

Re: Using Spark Accumulators with Structured Streaming

2020-06-08 Thread Something Something
What is scary is that this interface is marked as "experimental"

@Experimental
@InterfaceStability.Evolving
public interface MapGroupsWithStateFunction<K, V, S, R> extends Serializable {
  R call(K key, Iterator<V> values, GroupState<S> state) throws Exception;
}




On Mon, Jun 8, 2020 at 11:54 AM Something Something <
mailinglist...@gmail.com> wrote:

> Right, this is exactly how I've it right now. Problem is in the cluster
> mode 'myAcc' does NOT get distributed. Try it out in the cluster mode & you
> will see what I mean.
>
> I think how Zhang is using will work. Will try & revert.
>
> On Mon, Jun 8, 2020 at 10:58 AM Srinivas V  wrote:
>
>>
>> You don’t need to have a separate class. I created that as it has lot of
>> code and logic in my case.
>> For you to quickly test you can use Zhang’s Scala code in this chain.
>> Pasting it below for your quick reference:
>>
>> ```scala
>> spark.streams.addListener(new StreamingQueryListener {
>>   override def onQueryProgress(event: 
>> StreamingQueryListener.QueryProgressEvent):
>> Unit = {
>> println(event.progress.id + " is on progress")
>> println(s"My accu is ${myAcc.value} on query progress")
>>   }
>> ...
>> })
>>
>> def mappingFunc(key: Long, values: Iterator[String], state:
>> GroupState[Long]): ... = {
>>   myAcc.add(1)
>>   println(s">>> key: $key => state: ${state}")
>> ...
>> }
>>
>> val wordCounts = words
>>   .groupByKey(v => ...)
>>   .mapGroupsWithState(timeoutConf = 
>> GroupStateTimeout.ProcessingTimeTimeout)(func
>> = mappingFunc)
>>
>> val query = wordCounts.writeStream
>>   .outputMode(OutputMode.Update)
>>
>>
>> On Mon, Jun 8, 2020 at 11:14 AM Something Something <
>> mailinglist...@gmail.com> wrote:
>>
>>> Great. I guess the trick is to use a separate class such as
>>> 'StateUpdateTask'. I will try that. My challenge is to convert this into
>>> Scala. Will try it out & revert. Thanks for the tips.
>>>
>>> On Wed, Jun 3, 2020 at 11:56 PM ZHANG Wei  wrote:
>>>
>>>> The following Java codes can work in my cluster environment:
>>>> ```
>>>> .mapGroupsWithState((MapGroupsWithStateFunction<String, String, Long, LeadingCharCount>) (key, values, state) -> {
>>>> myAcc.add(1);
>>>> <...>
>>>> state.update(newState);
>>>> return new LeadingCharCount(key, newState);
>>>> },
>>>> Encoders.LONG(),
>>>> Encoders.bean(LeadingCharCount.class),
>>>> GroupStateTimeout.ProcessingTimeTimeout())
>>>> ```
>>>>
>>>> Also works fine with my `StateUpdateTask`:
>>>> ```
>>>> .mapGroupsWithState(
>>>> new StateUpdateTask(myAcc),
>>>> Encoders.LONG(),
>>>> Encoders.bean(LeadingCharCount.class),
>>>> GroupStateTimeout.ProcessingTimeTimeout());
>>>>
>>>> public class StateUpdateTask
>>>>>> implements MapGroupsWithStateFunction<String, String, Long, LeadingCharCount> {
>>>> private LongAccumulator myAccInTask;
>>>>
>>>> public StateUpdateTask(LongAccumulator acc) {
>>>> this.myAccInTask = acc;
>>>> }
>>>>
>>>> @Override
>>>>>> public LeadingCharCount call(String key, Iterator<String>
>>>>>> values, GroupState<Long> state) throws Exception {
>>>> myAccInTask.add(1);
>>>> <...>
>>>> state.update(newState);
>>>> return new LeadingCharCount(key, newState);
>>>> }
>>>> }
>>>> ```
>>>>
>>>> --
>>>> Cheers,
>>>> -z
>>>>
>>>> On Tue, 2 Jun 2020 10:28:36 +0800
>>>> ZHANG Wei  wrote:
>>>>
>>>> > Yes, verified on the cluster with 5 executors.
>>>> >
>>>> > --
>>>> > Cheers,
>>>> > -z
>>>> >
>>>> > On Fri, 29 May 2020 11:16:12 -0700
>>>> > Something Something  wrote:
>>>> >
>>>> > > Did you try this on the Cluster? Note: This works just fine under
>>>> 

Re: Using Spark Accumulators with Structured Streaming

2020-06-08 Thread Something Something
Right, this is exactly how I have it right now. The problem is that in cluster
mode 'myAcc' does NOT get distributed. Try it out in cluster mode & you
will see what I mean.

I think the way Zhang is using it will work. Will try & revert.

On Mon, Jun 8, 2020 at 10:58 AM Srinivas V  wrote:

>
> You don’t need to have a separate class. I created that as it has lot of
> code and logic in my case.
> For you to quickly test you can use Zhang’s Scala code in this chain.
> Pasting it below for your quick reference:
>
> ```scala
> spark.streams.addListener(new StreamingQueryListener {
>   override def onQueryProgress(event: 
> StreamingQueryListener.QueryProgressEvent):
> Unit = {
> println(event.progress.id + " is on progress")
> println(s"My accu is ${myAcc.value} on query progress")
>   }
> ...
> })
>
> def mappingFunc(key: Long, values: Iterator[String], state:
> GroupState[Long]): ... = {
>   myAcc.add(1)
>   println(s">>> key: $key => state: ${state}")
> ...
> }
>
> val wordCounts = words
>   .groupByKey(v => ...)
>   .mapGroupsWithState(timeoutConf = 
> GroupStateTimeout.ProcessingTimeTimeout)(func
> = mappingFunc)
>
> val query = wordCounts.writeStream
>   .outputMode(OutputMode.Update)
>
>
> On Mon, Jun 8, 2020 at 11:14 AM Something Something <
> mailinglist...@gmail.com> wrote:
>
>> Great. I guess the trick is to use a separate class such as
>> 'StateUpdateTask'. I will try that. My challenge is to convert this into
>> Scala. Will try it out & revert. Thanks for the tips.
>>
>> On Wed, Jun 3, 2020 at 11:56 PM ZHANG Wei  wrote:
>>
>>> The following Java codes can work in my cluster environment:
>>> ```
>>> .mapGroupsWithState((MapGroupsWithStateFunction<String, String, Long, LeadingCharCount>) (key, values, state) -> {
>>> myAcc.add(1);
>>> <...>
>>> state.update(newState);
>>> return new LeadingCharCount(key, newState);
>>> },
>>> Encoders.LONG(),
>>> Encoders.bean(LeadingCharCount.class),
>>> GroupStateTimeout.ProcessingTimeTimeout())
>>> ```
>>>
>>> Also works fine with my `StateUpdateTask`:
>>> ```
>>> .mapGroupsWithState(
>>> new StateUpdateTask(myAcc),
>>> Encoders.LONG(),
>>> Encoders.bean(LeadingCharCount.class),
>>> GroupStateTimeout.ProcessingTimeTimeout());
>>>
>>> public class StateUpdateTask
>>> implements MapGroupsWithStateFunction<String, String, Long, LeadingCharCount> {
>>> private LongAccumulator myAccInTask;
>>>
>>> public StateUpdateTask(LongAccumulator acc) {
>>> this.myAccInTask = acc;
>>> }
>>>
>>> @Override
>>> public LeadingCharCount call(String key, Iterator<String>
>>> values, GroupState<Long> state) throws Exception {
>>> myAccInTask.add(1);
>>> <...>
>>> state.update(newState);
>>> return new LeadingCharCount(key, newState);
>>> }
>>> }
>>> ```
>>>
>>> --
>>> Cheers,
>>> -z
>>>
>>> On Tue, 2 Jun 2020 10:28:36 +0800
>>> ZHANG Wei  wrote:
>>>
>>> > Yes, verified on the cluster with 5 executors.
>>> >
>>> > --
>>> > Cheers,
>>> > -z
>>> >
>>> > On Fri, 29 May 2020 11:16:12 -0700
>>> > Something Something  wrote:
>>> >
>>> > > Did you try this on the Cluster? Note: This works just fine under
>>> 'Local'
>>> > > mode.
>>> > >
>>> > > On Thu, May 28, 2020 at 9:12 PM ZHANG Wei 
>>> wrote:
>>> > >
>>> > > > I can't reproduce the issue with my simple code:
>>> > > > ```scala
>>> > > > spark.streams.addListener(new StreamingQueryListener {
>>> > > >   override def onQueryProgress(event:
>>> > > > StreamingQueryListener.QueryProgressEvent): Unit = {
>>> > > > println(event.progress.id + " is on progress")
>>> > > > println(s"My accu is ${myAcc.value} on query progress")
>>> > > >   }
>>> > > > 

Re: Using Spark Accumulators with Structured Streaming

2020-06-07 Thread Something Something
Great. I guess the trick is to use a separate class such as
'StateUpdateTask'. I will try that. My challenge is to convert this into
Scala. Will try it out & revert. Thanks for the tips.

On Wed, Jun 3, 2020 at 11:56 PM ZHANG Wei  wrote:

> The following Java codes can work in my cluster environment:
> ```
> .mapGroupsWithState((MapGroupsWithStateFunction<String, String, Long, LeadingCharCount>) (key, values, state) -> {
> myAcc.add(1);
> <...>
> state.update(newState);
> return new LeadingCharCount(key, newState);
> },
> Encoders.LONG(),
> Encoders.bean(LeadingCharCount.class),
> GroupStateTimeout.ProcessingTimeTimeout())
> ```
>
> Also works fine with my `StateUpdateTask`:
> ```
> .mapGroupsWithState(
> new StateUpdateTask(myAcc),
> Encoders.LONG(),
> Encoders.bean(LeadingCharCount.class),
> GroupStateTimeout.ProcessingTimeTimeout());
>
> public class StateUpdateTask
> implements MapGroupsWithStateFunction<String, String, Long, LeadingCharCount> {
> private LongAccumulator myAccInTask;
>
> public StateUpdateTask(LongAccumulator acc) {
> this.myAccInTask = acc;
> }
>
> @Override
> public LeadingCharCount call(String key, Iterator<String> values,
> GroupState<Long> state) throws Exception {
> myAccInTask.add(1);
> <...>
> state.update(newState);
> return new LeadingCharCount(key, newState);
> }
> }
> ```
>
> --
> Cheers,
> -z
>
> On Tue, 2 Jun 2020 10:28:36 +0800
> ZHANG Wei  wrote:
>
> > Yes, verified on the cluster with 5 executors.
> >
> > --
> > Cheers,
> > -z
> >
> > On Fri, 29 May 2020 11:16:12 -0700
> > Something Something  wrote:
> >
> > > Did you try this on the Cluster? Note: This works just fine under
> 'Local'
> > > mode.
> > >
> > > On Thu, May 28, 2020 at 9:12 PM ZHANG Wei  wrote:
> > >
> > > > I can't reproduce the issue with my simple code:
> > > > ```scala
> > > > spark.streams.addListener(new StreamingQueryListener {
> > > >   override def onQueryProgress(event:
> > > > StreamingQueryListener.QueryProgressEvent): Unit = {
> > > > println(event.progress.id + " is on progress")
> > > > println(s"My accu is ${myAcc.value} on query progress")
> > > >   }
> > > > ...
> > > > })
> > > >
> > > > def mappingFunc(key: Long, values: Iterator[String], state:
> > > > GroupState[Long]): ... = {
> > > >   myAcc.add(1)
> > > >   println(s">>> key: $key => state: ${state}")
> > > > ...
> > > > }
> > > >
> > > > val wordCounts = words
> > > >   .groupByKey(v => ...)
> > > >   .mapGroupsWithState(timeoutConf =
> > > > GroupStateTimeout.ProcessingTimeTimeout)(func = mappingFunc)
> > > >
> > > > val query = wordCounts.writeStream
> > > >   .outputMode(OutputMode.Update)
> > > > ...
> > > > ```
> > > >
> > > > I'm wondering if there were any errors can be found from driver
> logs? The
> > > > micro-batch
> > > > exceptions won't terminate the streaming job running.
> > > >
> > > > For the following code, we have to make sure that `StateUpdateTask`
> is
> > > > started:
> > > > > .mapGroupsWithState(
> > > > > new
> > > >
> StateUpdateTask(Long.parseLong(appConfig.getSparkStructuredStreamingConfig().STATE_TIMEOUT),
> > > > > appConfig, accumulators),
> > > > > Encoders.bean(ModelStateInfo.class),
> > > > > Encoders.bean(ModelUpdate.class),
> > > > > GroupStateTimeout.ProcessingTimeTimeout());
> > > >
> > > > --
> > > > Cheers,
> > > > -z
> > > >
> > > > On Thu, 28 May 2020 19:59:31 +0530
> > > > Srinivas V  wrote:
> > > >
> > > > > Giving the code below:
> > > > > //accumulators is a class level variable in driver.
> > > > >
> > > > >  sparkSession.streams().addListener(new StreamingQueryListener() {
>

Re: Using Spark Accumulators with Structured Streaming

2020-05-29 Thread Something Something
I mean... I don't see any reference to 'accumulator' in your class
*definition*. How can you access it in the class if it's not in the
class definition:

public class StateUpdateTask implements MapGroupsWithStateFunction<*String,
InputEventModel, ModelStateInfo, ModelUpdate*> {.  *--> I was expecting to
see 'accumulator' here in the definition.*

@Override
public ModelUpdate call(String productId, Iterator<InputEventModel>
eventsIterator, GroupState<ModelStateInfo> state) {
}
}

On Fri, May 29, 2020 at 1:08 PM Srinivas V  wrote:

>
> Yes, accumulators are updated in the call method of StateUpdateTask. Like
> when state times out or when the data is pushed to next Kafka topic etc.
>
> On Fri, May 29, 2020 at 11:55 PM Something Something <
> mailinglist...@gmail.com> wrote:
>
>> Thanks! I will take a look at the link. Just one question, you seem to be
>> passing 'accumulators' in the constructor but where do you use it in the
>> StateUpdateTask class? I am still missing that connection. Sorry, if my
>> question is dumb. I must be missing something. Thanks for your help so far.
>> It's been useful.
>>
>>
>> On Fri, May 29, 2020 at 6:51 AM Srinivas V  wrote:
>>
>>> Yes it is application specific class. This is how java Spark Functions
>>> work.
>>> You can refer to this code in the documentation:
>>> https://github.com/apache/spark/blob/master/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredSessionization.java
>>>
>>> public class StateUpdateTask implements
>>> MapGroupsWithStateFunction<String, InputEventModel, ModelStateInfo, ModelUpdate> {
>>>
>>> @Override
>>> public ModelUpdate call(String productId, Iterator<InputEventModel>
>>> eventsIterator, GroupState<ModelStateInfo> state) {
>>> }
>>> }
>>>
>>> On Thu, May 28, 2020 at 10:59 PM Something Something <
>>> mailinglist...@gmail.com> wrote:
>>>
>>>> I am assuming StateUpdateTask is your application specific class. Does
>>>> it have 'updateState' method or something? I googled but couldn't find any
>>>> documentation about doing it this way. Can you please direct me to some
>>>> documentation. Thanks.
>>>>
>>>> On Thu, May 28, 2020 at 4:43 AM Srinivas V  wrote:
>>>>
>>>>> yes, I am using stateful structured streaming. Yes similar to what you
>>>>> do. This is in Java
>>>>> I do it this way:
>>>>> Dataset<ModelUpdate> productUpdates = watermarkedDS
>>>>> .groupByKey(
>>>>> (MapFunction<InputEventModel, String>) event
>>>>> -> event.getId(), Encoders.STRING())
>>>>> .mapGroupsWithState(
>>>>> new
>>>>> StateUpdateTask(Long.parseLong(appConfig.getSparkStructuredStreamingConfig().STATE_TIMEOUT),
>>>>> appConfig, accumulators),
>>>>> Encoders.bean(ModelStateInfo.class),
>>>>> Encoders.bean(ModelUpdate.class),
>>>>> GroupStateTimeout.ProcessingTimeTimeout());
>>>>>
>>>>> StateUpdateTask contains the update method.
>>>>>
>>>>> On Thu, May 28, 2020 at 4:41 AM Something Something <
>>>>> mailinglist...@gmail.com> wrote:
>>>>>
>>>>>> Yes, that's exactly how I am creating them.
>>>>>>
>>>>>> Question... Are you using 'Stateful Structured Streaming' in which
>>>>>> you've something like this?
>>>>>>
>>>>>> .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
>>>>>> updateAcrossEvents
>>>>>>   )
>>>>>>
>>>>>> And updating the Accumulator inside 'updateAcrossEvents'? We're 
>>>>>> experiencing this only under 'Stateful Structured Streaming'. In other 
>>>>>> streaming applications it works as expected.
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Wed, May 27, 2020 at 9:01 AM Srinivas V 
>>>>>> wrote:
>>>>>>
>>>>>>> Yes, I am talking about Application specific Accumulators. Actually
>>>>>>> I am getting the values printed in my driver log as well as sent to
>>>>>>> Grafana. Not sure where and when I saw 0 before. My deploy mode is 
>>>>>>> “client”
>>>>>>> on a yarn cluster(not local Mac) where I submit from master node. It 

Re: Using Spark Accumulators with Structured Streaming

2020-05-29 Thread Something Something
Thanks! I will take a look at the link. Just one question, you seem to be
passing 'accumulators' in the constructor but where do you use it in the
StateUpdateTask class? I am still missing that connection. Sorry if my
question is dumb. I must be missing something. Thanks for your help so far.
It's been useful.


On Fri, May 29, 2020 at 6:51 AM Srinivas V  wrote:

> Yes it is application specific class. This is how java Spark Functions
> work.
> You can refer to this code in the documentation:
> https://github.com/apache/spark/blob/master/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredSessionization.java
>
> public class StateUpdateTask implements MapGroupsWithStateFunction<String, InputEventModel, ModelStateInfo, ModelUpdate> {
>
> @Override
> public ModelUpdate call(String productId, Iterator<InputEventModel>
> eventsIterator, GroupState<ModelStateInfo> state) {
> }
> }
>
> On Thu, May 28, 2020 at 10:59 PM Something Something <
> mailinglist...@gmail.com> wrote:
>
>> I am assuming StateUpdateTask is your application specific class. Does it
>> have 'updateState' method or something? I googled but couldn't find any
>> documentation about doing it this way. Can you please direct me to some
>> documentation. Thanks.
>>
>> On Thu, May 28, 2020 at 4:43 AM Srinivas V  wrote:
>>
>>> yes, I am using stateful structured streaming. Yes similar to what you
>>> do. This is in Java
>>> I do it this way:
>>> Dataset<ModelUpdate> productUpdates = watermarkedDS
>>> .groupByKey(
>>> (MapFunction<InputEventModel, String>) event ->
>>> event.getId(), Encoders.STRING())
>>> .mapGroupsWithState(
>>> new
>>> StateUpdateTask(Long.parseLong(appConfig.getSparkStructuredStreamingConfig().STATE_TIMEOUT),
>>> appConfig, accumulators),
>>> Encoders.bean(ModelStateInfo.class),
>>>     Encoders.bean(ModelUpdate.class),
>>> GroupStateTimeout.ProcessingTimeTimeout());
>>>
>>> StateUpdateTask contains the update method.
>>>
>>> On Thu, May 28, 2020 at 4:41 AM Something Something <
>>> mailinglist...@gmail.com> wrote:
>>>
>>>> Yes, that's exactly how I am creating them.
>>>>
>>>> Question... Are you using 'Stateful Structured Streaming' in which
>>>> you've something like this?
>>>>
>>>> .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
>>>> updateAcrossEvents
>>>>   )
>>>>
>>>> And updating the Accumulator inside 'updateAcrossEvents'? We're 
>>>> experiencing this only under 'Stateful Structured Streaming'. In other 
>>>> streaming applications it works as expected.
>>>>
>>>>
>>>>
>>>> On Wed, May 27, 2020 at 9:01 AM Srinivas V  wrote:
>>>>
>>>>> Yes, I am talking about Application specific Accumulators. Actually I
>>>>> am getting the values printed in my driver log as well as sent to Grafana.
>>>>> Not sure where and when I saw 0 before. My deploy mode is “client” on a
>>>>> yarn cluster(not local Mac) where I submit from master node. It should 
>>>>> work
>>>>> the same for cluster mode as well.
>>>>> Create accumulators like this:
>>>>> AccumulatorV2 accumulator = sparkContext.longAccumulator(name);
>>>>>
>>>>>
>>>>> On Tue, May 26, 2020 at 8:42 PM Something Something <
>>>>> mailinglist...@gmail.com> wrote:
>>>>>
>>>>>> Hmm... how would they go to Graphana if they are not getting computed
>>>>>> in your code? I am talking about the Application Specific Accumulators. 
>>>>>> The
>>>>>> other standard counters such as 'event.progress.inputRowsPerSecond' are
>>>>>> getting populated correctly!
>>>>>>
>>>>>> On Mon, May 25, 2020 at 8:39 PM Srinivas V 
>>>>>> wrote:
>>>>>>
>>>>>>> Hello,
>>>>>>> Even for me it comes as 0 when I print in OnQueryProgress. I use
>>>>>>> LongAccumulator as well. Yes, it prints on my local but not on cluster.
>>>>>>> But one consolation is that when I send metrics to Graphana, the
>>>>>>> values are coming there.
>>>>>>>
>>>>>>> On Tue, May 26, 2020 at 3:10 AM Something Something <

Re: Using Spark Accumulators with Structured Streaming

2020-05-29 Thread Something Something
> >
> > > And the accumulator value updated can be found from driver stdout.
> > >
> > > --
> > > Cheers,
> > > -z
> > >
> > > On Thu, 28 May 2020 17:12:48 +0530
> > > Srinivas V  wrote:
> > >
> > > > yes, I am using stateful structured streaming. Yes similar to what
> you
> > > do.
> > > > This is in Java
> > > > I do it this way:
> > > > Dataset<ModelUpdate> productUpdates = watermarkedDS
> > > > .groupByKey(
> > > > (MapFunction<InputEventModel, String>) event ->
> > > > event.getId(), Encoders.STRING())
> > > > .mapGroupsWithState(
> > > > new
> > > >
> > >
> StateUpdateTask(Long.parseLong(appConfig.getSparkStructuredStreamingConfig().STATE_TIMEOUT),
> > > > appConfig, accumulators),
> > > > Encoders.bean(ModelStateInfo.class),
> > > > Encoders.bean(ModelUpdate.class),
> > > > GroupStateTimeout.ProcessingTimeTimeout());
> > > >
> > > > StateUpdateTask contains the update method.
> > > >
> > > > On Thu, May 28, 2020 at 4:41 AM Something Something <
> > > > mailinglist...@gmail.com> wrote:
> > > >
> > > > > Yes, that's exactly how I am creating them.
> > > > >
> > > > > Question... Are you using 'Stateful Structured Streaming' in which
> > > you've
> > > > > something like this?
> > > > >
> > > > > .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
> > > > > updateAcrossEvents
> > > > >   )
> > > > >
> > > > > And updating the Accumulator inside 'updateAcrossEvents'? We're
> > > experiencing this only under 'Stateful Structured Streaming'. In other
> > > streaming applications it works as expected.
> > > > >
> > > > >
> > > > >
> > > > > On Wed, May 27, 2020 at 9:01 AM Srinivas V 
> > > wrote:
> > > > >
> > > > >> Yes, I am talking about Application specific Accumulators.
> Actually I
> > > am
> > > > >> getting the values printed in my driver log as well as sent to
> > > Grafana. Not
> > > > >> sure where and when I saw 0 before. My deploy mode is “client” on
> a
> > > yarn
> > > > >> cluster(not local Mac) where I submit from master node. It should
> > > work the
> > > > >> same for cluster mode as well.
> > > > >> Create accumulators like this:
> > > > >> AccumulatorV2 accumulator = sparkContext.longAccumulator(name);
> > > > >>
> > > > >>
> > > > >> On Tue, May 26, 2020 at 8:42 PM Something Something <
> > > > >> mailinglist...@gmail.com> wrote:
> > > > >>
> > > > >>> Hmm... how would they go to Graphana if they are not getting
> > > computed in
> > > > >>> your code? I am talking about the Application Specific
> Accumulators.
> > > The
> > > > >>> other standard counters such as
> 'event.progress.inputRowsPerSecond'
> > > are
> > > > >>> getting populated correctly!
> > > > >>>
> > > > >>> On Mon, May 25, 2020 at 8:39 PM Srinivas V 
> > > wrote:
> > > > >>>
> > > > >>>> Hello,
> > > > >>>> Even for me it comes as 0 when I print in OnQueryProgress. I use
> > > > >>>> LongAccumulator as well. Yes, it prints on my local but not on
> > > cluster.
> > > > >>>> But one consolation is that when I send metrics to Graphana, the
> > > values
> > > > >>>> are coming there.
> > > > >>>>
> > > > >>>> On Tue, May 26, 2020 at 3:10 AM Something Something <
> > > > >>>> mailinglist...@gmail.com> wrote:
> > > > >>>>
> > > > >>>>> No this is not working even if I use LongAccumulator.
> > > > >>>>>
> > > > >>>>> On Fri, May 15, 2020 at 9:54 PM ZHANG Wei  >
> > > wrote:
> > > > >>>>>
> > > > >>>>>> There is a restriction in AccumulatorV2 API [1], the 

Re: Using Spark Accumulators with Structured Streaming

2020-05-28 Thread Something Something
I am assuming StateUpdateTask is your application-specific class. Does it
have an 'updateState' method or something? I googled but couldn't find any
documentation about doing it this way. Can you please direct me to some
documentation? Thanks.

On Thu, May 28, 2020 at 4:43 AM Srinivas V  wrote:

> yes, I am using stateful structured streaming. Yes similar to what you do.
> This is in Java
> I do it this way:
> Dataset<ModelUpdate> productUpdates = watermarkedDS
> .groupByKey(
> (MapFunction<InputEventModel, String>) event ->
> event.getId(), Encoders.STRING())
> .mapGroupsWithState(
> new
> StateUpdateTask(Long.parseLong(appConfig.getSparkStructuredStreamingConfig().STATE_TIMEOUT),
> appConfig, accumulators),
> Encoders.bean(ModelStateInfo.class),
> Encoders.bean(ModelUpdate.class),
> GroupStateTimeout.ProcessingTimeTimeout());
>
> StateUpdateTask contains the update method.
>
> On Thu, May 28, 2020 at 4:41 AM Something Something <
> mailinglist...@gmail.com> wrote:
>
>> Yes, that's exactly how I am creating them.
>>
>> Question... Are you using 'Stateful Structured Streaming' in which you've
>> something like this?
>>
>> .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
>> updateAcrossEvents
>>   )
>>
>> And updating the Accumulator inside 'updateAcrossEvents'? We're experiencing 
>> this only under 'Stateful Structured Streaming'. In other streaming 
>> applications it works as expected.
>>
>>
>>
>> On Wed, May 27, 2020 at 9:01 AM Srinivas V  wrote:
>>
>>> Yes, I am talking about Application specific Accumulators. Actually I am
>>> getting the values printed in my driver log as well as sent to Grafana. Not
>>> sure where and when I saw 0 before. My deploy mode is “client” on a yarn
>>> cluster(not local Mac) where I submit from master node. It should work the
>>> same for cluster mode as well.
>>> Create accumulators like this:
>>> AccumulatorV2 accumulator = sparkContext.longAccumulator(name);
>>>
>>>
>>> On Tue, May 26, 2020 at 8:42 PM Something Something <
>>> mailinglist...@gmail.com> wrote:
>>>
>>>> Hmm... how would they go to Graphana if they are not getting computed
>>>> in your code? I am talking about the Application Specific Accumulators. The
>>>> other standard counters such as 'event.progress.inputRowsPerSecond' are
>>>> getting populated correctly!
>>>>
>>>> On Mon, May 25, 2020 at 8:39 PM Srinivas V  wrote:
>>>>
>>>>> Hello,
>>>>> Even for me it comes as 0 when I print in OnQueryProgress. I use
>>>>> LongAccumulator as well. Yes, it prints on my local but not on cluster.
>>>>> But one consolation is that when I send metrics to Graphana, the
>>>>> values are coming there.
>>>>>
>>>>> On Tue, May 26, 2020 at 3:10 AM Something Something <
>>>>> mailinglist...@gmail.com> wrote:
>>>>>
>>>>>> No this is not working even if I use LongAccumulator.
>>>>>>
>>>>>> On Fri, May 15, 2020 at 9:54 PM ZHANG Wei  wrote:
>>>>>>
>>>>>>> There is a restriction in AccumulatorV2 API [1], the OUT type should
>>>>>>> be atomic or thread safe. I'm wondering if the implementation for
>>>>>>> `java.util.Map[T, Long]` can meet it or not. Is there any chance to 
>>>>>>> replace
>>>>>>> CollectionLongAccumulator by CollectionAccumulator[2] or 
>>>>>>> LongAccumulator[3]
>>>>>>> and test if the StreamingListener and other codes are able to work?
>>>>>>>
>>>>>>> ---
>>>>>>> Cheers,
>>>>>>> -z
>>>>>>> [1]
>>>>>>> http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.AccumulatorV2
>>>>>>> [2]
>>>>>>> http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.CollectionAccumulator
>>>>>>> [3]
>>>>>>> http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.LongAccumulator
>>>>>>>
>>>>>>> 
>>>>>>> From: Something Something 
>>>>>>> Sent: Saturday, May 16, 2020 0:38
>

Re: Using Spark Accumulators with Structured Streaming

2020-05-27 Thread Something Something
Yes, that's exactly how I am creating them.

Question... Are you using 'Stateful Structured Streaming' in which you've
something like this?

.mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
updateAcrossEvents
  )

And updating the Accumulator inside 'updateAcrossEvents'? We're
experiencing this only under 'Stateful Structured Streaming'. In other
streaming applications it works as expected.



On Wed, May 27, 2020 at 9:01 AM Srinivas V  wrote:

> Yes, I am talking about Application specific Accumulators. Actually I am
> getting the values printed in my driver log as well as sent to Grafana. Not
> sure where and when I saw 0 before. My deploy mode is “client” on a yarn
> cluster(not local Mac) where I submit from master node. It should work the
> same for cluster mode as well.
> Create accumulators like this:
> AccumulatorV2 accumulator = sparkContext.longAccumulator(name);
>
>
> On Tue, May 26, 2020 at 8:42 PM Something Something <
> mailinglist...@gmail.com> wrote:
>
>> Hmm... how would they go to Graphana if they are not getting computed in
>> your code? I am talking about the Application Specific Accumulators. The
>> other standard counters such as 'event.progress.inputRowsPerSecond' are
>> getting populated correctly!
>>
>> On Mon, May 25, 2020 at 8:39 PM Srinivas V  wrote:
>>
>>> Hello,
>>> Even for me it comes as 0 when I print in OnQueryProgress. I use
>>> LongAccumulator as well. Yes, it prints on my local but not on cluster.
>>> But one consolation is that when I send metrics to Graphana, the values
>>> are coming there.
>>>
>>> On Tue, May 26, 2020 at 3:10 AM Something Something <
>>> mailinglist...@gmail.com> wrote:
>>>
>>>> No this is not working even if I use LongAccumulator.
>>>>
>>>> On Fri, May 15, 2020 at 9:54 PM ZHANG Wei  wrote:
>>>>
>>>>> There is a restriction in AccumulatorV2 API [1], the OUT type should
>>>>> be atomic or thread safe. I'm wondering if the implementation for
>>>>> `java.util.Map[T, Long]` can meet it or not. Is there any chance to 
>>>>> replace
>>>>> CollectionLongAccumulator by CollectionAccumulator[2] or 
>>>>> LongAccumulator[3]
>>>>> and test if the StreamingListener and other codes are able to work?
>>>>>
>>>>> ---
>>>>> Cheers,
>>>>> -z
>>>>> [1]
>>>>> http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.AccumulatorV2
>>>>> [2]
>>>>> http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.CollectionAccumulator
>>>>> [3]
>>>>> http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.LongAccumulator
>>>>>
>>>>> 
>>>>> From: Something Something 
>>>>> Sent: Saturday, May 16, 2020 0:38
>>>>> To: spark-user
>>>>> Subject: Re: Using Spark Accumulators with Structured Streaming
>>>>>
>>>>> Can someone from Spark Development team tell me if this functionality
>>>>> is supported and tested? I've spent a lot of time on this but can't get it
>>>>> to work. Just to add more context, we've our own Accumulator class that
>>>>> extends from AccumulatorV2. In this class we keep track of one or more
>>>>> accumulators. Here's the definition:
>>>>>
>>>>>
>>>>> class CollectionLongAccumulator[T]
>>>>> extends AccumulatorV2[T, java.util.Map[T, Long]]
>>>>>
>>>>> When the job begins we register an instance of this class:
>>>>>
>>>>> spark.sparkContext.register(myAccumulator, "MyAccumulator")
>>>>>
>>>>> Is this working under Structured Streaming?
>>>>>
>>>>> I will keep looking for alternate approaches but any help would be
>>>>> greatly appreciated. Thanks.
>>>>>
>>>>>
>>>>>
>>>>> On Thu, May 14, 2020 at 2:36 PM Something Something <
>>>>> mailinglist...@gmail.com<mailto:mailinglist...@gmail.com>> wrote:
>>>>>
>>>>> In my structured streaming job I am updating Spark Accumulators in the
>>>>> updateAcrossEvents method but they are always 0 when I try to print them 
>>>>> in
>>>>> my StreamingListener. Here's the code:
>>>>>
>>>>> .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
>>>>> updateAcrossEvents
>>>>>   )
>>>>>
>>>>>
>>>>> The accumulators get incremented in 'updateAcrossEvents'. I've a
>>>>> StreamingListener which writes values of the accumulators in
>>>>> 'onQueryProgress' method but in this method the Accumulators are ALWAYS
>>>>> ZERO!
>>>>>
>>>>> When I added log statements in the updateAcrossEvents, I could see
>>>>> that these accumulators are getting incremented as expected.
>>>>>
>>>>> This only happens when I run in the 'Cluster' mode. In Local mode it
>>>>> works fine which implies that the Accumulators are not getting distributed
>>>>> correctly - or something like that!
>>>>>
>>>>> Note: I've seen quite a few answers on the Web that tell me to perform
>>>>> an "Action". That's not a solution here. This is a 'Stateful Structured
>>>>> Streaming' job. Yes, I am also 'registering' them in SparkContext.
>>>>>
>>>>>
>>>>>
>>>>>


Re: RecordTooLargeException in Spark *Structured* Streaming

2020-05-26 Thread Something Something
Thanks. Missed that part of the documentation. Appreciate your help. Regards.

On Mon, May 25, 2020 at 10:42 PM Jungtaek Lim 
wrote:

> Hi,
>
> You need to add the prefix "kafka." for the configurations which should be
> propagated to the Kafka. Others will be used in Spark data source
> itself. (Kafka connector in this case)
>
>
> https://spark.apache.org/docs/2.4.5/structured-streaming-kafka-integration.html#kafka-specific-configurations
>
> Hope this helps.
>
> Thanks,
> Jungtaek Lim (HeartSaVioR)
>
>
> On Tue, May 26, 2020 at 6:42 AM Something Something <
> mailinglist...@gmail.com> wrote:
>
>> I keep getting this error message:
>>
>>
>> *The message is 1169350 bytes when serialized which is larger than the
>> maximum request size you have configured with the max.request.size
>> configuration.*
>>
>>
>>
>> As indicated in other posts, I am trying to set the “max.request.size”
>> configuration in the Producer as follows:
>>
>>
>> -
>>
>> .writeStream
>>
>> .format(*"kafka"*)
>>
>> .option(
>>
>>   *"kafka.bootstrap.servers"*,
>>
>>   conig.outputBootstrapServer
>>
>> )
>>
>> .option(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, *"1000"*)
>>
>> -
>>
>>
>>
>> But this is not working. Am I setting this correctly? Is there a
>> different way to set this property under Spark Structured Streaming?
>>
>>
>> Please help. Thanks.
>>
>>
>>
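
In other words, with the prefix the producer-level setting actually reaches the Kafka client. A hedged sketch of the corrected sink options (server, topic, size and paths below are placeholders):

```scala
// Hedged sketch: "kafka." + "max.request.size" is what gets passed through to the producer.
df.writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")   // placeholder
  .option("kafka.max.request.size", "10485760")          // ProducerConfig.MAX_REQUEST_SIZE_CONFIG with the "kafka." prefix; value is a placeholder
  .option("topic", "outputTopic")                        // placeholder
  .option("checkpointLocation", "/tmp/checkpoint")       // placeholder
  .start()
```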


Re: Using Spark Accumulators with Structured Streaming

2020-05-26 Thread Something Something
Hmm... how would they go to Grafana if they are not getting computed in
your code? I am talking about the Application Specific Accumulators. The
other standard counters such as 'event.progress.inputRowsPerSecond' are
getting populated correctly!

On Mon, May 25, 2020 at 8:39 PM Srinivas V  wrote:

> Hello,
> Even for me it comes as 0 when I print in OnQueryProgress. I use
> LongAccumulator as well. Yes, it prints on my local but not on cluster.
> But one consolation is that when I send metrics to Graphana, the values
> are coming there.
>
> On Tue, May 26, 2020 at 3:10 AM Something Something <
> mailinglist...@gmail.com> wrote:
>
>> No this is not working even if I use LongAccumulator.
>>
>> On Fri, May 15, 2020 at 9:54 PM ZHANG Wei  wrote:
>>
>>> There is a restriction in AccumulatorV2 API [1], the OUT type should be
>>> atomic or thread safe. I'm wondering if the implementation for
>>> `java.util.Map[T, Long]` can meet it or not. Is there any chance to replace
>>> CollectionLongAccumulator by CollectionAccumulator[2] or LongAccumulator[3]
>>> and test if the StreamingListener and other codes are able to work?
>>>
>>> ---
>>> Cheers,
>>> -z
>>> [1]
>>> http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.AccumulatorV2
>>> [2]
>>> http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.CollectionAccumulator
>>> [3]
>>> http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.LongAccumulator
>>>
>>> 
>>> From: Something Something 
>>> Sent: Saturday, May 16, 2020 0:38
>>> To: spark-user
>>> Subject: Re: Using Spark Accumulators with Structured Streaming
>>>
>>> Can someone from Spark Development team tell me if this functionality is
>>> supported and tested? I've spent a lot of time on this but can't get it to
>>> work. Just to add more context, we've our own Accumulator class that
>>> extends from AccumulatorV2. In this class we keep track of one or more
>>> accumulators. Here's the definition:
>>>
>>>
>>> class CollectionLongAccumulator[T]
>>> extends AccumulatorV2[T, java.util.Map[T, Long]]
>>>
>>> When the job begins we register an instance of this class:
>>>
>>> spark.sparkContext.register(myAccumulator, "MyAccumulator")
>>>
>>> Is this working under Structured Streaming?
>>>
>>> I will keep looking for alternate approaches but any help would be
>>> greatly appreciated. Thanks.
>>>
>>>
>>>
>>> On Thu, May 14, 2020 at 2:36 PM Something Something <
>>> mailinglist...@gmail.com<mailto:mailinglist...@gmail.com>> wrote:
>>>
>>> In my structured streaming job I am updating Spark Accumulators in the
>>> updateAcrossEvents method but they are always 0 when I try to print them in
>>> my StreamingListener. Here's the code:
>>>
>>> .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
>>> updateAcrossEvents
>>>   )
>>>
>>>
>>> The accumulators get incremented in 'updateAcrossEvents'. I've a
>>> StreamingListener which writes values of the accumulators in
>>> 'onQueryProgress' method but in this method the Accumulators are ALWAYS
>>> ZERO!
>>>
>>> When I added log statements in the updateAcrossEvents, I could see that
>>> these accumulators are getting incremented as expected.
>>>
>>> This only happens when I run in the 'Cluster' mode. In Local mode it
>>> works fine which implies that the Accumulators are not getting distributed
>>> correctly - or something like that!
>>>
>>> Note: I've seen quite a few answers on the Web that tell me to perform
>>> an "Action". That's not a solution here. This is a 'Stateful Structured
>>> Streaming' job. Yes, I am also 'registering' them in SparkContext.
>>>
>>>
>>>
>>>


RecordTooLargeException in Spark *Structured* Streaming

2020-05-25 Thread Something Something
I keep getting this error message:


*The message is 1169350 bytes when serialized which is larger than the
maximum request size you have configured with the max.request.size
configuration.*



As indicated in other posts, I am trying to set the “max.request.size”
configuration in the Producer as follows:


-

.writeStream
.format(*"kafka"*)
.option(
  *"kafka.bootstrap.servers"*,
  conig.outputBootstrapServer
)
.option(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, *"1000"*)

-



But this is not working. Am I setting this correctly? Is there a different
way to set this property under Spark Structured Streaming?


Please help. Thanks.


Re: Using Spark Accumulators with Structured Streaming

2020-05-25 Thread Something Something
No, this is not working even if I use LongAccumulator.

On Fri, May 15, 2020 at 9:54 PM ZHANG Wei  wrote:

> There is a restriction in AccumulatorV2 API [1], the OUT type should be
> atomic or thread safe. I'm wondering if the implementation for
> `java.util.Map[T, Long]` can meet it or not. Is there any chance to replace
> CollectionLongAccumulator by CollectionAccumulator[2] or LongAccumulator[3]
> and test if the StreamingListener and other codes are able to work?
>
> ---
> Cheers,
> -z
> [1]
> http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.AccumulatorV2
> [2]
> http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.CollectionAccumulator
> [3]
> http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.LongAccumulator
>
> ________
> From: Something Something 
> Sent: Saturday, May 16, 2020 0:38
> To: spark-user
> Subject: Re: Using Spark Accumulators with Structured Streaming
>
> Can someone from Spark Development team tell me if this functionality is
> supported and tested? I've spent a lot of time on this but can't get it to
> work. Just to add more context, we've our own Accumulator class that
> extends from AccumulatorV2. In this class we keep track of one or more
> accumulators. Here's the definition:
>
>
> class CollectionLongAccumulator[T]
> extends AccumulatorV2[T, java.util.Map[T, Long]]
>
> When the job begins we register an instance of this class:
>
> spark.sparkContext.register(myAccumulator, "MyAccumulator")
>
> Is this working under Structured Streaming?
>
> I will keep looking for alternate approaches but any help would be greatly
> appreciated. Thanks.
>
>
>
> On Thu, May 14, 2020 at 2:36 PM Something Something <
> mailinglist...@gmail.com<mailto:mailinglist...@gmail.com>> wrote:
>
> In my structured streaming job I am updating Spark Accumulators in the
> updateAcrossEvents method but they are always 0 when I try to print them in
> my StreamingListener. Here's the code:
>
> .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
> updateAcrossEvents
>   )
>
>
> The accumulators get incremented in 'updateAcrossEvents'. I've a
> StreamingListener which writes values of the accumulators in
> 'onQueryProgress' method but in this method the Accumulators are ALWAYS
> ZERO!
>
> When I added log statements in the updateAcrossEvents, I could see that
> these accumulators are getting incremented as expected.
>
> This only happens when I run in the 'Cluster' mode. In Local mode it works
> fine which implies that the Accumulators are not getting distributed
> correctly - or something like that!
>
> Note: I've seen quite a few answers on the Web that tell me to perform an
> "Action". That's not a solution here. This is a 'Stateful Structured
> Streaming' job. Yes, I am also 'registering' them in SparkContext.
>
>
>
>


Re: Using Spark Accumulators with Structured Streaming

2020-05-15 Thread Something Something
Can someone from the Spark Development team tell me if this functionality is
supported and tested? I've spent a lot of time on this but can't get it
to work. Just to add more context, we've our own Accumulator class that
extends from AccumulatorV2. In this class we keep track of one or more
accumulators. Here's the definition:

class CollectionLongAccumulator[T]
extends AccumulatorV2[T, java.util.Map[T, Long]]

When the job begins we register an instance of this class:

spark.sparkContext.register(myAccumulator, "MyAccumulator")

Is this working under Structured Streaming?

I will keep looking for alternate approaches but any help would be
greatly appreciated. Thanks.
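
For context, a hedged skeleton of what such an accumulator has to provide; this is not the actual implementation from this job. It backs the map with a ConcurrentHashMap, since AccumulatorV2's OUT type is expected to be thread safe (see ZHANG Wei's note elsewhere in this thread).

```scala
// Hedged skeleton: the AccumulatorV2 members that must be overridden.
import java.util.concurrent.ConcurrentHashMap
import org.apache.spark.util.AccumulatorV2

class CollectionLongAccumulator[T] extends AccumulatorV2[T, java.util.Map[T, Long]] {
  private val map = new ConcurrentHashMap[T, Long]()

  override def isZero: Boolean = map.isEmpty

  override def copy(): AccumulatorV2[T, java.util.Map[T, Long]] = {
    val acc = new CollectionLongAccumulator[T]
    acc.map.putAll(map)
    acc
  }

  override def reset(): Unit = map.clear()

  // Note: get-then-put is not atomic; a production version would need atomic updates.
  override def add(v: T): Unit =
    map.put(v, map.getOrDefault(v, 0L) + 1L)

  override def merge(other: AccumulatorV2[T, java.util.Map[T, Long]]): Unit = {
    val it = other.value.entrySet().iterator()
    while (it.hasNext) {
      val e = it.next()
      map.put(e.getKey, map.getOrDefault(e.getKey, 0L) + e.getValue)
    }
  }

  override def value: java.util.Map[T, Long] = map
}
```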




On Thu, May 14, 2020 at 2:36 PM Something Something <
mailinglist...@gmail.com> wrote:

> In my structured streaming job I am updating Spark Accumulators in the
> updateAcrossEvents method but they are always 0 when I try to print them in
> my StreamingListener. Here's the code:
>
> .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
> updateAcrossEvents
>   )
>
> The accumulators get incremented in 'updateAcrossEvents'. I've a
> StreamingListener which writes values of the accumulators in
> 'onQueryProgress' method but in this method the Accumulators are ALWAYS
> ZERO!
>
> When I added log statements in the updateAcrossEvents, I could see that
> these accumulators are getting incremented as expected.
>
> This only happens when I run in the 'Cluster' mode. In Local mode it works
> fine which implies that the Accumulators are not getting distributed
> correctly - or something like that!
>
> Note: I've seen quite a few answers on the Web that tell me to perform an
> "Action". That's not a solution here. This is a 'Stateful Structured
> Streaming' job. Yes, I am also 'registering' them in SparkContext.
>
>
>
>


Using Spark Accumulators with Structured Streaming

2020-05-14 Thread Something Something
In my structured streaming job I am updating Spark Accumulators in the
updateAcrossEvents method but they are always 0 when I try to print them in
my StreamingListener. Here's the code:

.mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
updateAcrossEvents
  )

The accumulators get incremented in 'updateAcrossEvents'. I've a
StreamingListener which writes values of the accumulators in
'onQueryProgress' method but in this method the Accumulators are ALWAYS
ZERO!

When I added log statements in the updateAcrossEvents, I could see that
these accumulators are getting incremented as expected.

This only happens when I run in the 'Cluster' mode. In Local mode it works
fine which implies that the Accumulators are not getting distributed
correctly - or something like that!

Note: I've seen quite a few answers on the Web that tell me to perform an
"Action". That's not a solution here. This is a 'Stateful Structured
Streaming' job. Yes, I am also 'registering' them in SparkContext.


Re: No. of active states?

2020-05-07 Thread Something Something
No. We are already capturing these metrics (e.g. numInputRows,
inputRowsPerSecond).

I am talking about "No. of States" in the memory at any given time.

On Thu, May 7, 2020 at 4:31 PM Jungtaek Lim 
wrote:

> If you're referring total "entries" in all states in SS job, it's being
> provided via StreamingQueryListener.
>
>
> http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#monitoring-streaming-queries
>
> Hope this helps.
>
> On Fri, May 8, 2020 at 3:26 AM Something Something <
> mailinglist...@gmail.com> wrote:
>
>> Is there a way to get the total no. of active states in memory at any
>> given point in a Stateful Spark Structured Streaming job? We are thinking
>> of using this metric for 'Auto Scaling' our Spark cluster.
>>
>
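
For completeness, a hedged sketch of pulling that number out of the progress events (the wiring assumes a SparkSession named `spark`): each stateful operator reports its state row count, so summing `numRowsTotal` gives the total number of state entries.

```scala
// Hedged sketch: sum the state rows reported by each stateful operator.
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._

spark.streams.addListener(new StreamingQueryListener {
  override def onQueryStarted(event: QueryStartedEvent): Unit = {}
  override def onQueryTerminated(event: QueryTerminatedEvent): Unit = {}
  override def onQueryProgress(event: QueryProgressEvent): Unit = {
    val totalStateRows = event.progress.stateOperators.map(_.numRowsTotal).sum
    println(s"Active state entries: $totalStateRows")   // feed this to the auto-scaler
  }
})
```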


Dynamically changing maxOffsetsPerTrigger

2020-05-07 Thread Something Something
Is there a way to dynamically modify the value of 'maxOffsetsPerTrigger' while
a Stateful Structured Streaming job is running?

We are thinking of auto-scaling our Spark cluster, but if we don't modify
the value of 'maxOffsetsPerTrigger' dynamically, would adding more VMs to
the cluster help? I don't think it would, would it?

In other words, if I add 2 new VMs to the cluster but the value of
'maxOffsetsPerTrigger' is still the same, would performance improve? I would
think not. We would have to explicitly stop the job, add VMs & then restart
the job after changing the value of 'maxOffsetsPerTrigger' - which defeats
the purpose of Auto-scaling.

Please tell me if my understanding is not correct. Thanks.
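
For reference, a minimal sketch of where 'maxOffsetsPerTrigger' lives (values are placeholders): it is a Kafka source option set when the query is defined, which is why the conclusion above is that changing it currently means stopping and restarting the query.

```scala
// Hedged sketch: the rate limit is a source option fixed at query definition time.
val input = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")   // placeholder
  .option("subscribe", "inputTopic")                      // placeholder
  .option("maxOffsetsPerTrigger", "100000")               // max offsets per micro-batch, across all partitions
  .load()
```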


No. of active states?

2020-05-07 Thread Something Something
Is there a way to get the total no. of active states in memory at any given
point in a Stateful Spark Structured Streaming job? We are thinking of
using this metric for 'Auto Scaling' our Spark cluster.


Spark not able to read from an Embedded Kafka Topic

2020-03-06 Thread Something Something
I am trying to write an integration test using Embedded Kafka but I keep
getting a NullPointerException. My test case is very simple. It has the
following steps:

   1. Read a JSON file & write messages to an inputTopic.
   2. Perform a 'readStream' operation.
   3. Do a 'select' on the Stream. This throws a NullPointerException.

What am I doing wrong? Code is given below:


"My Test which runs with Embedded Kafka" should "Generate correct Result" in {

implicit val config: EmbeddedKafkaConfig =
  EmbeddedKafkaConfig(
kafkaPort = 9066,
zooKeeperPort = 2066,
Map("log.dir" -> "./src/test/resources/")
  )

withRunningKafka {
  createCustomTopic(inputTopic)
  val source = Source.fromFile("src/test/resources/test1.json")
  source.getLines.toList.filterNot(_.isEmpty).foreach(
line => publishStringMessageToKafka(inputTopic, line)
  )
  source.close()
  implicit val deserializer: StringDeserializer = new StringDeserializer

  createCustomTopic(outputTopic)
  import spark2.implicits._

  val schema = spark.read.json("my.json").schema
  val myStream = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9066")
.option("subscribe", inputTopic)
.load()

  // Schema looks good
  myStream.printSchema()

  // Following line throws NULLPointerException! Why?
  val df = myStream.select(from_json($"value".cast("string"),
schema).alias("value"))

  // There's more code... but let's not worry about that for now.
}

  }


Re: Stateful Structured Spark Streaming: Timeout is not getting triggered

2020-03-05 Thread Something Something
Yes, that was it! It seems it only works if input data is continuously
flowing. I had stopped the input job because I had enough data, but it seems
timeouts are processed only if the data is continuously fed. Not sure why it's
designed that way; it makes it a bit harder to write unit/integration tests,
BUT I am sure there's a reason why it's designed this way. Thanks.

On Wed, Mar 4, 2020 at 6:31 PM Tathagata Das 
wrote:

> Make sure that you are continuously feeding data into the query to trigger
> the batches. only then timeouts are processed.
> See the timeout behavior details here -
> https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.streaming.GroupState
>
> On Wed, Mar 4, 2020 at 2:51 PM Something Something <
> mailinglist...@gmail.com> wrote:
>
>> I've set the timeout duration to "2 minutes" as follows:
>>
>> def updateAcrossEvents (tuple3: Tuple3[String, String, String], inputs: 
>> Iterator[R00tJsonObject],
>>   oldState: GroupState[MyState]): OutputRow = {
>>
>> println(" Inside updateAcrossEvents with : " + tuple3._1 + ", " + 
>> tuple3._2 + ", " + tuple3._3)
>> var state: MyState = if (oldState.exists) oldState.get else 
>> MyState(tuple3._1, tuple3._2, tuple3._3)
>>
>> if (oldState.hasTimedOut) {
>>   println("@ oldState has timed out ")
>>   // Logic to Write OutputRow
>>   OutputRow("some values here...")
>> } else {
>>   for (input <- inputs) {
>> state = updateWithEvent(state, input)
>> oldState.update(state)
>> *oldState.setTimeoutDuration("2 minutes")*
>>   }
>>   OutputRow(null, null, null)
>> }
>>
>>   }
>>
>> I have also specified ProcessingTimeTimeout in 'mapGroupsWithState' as 
>> follows...
>>
>> .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(updateAcrossEvents)
>>
>> But 'hasTimedOut' is never true so I don't get any output! What am I doing 
>> wrong?
>>
>>
>>
>>


Stateful Structured Spark Streaming: Timeout is not getting triggered

2020-03-04 Thread Something Something
I've set the timeout duration to "2 minutes" as follows:

def updateAcrossEvents(tuple3: Tuple3[String, String, String],
                       inputs: Iterator[R00tJsonObject],
                       oldState: GroupState[MyState]): OutputRow = {

  println(" Inside updateAcrossEvents with : " + tuple3._1 + ", " + tuple3._2 + ", " + tuple3._3)
  var state: MyState =
    if (oldState.exists) oldState.get else MyState(tuple3._1, tuple3._2, tuple3._3)

  if (oldState.hasTimedOut) {
    println("@ oldState has timed out ")
    // Logic to Write OutputRow
    OutputRow("some values here...")
  } else {
    for (input <- inputs) {
      state = updateWithEvent(state, input)
      oldState.update(state)
      *oldState.setTimeoutDuration("2 minutes")*
    }
    OutputRow(null, null, null)
  }
}

I have also specified ProcessingTimeTimeout in 'mapGroupsWithState' as
follows...

.mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(updateAcrossEvents)

But 'hasTimedOut' is never true so I don't get any output! What am I
doing wrong?


Re: Stateful Spark Streaming: Required attribute 'value' not found

2020-03-04 Thread Something Something
Simply adding 'toJSON' before 'writeStream' fixed the problem. Maybe
this will help someone.

On Tue, Mar 3, 2020 at 6:02 PM Something Something 
wrote:

> In a Stateful Spark Streaming application I am writing the 'OutputRow' in
> the 'updateAcrossEvents' but I keep getting this error (*Required
> attribute 'value' not found*) while it's trying to write to Kafka. I know
> from the documentation that 'value' attribute needs to be set but how do I
> do that in the 'Stateful Structured Streaming'? Where & how do I add this
> 'value' attribute in the following code? *Note: I am using Spark 2.3.1*
>
> withEventTime
>   .as[R00tJsonObject]
>   .withWatermark("event_time", "5 minutes")
>   .groupByKey(row => (row.value.Id, row.value.time.toString, 
> row.value.cId))
>   
> .mapGroupsWithState(GroupStateTimeout.EventTimeTimeout)(updateAcrossEvents)
>   .writeStream
>   .format("kafka")
>   .option("kafka.bootstrap.servers", "localhost:9092")
>   .option("topic", "myTopic")
>   .option("checkpointLocation", "/Users/username/checkpointLocation")
>   .outputMode("update")
>   .start()
>   .awaitTermination()
>
>
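
For reference, a sketch of where 'toJSON' slots into the chain quoted above: it turns the typed output rows into a Dataset[String] whose single column is named "value", which is exactly what the Kafka sink expects.

```scala
withEventTime
  .as[R00tJsonObject]
  .withWatermark("event_time", "5 minutes")
  .groupByKey(row => (row.value.Id, row.value.time.toString, row.value.cId))
  .mapGroupsWithState(GroupStateTimeout.EventTimeTimeout)(updateAcrossEvents)
  .toJSON        // Dataset[String] with a single "value" column for the Kafka sink
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "myTopic")
  .option("checkpointLocation", "/Users/username/checkpointLocation")
  .outputMode("update")
  .start()
  .awaitTermination()
```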


Stateful Spark Streaming: Required attribute 'value' not found

2020-03-03 Thread Something Something
In a Stateful Spark Streaming application I am writing the 'OutputRow' in
the 'updateAcrossEvents' but I keep getting this error (*Required attribute
'value' not found*) while it's trying to write to Kafka. I know from the
documentation that the 'value' attribute needs to be set, but how do I do that
in the 'Stateful Structured Streaming'? Where & how do I add this 'value'
attribute in the following code? *Note: I am using Spark 2.3.1*

withEventTime
  .as[R00tJsonObject]
  .withWatermark("event_time", "5 minutes")
  .groupByKey(row => (row.value.Id, row.value.time.toString, row.value.cId))
  
.mapGroupsWithState(GroupStateTimeout.EventTimeTimeout)(updateAcrossEvents)
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "myTopic")
  .option("checkpointLocation", "/Users/username/checkpointLocation")
  .outputMode("update")
  .start()
  .awaitTermination()


Example of Stateful Spark Structured Streaming with Kafka

2020-03-03 Thread Something Something
There are lots of examples on 'Stateful Structured Streaming' in 'The
Definitive Guide' book BUT all of them read JSON from a 'path'. That's
working for me.

Now I need to read from Kafka.

I Googled but I couldn't find any example. I am struggling to Map the
'Value' of the Kafka message to my JSON. Any help would be appreciated.
Here's what I am trying:

val query = withEventTime
  .as[R00tJsonObject]
  .withWatermark("event_time", "5 minutes")
  .groupByKey(row => (row.report.id, row.report.time.toString,
row.report.cId))
  
.mapGroupsWithState(GroupStateTimeout.EventTimeTimeout)(updateAcrossEvents)
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", *"myTopic"*)
  .option("checkpointLocation", "/Users/username/checkpointLocation")
  .outputMode("update")
  .start().awaitTermination


This fails with: cannot resolve 'arrivalTime' given input columns: [value, event_time];
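
One way past this, following the same `from_json` pattern used in the embedded-Kafka test earlier in this digest, is to parse the Kafka 'value' column into the JSON structure before the typed, stateful part of the query. A hedged sketch (the schema file, topic and field names are assumptions):

```scala
import org.apache.spark.sql.functions.from_json
import spark.implicits._

val schema = spark.read.json("sample.json").schema        // schema of the JSON payload (assumption)

val parsed = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")    // placeholder
  .option("subscribe", "inputTopic")                      // placeholder
  .load()
  .select(from_json($"value".cast("string"), schema).alias("report"))
  .select("report.*")                                     // now columns like 'arrivalTime' resolve
  .as[R00tJsonObject]                                     // assumes R00tJsonObject matches the flattened columns
// ... then the watermark, groupByKey and mapGroupsWithState steps as above ...
```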


Re: Spark Streaming with mapGroupsWithState

2020-03-02 Thread Something Something
I changed it to Tuple2 and that problem is solved.

Any thoughts on this message:

*Unapplied methods are only converted to functions when a function type is
expected.*

*You can make this conversion explicit by writing `updateAcrossEvents _` or
`updateAcrossEvents(_,_,_,_,_)` instead of `updateAcrossEvents`.
.mapGroupsWithState(GroupStateTimeout.EventTimeTimeout())(updateAcrossEvents)*
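
For the archives, here is a minimal sketch of what the compiler message is
asking for. The case classes and the key tuple below are placeholders
standing in for the real types used by updateAcrossEvents.

import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout}
import spark.implicits._   // 'spark' is the active SparkSession; supplies the encoders

// Placeholder types for the real input/state/output case classes.
case class InputRow(id: String, time: java.sql.Timestamp, cId: String)
case class MyState(count: Long)
case class OutputRow(id: String, count: Long)

def updateAcrossEvents(
    key: (String, String, String),
    rows: Iterator[InputRow],
    state: GroupState[MyState]): OutputRow = ???   // real update logic goes here

// The trailing underscore eta-expands the method into the function value that
// mapGroupsWithState expects, which is exactly what the compiler suggests.
val updated = withEventTime
  .as[InputRow]
  .groupByKey(row => (row.id, row.time.toString, row.cId))
  .mapGroupsWithState(GroupStateTimeout.EventTimeTimeout())(updateAcrossEvents _)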

On Mon, Mar 2, 2020 at 5:12 PM lec ssmi  wrote:

> maybe you can combine the fields you want to use into one field
>
> Something Something  于2020年3月3日周二 上午6:37写道:
>
>> I am writing a Stateful Streaming application in which I am using
>> mapGroupsWithState to create aggregates for Groups but I need to create 
>> *Groups
>> based on more than one column in the Input Row*. All the examples in the
>> 'Spark: The Definitive Guide' use only one column such as 'User' or
>> 'Device'. I am using code similar to what's given below. *How do I
>> specify more than one field in the 'groupByKey'?*
>>
>> There are other challenges as well. The book says we can use
>> 'updateAcrossEvents' the way given below but I get compile time error
>> saying:
>>
>>
>> *Error:(43, 65) missing argument list for method updateAcrossEvents in
>> object MainUnapplied methods are only converted to functions when a
>> function type is expected.You can make this conversion explicit by writing
>> `updateAcrossEvents _` or `updateAcrossEvents(_,_,_,_,_)` instead of
>> `updateAcrossEvents`.
>> .mapGroupsWithState(GroupStateTimeout.EventTimeTimeout())(updateAcrossEvents)*
>>
>> Another challenge: Compiler also complains about the my *MyReport*: 
>> *Error:(41,
>> 12) Unable to find encoder for type stored in a Dataset.  Primitive types
>> (Int, String, etc) and Product types (case classes) are supported by
>> importing spark.implicits._  Support for serializing other types will be
>> added in future releases.*
>>
>> Help in resolving these errors would be greatly appreciated. Thanks in
>> advance.
>>
>>
>> withEventTime
>> .as[MyReport]
>>   .groupByKey(_.getKeys.getKey1). // How do I add _.getKeys.getKey2?
>>   
>> .mapGroupsWithState(GroupStateTimeout.EventTimeTimeout())(updateAcrossEvents)
>>   .writeStream
>>   .queryName("test_query")
>>   .format("memory")
>>   .outputMode("update")
>>   .start()
>>
>>


Spark Streaming with mapGroupsWithState

2020-03-02 Thread Something Something
I am writing a Stateful Streaming application in which I am using
mapGroupsWithState to create aggregates for Groups but I need to create *Groups
based on more than one column in the Input Row*. All the examples in the
'Spark: The Definitive Guide' use only one column such as 'User' or
'Device'. I am using code similar to what's given below. *How do I specify
more than one field in the 'groupByKey'?*

There are other challenges as well. The book says we can use
'updateAcrossEvents' the way given below but I get compile time error
saying:


*Error:(43, 65) missing argument list for method updateAcrossEvents in
object MainUnapplied methods are only converted to functions when a
function type is expected.You can make this conversion explicit by writing
`updateAcrossEvents _` or `updateAcrossEvents(_,_,_,_,_)` instead of
`updateAcrossEvents`.
.mapGroupsWithState(GroupStateTimeout.EventTimeTimeout())(updateAcrossEvents)*

Another challenge: The compiler also complains about my *MyReport*: *Error:(41,
12) Unable to find encoder for type stored in a Dataset.  Primitive types
(Int, String, etc) and Product types (case classes) are supported by
importing spark.implicits._  Support for serializing other types will be
added in future releases.*

Help in resolving these errors would be greatly appreciated. Thanks in
advance.


withEventTime
.as[MyReport]
  .groupByKey(_.getKeys.getKey1). // How do I add _.getKeys.getKey2?
  .mapGroupsWithState(GroupStateTimeout.EventTimeTimeout())(updateAcrossEvents)
  .writeStream
  .queryName("test_query")
  .format("memory")
  .outputMode("update")
  .start()
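
A minimal sketch of both fixes, under the assumption that MyReport can be a
plain case class: Product types pick up encoders from spark.implicits._, and
returning a tuple from groupByKey gives a composite key. (If MyReport is
actually a generated Java class, Encoders.bean or a Kryo encoder would be
needed instead.) The field names below are placeholders.

import org.apache.spark.sql.streaming.GroupStateTimeout
import spark.implicits._   // 'spark' is the active SparkSession

// Placeholder case classes; define them at the top level so encoders can be derived.
case class Keys(key1: String, key2: String)
case class MyReport(keys: Keys, value: Long)

// withEventTime and updateAcrossEvents are the ones from the snippet above.
withEventTime
  .as[MyReport]
  .groupByKey(r => (r.keys.key1, r.keys.key2))   // composite key as a tuple
  .mapGroupsWithState(GroupStateTimeout.EventTimeTimeout())(updateAcrossEvents _)
  .writeStream
  .queryName("test_query")
  .format("memory")
  .outputMode("update")
  .start()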


Aggregating values by a key field in Spark Streaming

2020-02-28 Thread Something Something
Here's my use case: Messages are coming into a Kafka Topic for different
'Events'. Each event has a unique Event Id. I need to aggregate counts for
each Event AFTER the event is completed. For now, we are thinking we can
assume an event is completed if there are no more messages coming in for a
period of X minutes. How do I do this using Spark Streaming? I am reading
up on 'Stateful Transformations' with 'mapWithState'. Is that the right
approach? Any sample code would be even more appreciated. Thanks.
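
One way to express this in Structured Streaming is per-key state with a
processing-time timeout: keep a running count per Event Id and emit it only
when no new messages have arrived for the configured duration. This is just a
sketch under assumptions -- Event and EventCount are placeholder types, and
'10 minutes' stands in for the 'X minutes' above.

import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout, OutputMode}

case class Event(eventId: String)                    // placeholder input type
case class EventCount(eventId: String, count: Long)  // placeholder output type

def updateCounts(
    eventId: String,
    events: Iterator[Event],
    state: GroupState[Long]): Iterator[EventCount] = {
  if (state.hasTimedOut) {                 // no new messages within the timeout window
    val result = EventCount(eventId, state.get)
    state.remove()
    Iterator(result)
  } else {
    state.update(state.getOption.getOrElse(0L) + events.size)
    state.setTimeoutDuration("10 minutes") // the 'X minutes' from above
    Iterator.empty
  }
}

// events: Dataset[Event] parsed from the Kafka topic
// events.groupByKey(_.eventId)
//   .flatMapGroupsWithState(OutputMode.Append, GroupStateTimeout.ProcessingTimeTimeout)(updateCounts)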


Spark Streaming: Aggregating values across batches

2020-02-27 Thread Something Something
We've a Spark Streaming job that calculates some values in each batch. What
we need to do now is aggregate values across ALL batches. What is the best
strategy to do this in Spark Streaming. Should we use 'Spark Accumulators'
for this?
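
Accumulators can work for simple global counters, but their merged value is
only reliably read back on the driver. For keyed aggregation across batches,
the DStream API also offers checkpointed state. A minimal sketch, assuming
the per-batch values can be expressed as (key, value) pairs (the names below
are placeholders):

import org.apache.spark.streaming.dstream.DStream

// Requires a checkpoint directory on the StreamingContext, e.g. ssc.checkpoint("...").
// pairs: DStream[(String, Long)] holding the per-batch values keyed by some id.
def runningTotals(pairs: DStream[(String, Long)]): DStream[(String, Long)] =
  pairs.updateStateByKey[Long] { (newValues: Seq[Long], current: Option[Long]) =>
    Some(current.getOrElse(0L) + newValues.sum)
  }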


Creating Custom Receiver for Spark Streaming

2015-10-12 Thread Something Something
Is it safe to assume that Spark will always create a single instance of a
custom Receiver? Or would it create multiple instances on each node in a
cluster? I am wondering whether I need to worry about receiving the same
message on different nodes, etc.

Please help. Thanks.


Storing object in spark streaming

2015-10-12 Thread Something Something
In my custom receiver for Spark Streaming I have code such as this:

messages.toArray().foreach(msg => {
  val m = msg.asInstanceOf[Message]
*  store(m.getBody)*
})

Instead of the 'body', which is of type 'String', I would rather pass the
entire Message object, but when I call store(m), I get a compiler error
saying: "Cannot resolve reference store with such signature"

But I see this method in 'Receiver.scala':

  def store(dataItem: T) {
executor.pushSingle(dataItem)
  }


How do I store the entire object? Please help. Thanks.
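
A minimal sketch of the usual cause, assuming the receiver is currently
declared as Receiver[String]: store(dataItem: T) only accepts the receiver's
own type parameter, so storing the whole object means parameterizing the
receiver with Message.

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

// 'Message' is the class from the snippet above; the storage level is only an example.
class MessageReceiver extends Receiver[Message](StorageLevel.MEMORY_AND_DISK_2) {

  override def onStart(): Unit = {
    // Fetch messages on a background thread, then push the whole objects:
    // messages.toArray().foreach(msg => store(msg.asInstanceOf[Message]))
  }

  override def onStop(): Unit = ()
}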


JavaKafkaWordCount not working under Spark Streaming

2014-11-10 Thread Something Something
I am embarrassed to admit it, but I can't get a basic 'word count' to work
under Kafka/Spark streaming. My code looks like this. I don't see any word
counts in the console output, and I don't see any output in the UI. Needless
to say, I am a newbie to both 'Spark' and 'Kafka'.

Please help.  Thanks.

Here's the code:

public static void main(String[] args) {
    if (args.length < 4) {
        System.err.println("Usage: JavaKafkaWordCount <zkQuorum> <group> <topics> <numThreads>");
        System.exit(1);
    }

    //StreamingExamples.setStreamingLogLevels();
    //SparkConf sparkConf = new SparkConf().setAppName("JavaKafkaWordCount");

    // Location of the Spark directory
    String sparkHome = "/opt/mapr/spark/spark-1.0.2/";

    // URL of the Spark cluster
    String sparkUrl = "spark://mymachine:7077";

    // Location of the required JAR files
    String jarFiles =
            "./spark-streaming-kafka_2.10-1.1.0.jar,./DlSpark-1.0-SNAPSHOT.jar,./zkclient-0.3.jar,./kafka_2.10-0.8.1.1.jar,./metrics-core-2.2.0.jar";

    SparkConf sparkConf = new SparkConf();
    sparkConf.setAppName("JavaKafkaWordCount");
    sparkConf.setJars(new String[]{jarFiles});
    sparkConf.setMaster(sparkUrl);
    sparkConf.set("spark.ui.port", "2348");
    sparkConf.setSparkHome(sparkHome);

    Map<String, String> kafkaParams = new HashMap<String, String>();
    kafkaParams.put("zookeeper.connect", "myedgenode:2181");
    kafkaParams.put("group.id", "1");
    kafkaParams.put("metadata.broker.list", "myedgenode:9092");
    kafkaParams.put("serializer.class", "kafka.serializer.StringEncoder");
    kafkaParams.put("request.required.acks", "1");

    // Create the context with a 2 second batch size
    JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(2000));

    int numThreads = Integer.parseInt(args[3]);
    Map<String, Integer> topicMap = new HashMap<String, Integer>();
    String[] topics = args[2].split(",");
    for (String topic : topics) {
        topicMap.put(topic, numThreads);
    }

    //JavaPairReceiverInputDStream<String, String> messages =
    //        KafkaUtils.createStream(jssc, args[0], args[1], topicMap);
    JavaPairDStream<String, String> messages =
            KafkaUtils.createStream(jssc,
                    String.class,
                    String.class,
                    StringDecoder.class,
                    StringDecoder.class,
                    kafkaParams,
                    topicMap,
                    StorageLevel.MEMORY_ONLY_SER());

    JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() {
        @Override
        public String call(Tuple2<String, String> tuple2) {
            return tuple2._2();
        }
    });

    JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
        @Override
        public Iterable<String> call(String x) {
            return Lists.newArrayList(SPACE.split(x));
        }
    });

    JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
            new PairFunction<String, String, Integer>() {
                @Override
                public Tuple2<String, Integer> call(String s) {
                    return new Tuple2<String, Integer>(s, 1);
                }
            }).reduceByKey(new Function2<Integer, Integer, Integer>() {
                @Override
                public Integer call(Integer i1, Integer i2) {
                    return i1 + i2;
                }
            });

    wordCounts.print();
    jssc.start();
    jssc.awaitTermination();
}


Re: JavaKafkaWordCount not working under Spark Streaming

2014-11-10 Thread Something Something
I am not running locally.  The Spark master is:

spark://machine name:7077



On Mon, Nov 10, 2014 at 3:47 PM, Tathagata Das tathagata.das1...@gmail.com
wrote:

 What is the Spark master that you are using. Use local[4], not local
 if you are running locally.

 On Mon, Nov 10, 2014 at 3:01 PM, Something Something
 mailinglist...@gmail.com wrote:
  I am embarrassed to admit but I can't get a basic 'word count' to work
 under
  Kafka/Spark streaming.  My code looks like this.  I  don't see any word
  counts in console output.  Also, don't see any output in UI.  Needless to
  say, I am newbie in both 'Spark' as well as 'Kafka'.
 
  Please help.  Thanks.
 
  Here's the code:
 
  public static void main(String[] args) {
  if (args.length  4) {
  System.err.println(Usage: JavaKafkaWordCount zkQuorum
 group
  topics numThreads);
  System.exit(1);
  }
 
  //StreamingExamples.setStreamingLogLevels();
  //SparkConf sparkConf = new
  SparkConf().setAppName(JavaKafkaWordCount);
 
  // Location of the Spark directory
  String sparkHome = /opt/mapr/spark/spark-1.0.2/;
 
  // URL of the Spark cluster
  String sparkUrl = spark://mymachine:7077;
 
  // Location of the required JAR files
  String jarFiles =
 
 ./spark-streaming-kafka_2.10-1.1.0.jar,./DlSpark-1.0-SNAPSHOT.jar,./zkclient-0.3.jar,./kafka_2.10-0.8.1.1.jar,./metrics-core-2.2.0.jar;
 
  SparkConf sparkConf = new SparkConf();
  sparkConf.setAppName(JavaKafkaWordCount);
  sparkConf.setJars(new String[]{jarFiles});
  sparkConf.setMaster(sparkUrl);
  sparkConf.set(spark.ui.port, 2348);
  sparkConf.setSparkHome(sparkHome);
 
  MapString, String kafkaParams = new HashMapString, String();
  kafkaParams.put(zookeeper.connect, myedgenode:2181);
  kafkaParams.put(group.id, 1);
  kafkaParams.put(metadata.broker.list, myedgenode:9092);
  kafkaParams.put(serializer.class,
  kafka.serializer.StringEncoder);
  kafkaParams.put(request.required.acks, 1);
 
  // Create the context with a 1 second batch size
  JavaStreamingContext jssc = new JavaStreamingContext(sparkConf,
 new
  Duration(2000));
 
  int numThreads = Integer.parseInt(args[3]);
  MapString, Integer topicMap = new HashMapString, Integer();
  String[] topics = args[2].split(,);
  for (String topic: topics) {
  topicMap.put(topic, numThreads);
  }
 
  //JavaPairReceiverInputDStreamString, String messages =
  //KafkaUtils.createStream(jssc, args[0], args[1],
 topicMap);
  JavaPairDStreamString, String messages =
  KafkaUtils.createStream(jssc,
  String.class,
  String.class,
  StringDecoder.class,
  StringDecoder.class,
  kafkaParams,
  topicMap,
  StorageLevel.MEMORY_ONLY_SER());
 
 
  JavaDStreamString lines = messages.map(new
 FunctionTuple2String,
  String, String() {
  @Override
  public String call(Tuple2String, String tuple2) {
  return tuple2._2();
  }
  });
 
  JavaDStreamString words = lines.flatMap(new
  FlatMapFunctionString, String() {
  @Override
  public IterableString call(String x) {
  return Lists.newArrayList(SPACE.split(x));
  }
  });
 
  JavaPairDStreamString, Integer wordCounts = words.mapToPair(
  new PairFunctionString, String, Integer() {
  @Override
  public Tuple2String, Integer call(String s) {
  return new Tuple2String, Integer(s, 1);
  }
  }).reduceByKey(new Function2Integer, Integer,
 Integer() {
  @Override
  public Integer call(Integer i1, Integer i2) {
  return i1 + i2;
  }
  });
 
  wordCounts.print();
  jssc.start();
  jssc.awaitTermination();
 



Re: Kafka Consumer in Spark Streaming

2014-11-05 Thread Something Something
As suggested by Qiaou, looked at the UI:

1)  Under 'Stages' the only 'active' stage is:  runJob at
ReceiverTracker.scala:275
2)  Under 'Executors', there's only 1 active task, but I don't see any
output (or logs)
3)  Under 'Streaming', there's one receiver called 'KafkaReciever-0', but
'Records in last batch' is 0.

Honestly, I think it's not connecting to my Kafka topic - possibly because
I need to pass the following parameter:

metadata.broker.list - machine:9092

But I don't know how to pass this to KafkaUtils.createStream(...).  Could
that be the problem?
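
For completeness: the createStream overload that takes a kafkaParams map (the
same overload used in the JavaKafkaWordCount code earlier in this archive) is
how extra consumer properties are passed. A minimal sketch in Scala, with the
caveat that the receiver-based 0.8 consumer connects through
'zookeeper.connect', not 'metadata.broker.list'; the topic name below is a
placeholder.

import kafka.serializer.StringDecoder
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.kafka.KafkaUtils

// 'ssc' is the existing StreamingContext.
val kafkaParams = Map(
  "zookeeper.connect"    -> "myedgenode:2181",
  "group.id"             -> "1",
  "metadata.broker.list" -> "machine:9092")

val tweets = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
  ssc,
  kafkaParams,
  Map("myTopic" -> 1),            // placeholder topic name -> receiver thread count
  StorageLevel.MEMORY_ONLY_SER)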



On Tue, Nov 4, 2014 at 11:12 PM, Akhil Das ak...@sigmoidanalytics.com
wrote:

 Your code doesn't trigger any action. How about the following?

 JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, new
 Duration(60 * 1 * 1000));

 JavaPairReceiverInputDStream tweets = KafkaUtils.createStream(ssc,
 machine:2181, 1, map);

 JavaDStreamString statuses = tweets.map(
 new FunctionString, String() {
 public String call(String status) {
 System.out.println(status);
 return status;
 }
 }
 );


 statuses.print();

 Or you could use foreachRDD instead of map() if your intention is just
 printing.

 Thanks
 Best Regards

 On Wed, Nov 5, 2014 at 12:35 PM, Something Something 
 mailinglist...@gmail.com wrote:

 It's not local.  My spark url is something like this:

 String sparkUrl = spark://host name:7077;


 On Tue, Nov 4, 2014 at 11:03 PM, Jain Rahul ja...@ivycomptech.com
 wrote:


  I think you are running it locally.
 Do you have local[1] here for master url? If yes change it to local[2]
 or more number of threads.
 It may be due to topic name mismatch also.

  sparkConf.setMaster(“local[1]);

  Regards,
 Rahul

   From: Something Something mailinglist...@gmail.com
 Date: Wednesday, November 5, 2014 at 12:23 PM
 To: Shao, Saisai saisai.s...@intel.com
 Cc: user@spark.apache.org user@spark.apache.org

 Subject: Re: Kafka Consumer in Spark Streaming

   Added foreach as follows.  Still don't see any output on my console.
 Would this go to the worker logs as Jerry indicated?

 JavaPairReceiverInputDStream tweets =
 KafkaUtils.createStream(ssc, mymachine:2181, 1, map);
 JavaDStreamString statuses = tweets.map(
 new FunctionString, String() {
 public String call(String status) {
 return status;
 }
 }
 );

 statuses.foreach(new FunctionJavaRDDString, Void() {
 @Override
 public Void call(JavaRDDString stringJavaRDD) throws
 Exception {
 for (String str: stringJavaRDD.take(10)) {
 System.out.println(Message:  + str);
 }
 return null;
 }
 });


 On Tue, Nov 4, 2014 at 10:32 PM, Shao, Saisai saisai.s...@intel.com
 wrote:

  If you’re running on a standalone mode, the log is under
 SPAR_HOME/work/ directory. I’m not sure for yarn or mesos, you can check
 the document of Spark to see the details.



 Thanks

 Jerry



 *From:* Something Something [mailto:mailinglist...@gmail.com]
 *Sent:* Wednesday, November 05, 2014 2:28 PM
 *To:* Shao, Saisai
 *Cc:* user@spark.apache.org
 *Subject:* Re: Kafka Consumer in Spark Streaming



 The Kafka broker definitely has messages coming in.  But your #2 point
 is valid.  Needless to say I am a newbie to Spark.  I can't figure out
 where the 'executor' logs would be.  How would I find them?

 All I see printed on my screen is this:

 14/11/04 22:21:23 INFO Slf4jLogger: Slf4jLogger started
 14/11/04 22:21:23 INFO Remoting: Starting remoting
 14/11/04 22:21:24 INFO Remoting: Remoting started; listening on
 addresses :[akka.tcp://spark@mymachie:60743]
 14/11/04 22:21:24 INFO Remoting: Remoting now listens on addresses:
 [akka.tcp://spark@mymachine:60743]
 14/11/04 22:21:24 WARN NativeCodeLoader: Unable to load native-hadoop
 library for your platform... using builtin-java classes where applicable
 14/11/04 22:21:24 INFO JniBasedUnixGroupsMappingWithFallback: Falling
 back to shell based
 ---
 Time: 141516852 ms
 ---
 ---
 Time: 141516852 ms
 ---

 Keeps repeating this...



 On Tue, Nov 4, 2014 at 10:14 PM, Shao, Saisai saisai.s...@intel.com
 wrote:

  Hi, would you mind describing your problem a little more specific.



 1.  Is the Kafka broker currently has no data feed in?

 2.  This code will print the lines, but not in the driver side,
 the code is running in the executor side, so you can check the log in
 worker dir to see if there’s any printing logs under this folder.

 3.  Did you see any exceptions when running the app

Kafka Consumer in Spark Streaming

2014-11-04 Thread Something Something
I have the following code in my program. I don't get any errors, but it's not
consuming the messages either. Shouldn't the following code print the line
in the 'call' method? What am I missing?

Please help.  Thanks.



JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, new Duration(60 * 1 * 1000));

JavaPairReceiverInputDStream tweets = KafkaUtils.createStream(ssc, "machine:2181", "1", map);

JavaDStream<String> statuses = tweets.map(
        new Function<String, String>() {
            public String call(String status) {
                System.out.println(status);
                return status;
            }
        }
);


Re: Kafka Consumer in Spark Streaming

2014-11-04 Thread Something Something
The Kafka broker definitely has messages coming in.  But your #2 point is
valid.  Needless to say I am a newbie to Spark.  I can't figure out where
the 'executor' logs would be.  How would I find them?

All I see printed on my screen is this:

14/11/04 22:21:23 INFO Slf4jLogger: Slf4jLogger started
14/11/04 22:21:23 INFO Remoting: Starting remoting
14/11/04 22:21:24 INFO Remoting: Remoting started; listening on addresses
:[akka.tcp://spark@mymachie:60743]
14/11/04 22:21:24 INFO Remoting: Remoting now listens on addresses:
[akka.tcp://spark@mymachine:60743]
14/11/04 22:21:24 WARN NativeCodeLoader: Unable to load native-hadoop
library for your platform... using builtin-java classes where applicable
14/11/04 22:21:24 INFO JniBasedUnixGroupsMappingWithFallback: Falling back
to shell based
---
Time: 141516852 ms
---
---
Time: 141516852 ms
---
Keeps repeating this...

On Tue, Nov 4, 2014 at 10:14 PM, Shao, Saisai saisai.s...@intel.com wrote:

  Hi, would you mind describing your problem a little more specific.



 1.  Is the Kafka broker currently has no data feed in?

 2.  This code will print the lines, but not in the driver side, the
 code is running in the executor side, so you can check the log in worker
 dir to see if there’s any printing logs under this folder.

 3.  Did you see any exceptions when running the app, this will help
 to define the problem.



 Thanks

 Jerry



 *From:* Something Something [mailto:mailinglist...@gmail.com]
 *Sent:* Wednesday, November 05, 2014 1:57 PM
 *To:* user@spark.apache.org
 *Subject:* Kafka Consumer in Spark Streaming



 I've following code in my program.  I don't get any error, but it's not
 consuming the messages either.  Shouldn't the following code print the line
 in the 'call' method?  What am I missing?

 Please help.  Thanks.



 JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, new
 Duration(60 * 1 * 1000));

 JavaPairReceiverInputDStream tweets = KafkaUtils.createStream(ssc,
 machine:2181, 1, map);

 JavaDStreamString statuses = tweets.map(
 new FunctionString, String() {
 public String call(String status) {
 System.out.println(status);
 return status;
 }
 }
 );



Re: Kafka Consumer in Spark Streaming

2014-11-04 Thread Something Something
Added foreach as follows.  Still don't see any output on my console.  Would
this go to the worker logs as Jerry indicated?

JavaPairReceiverInputDStream tweets = KafkaUtils.createStream(ssc, "mymachine:2181", "1", map);
JavaDStream<String> statuses = tweets.map(
        new Function<String, String>() {
            public String call(String status) {
                return status;
            }
        }
);

statuses.foreach(new Function<JavaRDD<String>, Void>() {
    @Override
    public Void call(JavaRDD<String> stringJavaRDD) throws Exception {
        for (String str : stringJavaRDD.take(10)) {
            System.out.println("Message: " + str);
        }
        return null;
    }
});


On Tue, Nov 4, 2014 at 10:32 PM, Shao, Saisai saisai.s...@intel.com wrote:

  If you’re running on a standalone mode, the log is under
 SPAR_HOME/work/ directory. I’m not sure for yarn or mesos, you can check
 the document of Spark to see the details.



 Thanks

 Jerry



 *From:* Something Something [mailto:mailinglist...@gmail.com]
 *Sent:* Wednesday, November 05, 2014 2:28 PM
 *To:* Shao, Saisai
 *Cc:* user@spark.apache.org
 *Subject:* Re: Kafka Consumer in Spark Streaming



 The Kafka broker definitely has messages coming in.  But your #2 point is
 valid.  Needless to say I am a newbie to Spark.  I can't figure out where
 the 'executor' logs would be.  How would I find them?

 All I see printed on my screen is this:

 14/11/04 22:21:23 INFO Slf4jLogger: Slf4jLogger started
 14/11/04 22:21:23 INFO Remoting: Starting remoting
 14/11/04 22:21:24 INFO Remoting: Remoting started; listening on addresses
 :[akka.tcp://spark@mymachie:60743]
 14/11/04 22:21:24 INFO Remoting: Remoting now listens on addresses:
 [akka.tcp://spark@mymachine:60743]
 14/11/04 22:21:24 WARN NativeCodeLoader: Unable to load native-hadoop
 library for your platform... using builtin-java classes where applicable
 14/11/04 22:21:24 INFO JniBasedUnixGroupsMappingWithFallback: Falling back
 to shell based
 ---
 Time: 141516852 ms
 ---
 ---
 Time: 141516852 ms
 ---

 Keeps repeating this...



 On Tue, Nov 4, 2014 at 10:14 PM, Shao, Saisai saisai.s...@intel.com
 wrote:

  Hi, would you mind describing your problem a little more specific.



 1.  Is the Kafka broker currently has no data feed in?

 2.  This code will print the lines, but not in the driver side, the
 code is running in the executor side, so you can check the log in worker
 dir to see if there’s any printing logs under this folder.

 3.  Did you see any exceptions when running the app, this will help
 to define the problem.



 Thanks

 Jerry



 *From:* Something Something [mailto:mailinglist...@gmail.com]
 *Sent:* Wednesday, November 05, 2014 1:57 PM
 *To:* user@spark.apache.org
 *Subject:* Kafka Consumer in Spark Streaming



 I've following code in my program.  I don't get any error, but it's not
 consuming the messages either.  Shouldn't the following code print the line
 in the 'call' method?  What am I missing?

 Please help.  Thanks.



 JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, new
 Duration(60 * 1 * 1000));

 JavaPairReceiverInputDStream tweets = KafkaUtils.createStream(ssc,
 machine:2181, 1, map);

 JavaDStreamString statuses = tweets.map(
 new FunctionString, String() {
 public String call(String status) {
 System.out.println(status);
 return status;
 }
 }
 );





Re: Kafka Consumer in Spark Streaming

2014-11-04 Thread Something Something
It's not local.  My spark url is something like this:

String sparkUrl = "spark://host name:7077";


On Tue, Nov 4, 2014 at 11:03 PM, Jain Rahul ja...@ivycomptech.com wrote:


  I think you are running it locally.
 Do you have local[1] here for master url? If yes change it to local[2] or
 more number of threads.
 It may be due to topic name mismatch also.

  sparkConf.setMaster(“local[1]);

  Regards,
 Rahul

   From: Something Something mailinglist...@gmail.com
 Date: Wednesday, November 5, 2014 at 12:23 PM
 To: Shao, Saisai saisai.s...@intel.com
 Cc: user@spark.apache.org user@spark.apache.org

 Subject: Re: Kafka Consumer in Spark Streaming

   Added foreach as follows.  Still don't see any output on my console.
 Would this go to the worker logs as Jerry indicated?

 JavaPairReceiverInputDStream tweets = KafkaUtils.createStream(ssc,
 mymachine:2181, 1, map);
 JavaDStreamString statuses = tweets.map(
 new FunctionString, String() {
 public String call(String status) {
 return status;
 }
 }
 );

 statuses.foreach(new FunctionJavaRDDString, Void() {
 @Override
 public Void call(JavaRDDString stringJavaRDD) throws
 Exception {
 for (String str: stringJavaRDD.take(10)) {
 System.out.println(Message:  + str);
 }
 return null;
 }
 });


 On Tue, Nov 4, 2014 at 10:32 PM, Shao, Saisai saisai.s...@intel.com
 wrote:

  If you’re running on a standalone mode, the log is under
 SPAR_HOME/work/ directory. I’m not sure for yarn or mesos, you can check
 the document of Spark to see the details.



 Thanks

 Jerry



 *From:* Something Something [mailto:mailinglist...@gmail.com]
 *Sent:* Wednesday, November 05, 2014 2:28 PM
 *To:* Shao, Saisai
 *Cc:* user@spark.apache.org
 *Subject:* Re: Kafka Consumer in Spark Streaming



 The Kafka broker definitely has messages coming in.  But your #2 point is
 valid.  Needless to say I am a newbie to Spark.  I can't figure out where
 the 'executor' logs would be.  How would I find them?

 All I see printed on my screen is this:

 14/11/04 22:21:23 INFO Slf4jLogger: Slf4jLogger started
 14/11/04 22:21:23 INFO Remoting: Starting remoting
 14/11/04 22:21:24 INFO Remoting: Remoting started; listening on addresses
 :[akka.tcp://spark@mymachie:60743]
 14/11/04 22:21:24 INFO Remoting: Remoting now listens on addresses:
 [akka.tcp://spark@mymachine:60743]
 14/11/04 22:21:24 WARN NativeCodeLoader: Unable to load native-hadoop
 library for your platform... using builtin-java classes where applicable
 14/11/04 22:21:24 INFO JniBasedUnixGroupsMappingWithFallback: Falling
 back to shell based
 ---
 Time: 141516852 ms
 ---
 ---
 Time: 141516852 ms
 ---

 Keeps repeating this...



 On Tue, Nov 4, 2014 at 10:14 PM, Shao, Saisai saisai.s...@intel.com
 wrote:

  Hi, would you mind describing your problem a little more specific.



 1.  Is the Kafka broker currently has no data feed in?

 2.  This code will print the lines, but not in the driver side, the
 code is running in the executor side, so you can check the log in worker
 dir to see if there’s any printing logs under this folder.

 3.  Did you see any exceptions when running the app, this will help
 to define the problem.



 Thanks

 Jerry



 *From:* Something Something [mailto:mailinglist...@gmail.com]
 *Sent:* Wednesday, November 05, 2014 1:57 PM
 *To:* user@spark.apache.org
 *Subject:* Kafka Consumer in Spark Streaming



 I've following code in my program.  I don't get any error, but it's not
 consuming the messages either.  Shouldn't the following code print the line
 in the 'call' method?  What am I missing?

 Please help.  Thanks.



 JavaStreamingContext ssc = new JavaStreamingContext(sparkConf,
 new Duration(60 * 1 * 1000));

 JavaPairReceiverInputDStream tweets =
 KafkaUtils.createStream(ssc, machine:2181, 1, map);

 JavaDStreamString statuses = tweets.map(
 new FunctionString, String() {
 public String call(String status) {
 System.out.println(status);
 return status;
 }
 }
 );



