Spark Structured Streaming: “earliest” as “startingOffsets” is not working
My Spark Structured Streaming job works fine when I set "startingOffsets" to "latest". When I simply change it to "earliest" & specify a new checkpoint directory, the job doesn't work: the states don't get timed out after 10 minutes. While debugging I noticed that my 'state' logic is indeed getting executed, but the states just don't time out - as they do when I use "latest". Any reason why? Is this a known issue? *Note*: I've tried this under both Spark 2.3 & 2.4.
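For concreteness, a minimal sketch of the configuration change being described (topic, servers, and paths below are placeholders, not the original job). Note that "startingOffsets" is only consulted when a query starts without an existing checkpoint, which is why a fresh checkpoint directory has to be specified along with it:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.appName("earliest-repro").getOrCreate()

val source = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")   // placeholder
  .option("subscribe", "events")                          // placeholder topic
  .option("startingOffsets", "earliest")                  // was "latest"
  .load()

// ... groupByKey(...).mapGroupsWithState(ProcessingTimeTimeout)(updateFn) as in the job ...

val query = source.writeStream
  .format("console")
  .option("checkpointLocation", "/tmp/new-checkpoint-dir") // fresh directory
  .start()
```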
Re: Using Spark Accumulators with Structured Streaming
*Honestly, I don't know how to do this in Scala.* I tried something like this:

.mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(new StateUpdater(myAcc))

StateUpdater is similar to what Zhang has provided, but it's NOT compiling because I need to return a 'Dataset'. Here's the definition of mapGroupsWithState in Scala:

def mapGroupsWithState[S: Encoder, U: Encoder](
    timeoutConf: GroupStateTimeout)(
    func: (K, Iterator[V], GroupState[S]) => U): Dataset[U] = {

On Mon, Jun 8, 2020 at 12:07 PM Srinivas V wrote:
> Ya, I had asked this question before. No one responded. By the way, what’s your actual name “Something something” if you don’t mind me asking?
>
> On Tue, Jun 9, 2020 at 12:27 AM Something Something <mailinglist...@gmail.com> wrote:
>> What is scary is this interface is marked as "experimental" ...
>> ...
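For what it's worth, a hedged Scala sketch of the shape that signature expects: the function you pass returns one output value (the `U` type) per group, and the `Dataset[U]` is what mapGroupsWithState itself returns. `MyKey`, `MyEvent`, `MyState`, `OutputRow`, `typedStream`, and `myAcc` below are placeholders standing in for the real job, assumed to be case classes with `spark.implicits._` in scope:

```scala
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout}
import org.apache.spark.util.LongAccumulator

class StateUpdater(acc: LongAccumulator) extends Serializable {
  def updateAcrossEvents(key: MyKey,
                         events: Iterator[MyEvent],
                         state: GroupState[MyState]): OutputRow = {
    acc.add(1)                                             // accumulator value is readable on the driver
    val updated = events.foldLeft(state.getOption.getOrElse(MyState(key))) {
      (s, e) => s.update(e)                                // assumed domain logic
    }
    state.update(updated)
    state.setTimeoutDuration("10 minutes")
    OutputRow(key, updated)                                // one output row per group, not a Dataset
  }
}

val updater = new StateUpdater(myAcc)

val result = typedStream
  .groupByKey(_.key)
  .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
    updater.updateAcrossEvents _)                          // method lifted to a function value
```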
Re: Using Spark Accumulators with Structured Streaming
What is scary is this interface is marked as "experimental"

@Experimental
@InterfaceStability.Evolving
public interface MapGroupsWithStateFunction<K, V, S, R> extends Serializable {
    R call(K key, Iterator<V> values, GroupState<S> state) throws Exception;
}

On Mon, Jun 8, 2020 at 11:54 AM Something Something <mailinglist...@gmail.com> wrote:
> Right, this is exactly how I've it right now. Problem is in the cluster mode 'myAcc' does NOT get distributed. Try it out in the cluster mode & you will see what I mean.
>
> I think how Zhang is using will work. Will try & revert.
>
> On Mon, Jun 8, 2020 at 10:58 AM Srinivas V wrote:
>> You don’t need to have a separate class. I created that as it has lot of code and logic in my case. ...
>> ...
Re: Using Spark Accumulators with Structured Streaming
Right, this is exactly how I've it right now. Problem is in the cluster mode 'myAcc' does NOT get distributed. Try it out in the cluster mode & you will see what I mean. I think how Zhang is using will work. Will try & revert. On Mon, Jun 8, 2020 at 10:58 AM Srinivas V wrote: > > You don’t need to have a separate class. I created that as it has lot of > code and logic in my case. > For you to quickly test you can use Zhang’s Scala code in this chain. > Pasting it below for your quick reference: > > ```scala > spark.streams.addListener(new StreamingQueryListener { > override def onQueryProgress(event: > StreamingQueryListener.QueryProgressEvent): > Unit = { > println(event.progress.id + " is on progress") > println(s"My accu is ${myAcc.value} on query progress") > } > ... > }) > > def mappingFunc(key: Long, values: Iterator[String], state: > GroupState[Long]): ... = { > myAcc.add(1) > println(s">>> key: $key => state: ${state}") > ... > } > > val wordCounts = words > .groupByKey(v => ...) > .mapGroupsWithState(timeoutConf = > GroupStateTimeout.ProcessingTimeTimeout)(func > = mappingFunc) > > val query = wordCounts.writeStream > .outputMode(OutputMode.Update) > > > On Mon, Jun 8, 2020 at 11:14 AM Something Something < > mailinglist...@gmail.com> wrote: > >> Great. I guess the trick is to use a separate class such as >> 'StateUpdateTask'. I will try that. My challenge is to convert this into >> Scala. Will try it out & revert. Thanks for the tips. >> >> On Wed, Jun 3, 2020 at 11:56 PM ZHANG Wei wrote: >> >>> The following Java codes can work in my cluster environment: >>> ``` >>> .mapGroupsWithState((MapGroupsWithStateFunction>> Long, LeadingCharCount>) (key, values, state) -> { >>> myAcc.add(1); >>> <...> >>> state.update(newState); >>> return new LeadingCharCount(key, newState); >>> }, >>> Encoders.LONG(), >>> Encoders.bean(LeadingCharCount.class), >>> GroupStateTimeout.ProcessingTimeTimeout()) >>> ``` >>> >>> Also works fine with my `StateUpdateTask`: >>> ``` >>> .mapGroupsWithState( >>> new StateUpdateTask(myAcc), >>> Encoders.LONG(), >>> Encoders.bean(LeadingCharCount.class), >>> GroupStateTimeout.ProcessingTimeTimeout()); >>> >>> public class StateUpdateTask >>> implements MapGroupsWithStateFunction>> LeadingCharCount> { >>> private LongAccumulator myAccInTask; >>> >>> public StateUpdateTask(LongAccumulator acc) { >>> this.myAccInTask = acc; >>> } >>> >>> @Override >>> public LeadingCharCount call(String key, Iterator >>> values, GroupState state) throws Exception { >>> myAccInTask.add(1); >>> <...> >>> state.update(newState); >>> return new LeadingCharCount(key, newState); >>> } >>> } >>> ``` >>> >>> -- >>> Cheers, >>> -z >>> >>> On Tue, 2 Jun 2020 10:28:36 +0800 >>> ZHANG Wei wrote: >>> >>> > Yes, verified on the cluster with 5 executors. >>> > >>> > -- >>> > Cheers, >>> > -z >>> > >>> > On Fri, 29 May 2020 11:16:12 -0700 >>> > Something Something wrote: >>> > >>> > > Did you try this on the Cluster? Note: This works just fine under >>> 'Local' >>> > > mode. >>> > > >>> > > On Thu, May 28, 2020 at 9:12 PM ZHANG Wei >>> wrote: >>> > > >>> > > > I can't reproduce the issue with my simple code: >>> > > > ```scala >>> > > > spark.streams.addListener(new StreamingQueryListener { >>> > > > override def onQueryProgress(event: >>> > > > StreamingQueryListener.QueryProgressEvent): Unit = { >>> > > > println(event.progress.id + " is on progress") >>> > > > println(s"My accu is ${myAcc.value} on query progress") >>> > > > } >>> > > >
Re: Using Spark Accumulators with Structured Streaming
Great. I guess the trick is to use a separate class such as 'StateUpdateTask'. I will try that. My challenge is to convert this into Scala. Will try it out & revert. Thanks for the tips.

On Wed, Jun 3, 2020 at 11:56 PM ZHANG Wei wrote:
> The following Java codes can work in my cluster environment:
> ```
> .mapGroupsWithState((MapGroupsWithStateFunction<String, String, Long, LeadingCharCount>) (key, values, state) -> {
>         myAcc.add(1);
>         <...>
>         state.update(newState);
>         return new LeadingCharCount(key, newState);
>     },
>     Encoders.LONG(),
>     Encoders.bean(LeadingCharCount.class),
>     GroupStateTimeout.ProcessingTimeTimeout())
> ```
>
> Also works fine with my `StateUpdateTask`:
> ```
> .mapGroupsWithState(
>     new StateUpdateTask(myAcc),
>     Encoders.LONG(),
>     Encoders.bean(LeadingCharCount.class),
>     GroupStateTimeout.ProcessingTimeTimeout());
>
> public class StateUpdateTask
>         implements MapGroupsWithStateFunction<String, String, Long, LeadingCharCount> {
>     private LongAccumulator myAccInTask;
>
>     public StateUpdateTask(LongAccumulator acc) {
>         this.myAccInTask = acc;
>     }
>
>     @Override
>     public LeadingCharCount call(String key, Iterator<String> values,
>             GroupState<Long> state) throws Exception {
>         myAccInTask.add(1);
>         <...>
>         state.update(newState);
>         return new LeadingCharCount(key, newState);
>     }
> }
> ```
>
> --
> Cheers,
> -z
>
> On Tue, 2 Jun 2020 10:28:36 +0800 ZHANG Wei wrote:
> > Yes, verified on the cluster with 5 executors.
> > ...
Re: Using Spark Accumulators with Structured Streaming
I mean... I don't see any reference to 'accumulator' in your Class *definition*. How can you access it in the class if it's not in your definition of class: public class StateUpdateTask implements MapGroupsWithStateFunction<*String, InputEventModel, ModelStateInfo, ModelUpdate*> {. *--> I was expecting to see 'accumulator' here in the definition.* @Override public ModelUpdate call(String productId, Iterator eventsIterator, GroupState state) { } } On Fri, May 29, 2020 at 1:08 PM Srinivas V wrote: > > Yes, accumulators are updated in the call method of StateUpdateTask. Like > when state times out or when the data is pushed to next Kafka topic etc. > > On Fri, May 29, 2020 at 11:55 PM Something Something < > mailinglist...@gmail.com> wrote: > >> Thanks! I will take a look at the link. Just one question, you seem to be >> passing 'accumulators' in the constructor but where do you use it in the >> StateUpdateTask class? I am still missing that connection. Sorry, if my >> question is dumb. I must be missing something. Thanks for your help so far. >> It's been useful. >> >> >> On Fri, May 29, 2020 at 6:51 AM Srinivas V wrote: >> >>> Yes it is application specific class. This is how java Spark Functions >>> work. >>> You can refer to this code in the documentation: >>> https://github.com/apache/spark/blob/master/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredSessionization.java >>> >>> public class StateUpdateTask implements >>> MapGroupsWithStateFunction>> ModelUpdate> { >>> >>> @Override >>> public ModelUpdate call(String productId, Iterator >>> eventsIterator, GroupState state) { >>> } >>> } >>> >>> On Thu, May 28, 2020 at 10:59 PM Something Something < >>> mailinglist...@gmail.com> wrote: >>> >>>> I am assuming StateUpdateTask is your application specific class. Does >>>> it have 'updateState' method or something? I googled but couldn't find any >>>> documentation about doing it this way. Can you please direct me to some >>>> documentation. Thanks. >>>> >>>> On Thu, May 28, 2020 at 4:43 AM Srinivas V wrote: >>>> >>>>> yes, I am using stateful structured streaming. Yes similar to what you >>>>> do. This is in Java >>>>> I do it this way: >>>>> Dataset productUpdates = watermarkedDS >>>>> .groupByKey( >>>>> (MapFunction) event >>>>> -> event.getId(), Encoders.STRING()) >>>>> .mapGroupsWithState( >>>>> new >>>>> StateUpdateTask(Long.parseLong(appConfig.getSparkStructuredStreamingConfig().STATE_TIMEOUT), >>>>> appConfig, accumulators), >>>>> Encoders.bean(ModelStateInfo.class), >>>>> Encoders.bean(ModelUpdate.class), >>>>> GroupStateTimeout.ProcessingTimeTimeout()); >>>>> >>>>> StateUpdateTask contains the update method. >>>>> >>>>> On Thu, May 28, 2020 at 4:41 AM Something Something < >>>>> mailinglist...@gmail.com> wrote: >>>>> >>>>>> Yes, that's exactly how I am creating them. >>>>>> >>>>>> Question... Are you using 'Stateful Structured Streaming' in which >>>>>> you've something like this? >>>>>> >>>>>> .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())( >>>>>> updateAcrossEvents >>>>>> ) >>>>>> >>>>>> And updating the Accumulator inside 'updateAcrossEvents'? We're >>>>>> experiencing this only under 'Stateful Structured Streaming'. In other >>>>>> streaming applications it works as expected. >>>>>> >>>>>> >>>>>> >>>>>> On Wed, May 27, 2020 at 9:01 AM Srinivas V >>>>>> wrote: >>>>>> >>>>>>> Yes, I am talking about Application specific Accumulators. Actually >>>>>>> I am getting the values printed in my driver log as well as sent to >>>>>>> Grafana. 
Not sure where and when I saw 0 before. My deploy mode is >>>>>>> “client” >>>>>>> on a yarn cluster(not local Mac) where I submit from master node. It &
Re: Using Spark Accumulators with Structured Streaming
Thanks! I will take a look at the link. Just one question, you seem to be passing 'accumulators' in the constructor but where do you use it in the StateUpdateTask class? I am still missing that connection. Sorry, if my question is dumb. I must be missing something. Thanks for your help so far. It's been useful. On Fri, May 29, 2020 at 6:51 AM Srinivas V wrote: > Yes it is application specific class. This is how java Spark Functions > work. > You can refer to this code in the documentation: > https://github.com/apache/spark/blob/master/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredSessionization.java > > public class StateUpdateTask implements MapGroupsWithStateFunction InputEventModel, ModelStateInfo, ModelUpdate> { > > @Override > public ModelUpdate call(String productId, Iterator > eventsIterator, GroupState state) { > } > } > > On Thu, May 28, 2020 at 10:59 PM Something Something < > mailinglist...@gmail.com> wrote: > >> I am assuming StateUpdateTask is your application specific class. Does it >> have 'updateState' method or something? I googled but couldn't find any >> documentation about doing it this way. Can you please direct me to some >> documentation. Thanks. >> >> On Thu, May 28, 2020 at 4:43 AM Srinivas V wrote: >> >>> yes, I am using stateful structured streaming. Yes similar to what you >>> do. This is in Java >>> I do it this way: >>> Dataset productUpdates = watermarkedDS >>> .groupByKey( >>> (MapFunction) event -> >>> event.getId(), Encoders.STRING()) >>> .mapGroupsWithState( >>> new >>> StateUpdateTask(Long.parseLong(appConfig.getSparkStructuredStreamingConfig().STATE_TIMEOUT), >>> appConfig, accumulators), >>> Encoders.bean(ModelStateInfo.class), >>> Encoders.bean(ModelUpdate.class), >>> GroupStateTimeout.ProcessingTimeTimeout()); >>> >>> StateUpdateTask contains the update method. >>> >>> On Thu, May 28, 2020 at 4:41 AM Something Something < >>> mailinglist...@gmail.com> wrote: >>> >>>> Yes, that's exactly how I am creating them. >>>> >>>> Question... Are you using 'Stateful Structured Streaming' in which >>>> you've something like this? >>>> >>>> .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())( >>>> updateAcrossEvents >>>> ) >>>> >>>> And updating the Accumulator inside 'updateAcrossEvents'? We're >>>> experiencing this only under 'Stateful Structured Streaming'. In other >>>> streaming applications it works as expected. >>>> >>>> >>>> >>>> On Wed, May 27, 2020 at 9:01 AM Srinivas V wrote: >>>> >>>>> Yes, I am talking about Application specific Accumulators. Actually I >>>>> am getting the values printed in my driver log as well as sent to Grafana. >>>>> Not sure where and when I saw 0 before. My deploy mode is “client” on a >>>>> yarn cluster(not local Mac) where I submit from master node. It should >>>>> work >>>>> the same for cluster mode as well. >>>>> Create accumulators like this: >>>>> AccumulatorV2 accumulator = sparkContext.longAccumulator(name); >>>>> >>>>> >>>>> On Tue, May 26, 2020 at 8:42 PM Something Something < >>>>> mailinglist...@gmail.com> wrote: >>>>> >>>>>> Hmm... how would they go to Graphana if they are not getting computed >>>>>> in your code? I am talking about the Application Specific Accumulators. >>>>>> The >>>>>> other standard counters such as 'event.progress.inputRowsPerSecond' are >>>>>> getting populated correctly! >>>>>> >>>>>> On Mon, May 25, 2020 at 8:39 PM Srinivas V >>>>>> wrote: >>>>>> >>>>>>> Hello, >>>>>>> Even for me it comes as 0 when I print in OnQueryProgress. 
I use >>>>>>> LongAccumulator as well. Yes, it prints on my local but not on cluster. >>>>>>> But one consolation is that when I send metrics to Graphana, the >>>>>>> values are coming there. >>>>>>> >>>>>>> On Tue, May 26, 2020 at 3:10 AM Something Something <
Re: Using Spark Accumulators with Structured Streaming
> And the accumulator value updated can be found from driver stdout.
>
> --
> Cheers,
> -z
>
> ...
Re: Using Spark Accumulators with Structured Streaming
I am assuming StateUpdateTask is your application specific class. Does it have 'updateState' method or something? I googled but couldn't find any documentation about doing it this way. Can you please direct me to some documentation. Thanks. On Thu, May 28, 2020 at 4:43 AM Srinivas V wrote: > yes, I am using stateful structured streaming. Yes similar to what you do. > This is in Java > I do it this way: > Dataset productUpdates = watermarkedDS > .groupByKey( > (MapFunction) event -> > event.getId(), Encoders.STRING()) > .mapGroupsWithState( > new > StateUpdateTask(Long.parseLong(appConfig.getSparkStructuredStreamingConfig().STATE_TIMEOUT), > appConfig, accumulators), > Encoders.bean(ModelStateInfo.class), > Encoders.bean(ModelUpdate.class), > GroupStateTimeout.ProcessingTimeTimeout()); > > StateUpdateTask contains the update method. > > On Thu, May 28, 2020 at 4:41 AM Something Something < > mailinglist...@gmail.com> wrote: > >> Yes, that's exactly how I am creating them. >> >> Question... Are you using 'Stateful Structured Streaming' in which you've >> something like this? >> >> .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())( >> updateAcrossEvents >> ) >> >> And updating the Accumulator inside 'updateAcrossEvents'? We're experiencing >> this only under 'Stateful Structured Streaming'. In other streaming >> applications it works as expected. >> >> >> >> On Wed, May 27, 2020 at 9:01 AM Srinivas V wrote: >> >>> Yes, I am talking about Application specific Accumulators. Actually I am >>> getting the values printed in my driver log as well as sent to Grafana. Not >>> sure where and when I saw 0 before. My deploy mode is “client” on a yarn >>> cluster(not local Mac) where I submit from master node. It should work the >>> same for cluster mode as well. >>> Create accumulators like this: >>> AccumulatorV2 accumulator = sparkContext.longAccumulator(name); >>> >>> >>> On Tue, May 26, 2020 at 8:42 PM Something Something < >>> mailinglist...@gmail.com> wrote: >>> >>>> Hmm... how would they go to Graphana if they are not getting computed >>>> in your code? I am talking about the Application Specific Accumulators. The >>>> other standard counters such as 'event.progress.inputRowsPerSecond' are >>>> getting populated correctly! >>>> >>>> On Mon, May 25, 2020 at 8:39 PM Srinivas V wrote: >>>> >>>>> Hello, >>>>> Even for me it comes as 0 when I print in OnQueryProgress. I use >>>>> LongAccumulator as well. Yes, it prints on my local but not on cluster. >>>>> But one consolation is that when I send metrics to Graphana, the >>>>> values are coming there. >>>>> >>>>> On Tue, May 26, 2020 at 3:10 AM Something Something < >>>>> mailinglist...@gmail.com> wrote: >>>>> >>>>>> No this is not working even if I use LongAccumulator. >>>>>> >>>>>> On Fri, May 15, 2020 at 9:54 PM ZHANG Wei wrote: >>>>>> >>>>>>> There is a restriction in AccumulatorV2 API [1], the OUT type should >>>>>>> be atomic or thread safe. I'm wondering if the implementation for >>>>>>> `java.util.Map[T, Long]` can meet it or not. Is there any chance to >>>>>>> replace >>>>>>> CollectionLongAccumulator by CollectionAccumulator[2] or >>>>>>> LongAccumulator[3] >>>>>>> and test if the StreamingListener and other codes are able to work? 
>>>>>>> >>>>>>> --- >>>>>>> Cheers, >>>>>>> -z >>>>>>> [1] >>>>>>> http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.AccumulatorV2 >>>>>>> [2] >>>>>>> http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.CollectionAccumulator >>>>>>> [3] >>>>>>> http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.LongAccumulator >>>>>>> >>>>>>> >>>>>>> From: Something Something >>>>>>> Sent: Saturday, May 16, 2020 0:38 >&g
Re: Using Spark Accumulators with Structured Streaming
Yes, that's exactly how I am creating them. Question... Are you using 'Stateful Structured Streaming' in which you've something like this? .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())( updateAcrossEvents ) And updating the Accumulator inside 'updateAcrossEvents'? We're experiencing this only under 'Stateful Structured Streaming'. In other streaming applications it works as expected. On Wed, May 27, 2020 at 9:01 AM Srinivas V wrote: > Yes, I am talking about Application specific Accumulators. Actually I am > getting the values printed in my driver log as well as sent to Grafana. Not > sure where and when I saw 0 before. My deploy mode is “client” on a yarn > cluster(not local Mac) where I submit from master node. It should work the > same for cluster mode as well. > Create accumulators like this: > AccumulatorV2 accumulator = sparkContext.longAccumulator(name); > > > On Tue, May 26, 2020 at 8:42 PM Something Something < > mailinglist...@gmail.com> wrote: > >> Hmm... how would they go to Graphana if they are not getting computed in >> your code? I am talking about the Application Specific Accumulators. The >> other standard counters such as 'event.progress.inputRowsPerSecond' are >> getting populated correctly! >> >> On Mon, May 25, 2020 at 8:39 PM Srinivas V wrote: >> >>> Hello, >>> Even for me it comes as 0 when I print in OnQueryProgress. I use >>> LongAccumulator as well. Yes, it prints on my local but not on cluster. >>> But one consolation is that when I send metrics to Graphana, the values >>> are coming there. >>> >>> On Tue, May 26, 2020 at 3:10 AM Something Something < >>> mailinglist...@gmail.com> wrote: >>> >>>> No this is not working even if I use LongAccumulator. >>>> >>>> On Fri, May 15, 2020 at 9:54 PM ZHANG Wei wrote: >>>> >>>>> There is a restriction in AccumulatorV2 API [1], the OUT type should >>>>> be atomic or thread safe. I'm wondering if the implementation for >>>>> `java.util.Map[T, Long]` can meet it or not. Is there any chance to >>>>> replace >>>>> CollectionLongAccumulator by CollectionAccumulator[2] or >>>>> LongAccumulator[3] >>>>> and test if the StreamingListener and other codes are able to work? >>>>> >>>>> --- >>>>> Cheers, >>>>> -z >>>>> [1] >>>>> http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.AccumulatorV2 >>>>> [2] >>>>> http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.CollectionAccumulator >>>>> [3] >>>>> http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.LongAccumulator >>>>> >>>>> >>>>> From: Something Something >>>>> Sent: Saturday, May 16, 2020 0:38 >>>>> To: spark-user >>>>> Subject: Re: Using Spark Accumulators with Structured Streaming >>>>> >>>>> Can someone from Spark Development team tell me if this functionality >>>>> is supported and tested? I've spent a lot of time on this but can't get it >>>>> to work. Just to add more context, we've our own Accumulator class that >>>>> extends from AccumulatorV2. In this class we keep track of one or more >>>>> accumulators. Here's the definition: >>>>> >>>>> >>>>> class CollectionLongAccumulator[T] >>>>> extends AccumulatorV2[T, java.util.Map[T, Long]] >>>>> >>>>> When the job begins we register an instance of this class: >>>>> >>>>> spark.sparkContext.register(myAccumulator, "MyAccumulator") >>>>> >>>>> Is this working under Structured Streaming? >>>>> >>>>> I will keep looking for alternate approaches but any help would be >>>>> greatly appreciated. Thanks. 
>>>>> >>>>> >>>>> >>>>> On Thu, May 14, 2020 at 2:36 PM Something Something < >>>>> mailinglist...@gmail.com<mailto:mailinglist...@gmail.com>> wrote: >>>>> >>>>> In my structured streaming job I am updating Spark Accumulators in the >>>>> updateAcrossEvents method but they are always 0 when I try to print them >>>>> in >>>>> my StreamingListener. Here's the code: >>>>> >>>>> .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())( >>>>> updateAcrossEvents >>>>> ) >>>>> >>>>> >>>>> The accumulators get incremented in 'updateAcrossEvents'. I've a >>>>> StreamingListener which writes values of the accumulators in >>>>> 'onQueryProgress' method but in this method the Accumulators are ALWAYS >>>>> ZERO! >>>>> >>>>> When I added log statements in the updateAcrossEvents, I could see >>>>> that these accumulators are getting incremented as expected. >>>>> >>>>> This only happens when I run in the 'Cluster' mode. In Local mode it >>>>> works fine which implies that the Accumulators are not getting distributed >>>>> correctly - or something like that! >>>>> >>>>> Note: I've seen quite a few answers on the Web that tell me to perform >>>>> an "Action". That's not a solution here. This is a 'Stateful Structured >>>>> Streaming' job. Yes, I am also 'registering' them in SparkContext. >>>>> >>>>> >>>>> >>>>>
Re: RecordTooLargeException in Spark *Structured* Streaming
Thanks. Missed that part of documentation. Appreciate your help. Regards. On Mon, May 25, 2020 at 10:42 PM Jungtaek Lim wrote: > Hi, > > You need to add the prefix "kafka." for the configurations which should be > propagated to the Kafka. Others will be used in Spark data source > itself. (Kafka connector in this case) > > > https://spark.apache.org/docs/2.4.5/structured-streaming-kafka-integration.html#kafka-specific-configurations > > Hope this helps. > > Thanks, > Jungtaek Lim (HeartSaVioR) > > > On Tue, May 26, 2020 at 6:42 AM Something Something < > mailinglist...@gmail.com> wrote: > >> I keep getting this error message: >> >> >> *The message is 1169350 bytes when serialized which is larger than the >> maximum request size you have configured with the max.request.size >> configuration.* >> >> >> >> As indicated in other posts, I am trying to set the “max.request.size” >> configuration in the Producer as follows: >> >> >> - >> >> .writeStream >> >> .format(*"kafka"*) >> >> .option( >> >> *"kafka.bootstrap.servers"*, >> >> conig.outputBootstrapServer >> >> ) >> >> .option(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, *"1000"*) >> >> - >> >> >> >> But this is not working. Am I setting this correctly? Is there a >> different way to set this property under Spark Structured Streaming? >> >> >> Please help. Thanks. >> >> >>
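For reference, a sketch of the prefixed option described in the linked documentation (servers, topic, size, and the `config` object are placeholders). Keys with the "kafka." prefix are handed to the underlying Kafka producer; unprefixed keys are interpreted by the Spark Kafka sink itself:

```scala
import org.apache.kafka.clients.producer.ProducerConfig

df.writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", config.outputBootstrapServer)
  .option("kafka." + ProducerConfig.MAX_REQUEST_SIZE_CONFIG, "10000000") // i.e. "kafka.max.request.size"
  .option("topic", "myTopic")
  .option("checkpointLocation", "/tmp/checkpoints")
  .start()
```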
Re: Using Spark Accumulators with Structured Streaming
Hmm... how would they go to Graphana if they are not getting computed in your code? I am talking about the Application Specific Accumulators. The other standard counters such as 'event.progress.inputRowsPerSecond' are getting populated correctly! On Mon, May 25, 2020 at 8:39 PM Srinivas V wrote: > Hello, > Even for me it comes as 0 when I print in OnQueryProgress. I use > LongAccumulator as well. Yes, it prints on my local but not on cluster. > But one consolation is that when I send metrics to Graphana, the values > are coming there. > > On Tue, May 26, 2020 at 3:10 AM Something Something < > mailinglist...@gmail.com> wrote: > >> No this is not working even if I use LongAccumulator. >> >> On Fri, May 15, 2020 at 9:54 PM ZHANG Wei wrote: >> >>> There is a restriction in AccumulatorV2 API [1], the OUT type should be >>> atomic or thread safe. I'm wondering if the implementation for >>> `java.util.Map[T, Long]` can meet it or not. Is there any chance to replace >>> CollectionLongAccumulator by CollectionAccumulator[2] or LongAccumulator[3] >>> and test if the StreamingListener and other codes are able to work? >>> >>> --- >>> Cheers, >>> -z >>> [1] >>> http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.AccumulatorV2 >>> [2] >>> http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.CollectionAccumulator >>> [3] >>> http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.LongAccumulator >>> >>> >>> From: Something Something >>> Sent: Saturday, May 16, 2020 0:38 >>> To: spark-user >>> Subject: Re: Using Spark Accumulators with Structured Streaming >>> >>> Can someone from Spark Development team tell me if this functionality is >>> supported and tested? I've spent a lot of time on this but can't get it to >>> work. Just to add more context, we've our own Accumulator class that >>> extends from AccumulatorV2. In this class we keep track of one or more >>> accumulators. Here's the definition: >>> >>> >>> class CollectionLongAccumulator[T] >>> extends AccumulatorV2[T, java.util.Map[T, Long]] >>> >>> When the job begins we register an instance of this class: >>> >>> spark.sparkContext.register(myAccumulator, "MyAccumulator") >>> >>> Is this working under Structured Streaming? >>> >>> I will keep looking for alternate approaches but any help would be >>> greatly appreciated. Thanks. >>> >>> >>> >>> On Thu, May 14, 2020 at 2:36 PM Something Something < >>> mailinglist...@gmail.com<mailto:mailinglist...@gmail.com>> wrote: >>> >>> In my structured streaming job I am updating Spark Accumulators in the >>> updateAcrossEvents method but they are always 0 when I try to print them in >>> my StreamingListener. Here's the code: >>> >>> .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())( >>> updateAcrossEvents >>> ) >>> >>> >>> The accumulators get incremented in 'updateAcrossEvents'. I've a >>> StreamingListener which writes values of the accumulators in >>> 'onQueryProgress' method but in this method the Accumulators are ALWAYS >>> ZERO! >>> >>> When I added log statements in the updateAcrossEvents, I could see that >>> these accumulators are getting incremented as expected. >>> >>> This only happens when I run in the 'Cluster' mode. In Local mode it >>> works fine which implies that the Accumulators are not getting distributed >>> correctly - or something like that! >>> >>> Note: I've seen quite a few answers on the Web that tell me to perform >>> an "Action". That's not a solution here. 
This is a 'Stateful Structured >>> Streaming' job. Yes, I am also 'registering' them in SparkContext. >>> >>> >>> >>>
RecordTooLargeException in Spark *Structured* Streaming
I keep getting this error message:

*The message is 1169350 bytes when serialized which is larger than the maximum request size you have configured with the max.request.size configuration.*

As indicated in other posts, I am trying to set the “max.request.size” configuration in the Producer as follows:

.writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", conig.outputBootstrapServer)
  .option(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, "1000")

But this is not working. Am I setting this correctly? Is there a different way to set this property under Spark Structured Streaming?

Please help. Thanks.
Re: Using Spark Accumulators with Structured Streaming
No this is not working even if I use LongAccumulator. On Fri, May 15, 2020 at 9:54 PM ZHANG Wei wrote: > There is a restriction in AccumulatorV2 API [1], the OUT type should be > atomic or thread safe. I'm wondering if the implementation for > `java.util.Map[T, Long]` can meet it or not. Is there any chance to replace > CollectionLongAccumulator by CollectionAccumulator[2] or LongAccumulator[3] > and test if the StreamingListener and other codes are able to work? > > --- > Cheers, > -z > [1] > http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.AccumulatorV2 > [2] > http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.CollectionAccumulator > [3] > http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.LongAccumulator > > ________ > From: Something Something > Sent: Saturday, May 16, 2020 0:38 > To: spark-user > Subject: Re: Using Spark Accumulators with Structured Streaming > > Can someone from Spark Development team tell me if this functionality is > supported and tested? I've spent a lot of time on this but can't get it to > work. Just to add more context, we've our own Accumulator class that > extends from AccumulatorV2. In this class we keep track of one or more > accumulators. Here's the definition: > > > class CollectionLongAccumulator[T] > extends AccumulatorV2[T, java.util.Map[T, Long]] > > When the job begins we register an instance of this class: > > spark.sparkContext.register(myAccumulator, "MyAccumulator") > > Is this working under Structured Streaming? > > I will keep looking for alternate approaches but any help would be greatly > appreciated. Thanks. > > > > On Thu, May 14, 2020 at 2:36 PM Something Something < > mailinglist...@gmail.com<mailto:mailinglist...@gmail.com>> wrote: > > In my structured streaming job I am updating Spark Accumulators in the > updateAcrossEvents method but they are always 0 when I try to print them in > my StreamingListener. Here's the code: > > .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())( > updateAcrossEvents > ) > > > The accumulators get incremented in 'updateAcrossEvents'. I've a > StreamingListener which writes values of the accumulators in > 'onQueryProgress' method but in this method the Accumulators are ALWAYS > ZERO! > > When I added log statements in the updateAcrossEvents, I could see that > these accumulators are getting incremented as expected. > > This only happens when I run in the 'Cluster' mode. In Local mode it works > fine which implies that the Accumulators are not getting distributed > correctly - or something like that! > > Note: I've seen quite a few answers on the Web that tell me to perform an > "Action". That's not a solution here. This is a 'Stateful Structured > Streaming' job. Yes, I am also 'registering' them in SparkContext. > > > >
Re: Using Spark Accumulators with Structured Streaming
Can someone from Spark Development team tell me if this functionality is supported and tested? I've spent a lot of time on this but can't get it to work. Just to add more context, we've our own Accumulator class that extends from AccumulatorV2. In this class we keep track of one or more accumulators. Here's the definition: class CollectionLongAccumulator[T] extends AccumulatorV2[T, java.util.Map[T, Long]] When the job begins we register an instance of this class: spark.sparkContext.register(myAccumulator, "MyAccumulator") Is this working under Structured Streaming? I will keep looking for alternate approaches but any help would be greatly appreciated. Thanks. On Thu, May 14, 2020 at 2:36 PM Something Something < mailinglist...@gmail.com> wrote: > In my structured streaming job I am updating Spark Accumulators in the > updateAcrossEvents method but they are always 0 when I try to print them in > my StreamingListener. Here's the code: > > .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())( > updateAcrossEvents > ) > > The accumulators get incremented in 'updateAcrossEvents'. I've a > StreamingListener which writes values of the accumulators in > 'onQueryProgress' method but in this method the Accumulators are ALWAYS > ZERO! > > When I added log statements in the updateAcrossEvents, I could see that > these accumulators are getting incremented as expected. > > This only happens when I run in the 'Cluster' mode. In Local mode it works > fine which implies that the Accumulators are not getting distributed > correctly - or something like that! > > Note: I've seen quite a few answers on the Web that tell me to perform an > "Action". That's not a solution here. This is a 'Stateful Structured > Streaming' job. Yes, I am also 'registering' them in SparkContext. > > > >
Using Spark Accumulators with Structured Streaming
In my structured streaming job I am updating Spark Accumulators in the updateAcrossEvents method but they are always 0 when I try to print them in my StreamingListener. Here's the code: .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())( updateAcrossEvents ) The accumulators get incremented in 'updateAcrossEvents'. I've a StreamingListener which writes values of the accumulators in 'onQueryProgress' method but in this method the Accumulators are ALWAYS ZERO! When I added log statements in the updateAcrossEvents, I could see that these accumulators are getting incremented as expected. This only happens when I run in the 'Cluster' mode. In Local mode it works fine which implies that the Accumulators are not getting distributed correctly - or something like that! Note: I've seen quite a few answers on the Web that tell me to perform an "Action". That's not a solution here. This is a 'Stateful Structured Streaming' job. Yes, I am also 'registering' them in SparkContext.
Re: No. of active states?
No. We are already capturing these metrics (e.g. numInputRows, inputRowsPerSecond). I am talking about "No. of States" in the memory at any given time. On Thu, May 7, 2020 at 4:31 PM Jungtaek Lim wrote: > If you're referring total "entries" in all states in SS job, it's being > provided via StreamingQueryListener. > > > http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#monitoring-streaming-queries > > Hope this helps. > > On Fri, May 8, 2020 at 3:26 AM Something Something < > mailinglist...@gmail.com> wrote: > >> Is there a way to get the total no. of active states in memory at any >> given point in a Stateful Spark Structured Streaming job? We are thinking >> of using this metric for 'Auto Scaling' our Spark cluster. >> >
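For reference, a hedged sketch of reading the state-store metrics Jungtaek points at from a StreamingQueryListener. The field names come from StateOperatorProgress in Spark 2.x; `numRowsTotal` is the number of state rows the operator currently holds:

```scala
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._

spark.streams.addListener(new StreamingQueryListener {
  override def onQueryStarted(e: QueryStartedEvent): Unit = {}
  override def onQueryTerminated(e: QueryTerminatedEvent): Unit = {}
  override def onQueryProgress(e: QueryProgressEvent): Unit = {
    e.progress.stateOperators.foreach { op =>
      // total state rows currently held, and memory used by the state store
      println(s"state rows: ${op.numRowsTotal}, state memory: ${op.memoryUsedBytes}")
    }
  }
})
```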
Dynamically changing maxOffsetsPerTrigger
Is there a way to dynamically modify value of 'maxOffsetsPerTrigger' while a Stateful Structured Streaming job is running? We are thinking of auto-scaling our Spark cluster but if we don't modify the value of 'maxOffsetsPerTrigger' dynamically would adding more VMs to the cluster help? I don't think it would, would it? In other words, if I add 2 new VMs to the cluster but value of 'maxOffsetsPerTrigger' is still the same would performance improve? I would think not. We would have to explicitly stop the job, add VMs & then restart the job after changing the value of 'maxOffsetsPerTrigger' - which defeats the purpose of Auto-scaling. Please tell me if my understanding is not correct. Thanks.
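For context, a sketch of where the option lives (placeholder values). It is a Kafka source option read when the query starts, so with current Spark the practical route is indeed to stop the query, change the value, and start it again from the same checkpoint; the committed offsets survive the restart:

```scala
val stream = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker:9092")   // placeholder
  .option("subscribe", "events")                       // placeholder topic
  .option("maxOffsetsPerTrigger", "200000")            // per-micro-batch cap, fixed for the query's lifetime
  .load()
```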
No. of active states?
Is there a way to get the total no. of active states in memory at any given point in a Stateful Spark Structured Streaming job? We are thinking of using this metric for 'Auto Scaling' our Spark cluster.
Spark not able to read from an Embedded Kafka Topic
I am trying to write an integration test using Embedded Kafka but I keep getting NullPointerException. My test case is very simple. It has following steps:

1. Read a JSON file & write messages to an inputTopic.
2. Perform a 'readStream' operation.
3. Do a 'select' on the Stream. This throws a NullPointerException.

What am I doing wrong? Code is given below:

"My Test which runs with Embedded Kafka" should "Generate correct Result" in {

  implicit val config: EmbeddedKafkaConfig = EmbeddedKafkaConfig(
    kafkaPort = 9066,
    zooKeeperPort = 2066,
    Map("log.dir" -> "./src/test/resources/")
  )

  withRunningKafka {
    createCustomTopic(inputTopic)
    val source = Source.fromFile("src/test/resources/test1.json")
    source.getLines.toList.filterNot(_.isEmpty).foreach(
      line => publishStringMessageToKafka(inputTopic, line)
    )
    source.close()
    implicit val deserializer: StringDeserializer = new StringDeserializer

    createCustomTopic(outputTopic)
    import spark2.implicits._

    val schema = spark.read.json("my.json").schema
    val myStream = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9066")
      .option("subscribe", inputTopic)
      .load()

    // Schema looks good
    myStream.printSchema()

    // Following line throws NULLPointerException! Why?
    val df = myStream.select(from_json($"value".cast("string"), schema).alias("value"))

    // There's more code... but let's not worry about that for now.
  }
}
Re: Stateful Structured Spark Streaming: Timeout is not getting triggered
Yes that was it! It seems it only works if input data is continuously flowing. I had stopped the input job because I had enough data but it seems timeouts work only if the data is continuously fed. Not sure why it's designed that way. Makes it a bit harder to write unit/integration tests BUT I am sure there's a reason why it's designed this way. Thanks. On Wed, Mar 4, 2020 at 6:31 PM Tathagata Das wrote: > Make sure that you are continuously feeding data into the query to trigger > the batches. only then timeouts are processed. > See the timeout behavior details here - > https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.streaming.GroupState > > On Wed, Mar 4, 2020 at 2:51 PM Something Something < > mailinglist...@gmail.com> wrote: > >> I've set the timeout duration to "2 minutes" as follows: >> >> def updateAcrossEvents (tuple3: Tuple3[String, String, String], inputs: >> Iterator[R00tJsonObject], >> oldState: GroupState[MyState]): OutputRow = { >> >> println(" Inside updateAcrossEvents with : " + tuple3._1 + ", " + >> tuple3._2 + ", " + tuple3._3) >> var state: MyState = if (oldState.exists) oldState.get else >> MyState(tuple3._1, tuple3._2, tuple3._3) >> >> if (oldState.hasTimedOut) { >> println("@ oldState has timed out ") >> // Logic to Write OutputRow >> OutputRow("some values here...") >> } else { >> for (input <- inputs) { >> state = updateWithEvent(state, input) >> oldState.update(state) >> *oldState.setTimeoutDuration("2 minutes")* >> } >> OutputRow(null, null, null) >> } >> >> } >> >> I have also specified ProcessingTimeTimeout in 'mapGroupsWithState' as >> follows... >> >> .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(updateAcrossEvents) >> >> But 'hasTimedOut' is never true so I don't get any output! What am I doing >> wrong? >> >> >> >>
Stateful Structured Spark Streaming: Timeout is not getting triggered
I've set the timeout duration to "2 minutes" as follows:

def updateAcrossEvents(tuple3: Tuple3[String, String, String], inputs: Iterator[R00tJsonObject],
                       oldState: GroupState[MyState]): OutputRow = {

  println(" Inside updateAcrossEvents with : " + tuple3._1 + ", " + tuple3._2 + ", " + tuple3._3)
  var state: MyState = if (oldState.exists) oldState.get else MyState(tuple3._1, tuple3._2, tuple3._3)

  if (oldState.hasTimedOut) {
    println("@ oldState has timed out ")
    // Logic to Write OutputRow
    OutputRow("some values here...")
  } else {
    for (input <- inputs) {
      state = updateWithEvent(state, input)
      oldState.update(state)
      oldState.setTimeoutDuration("2 minutes")
    }
    OutputRow(null, null, null)
  }
}

I have also specified ProcessingTimeTimeout in 'mapGroupsWithState' as follows...

.mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(updateAcrossEvents)

But 'hasTimedOut' is never true so I don't get any output! What am I doing wrong?
Re: Stateful Spark Streaming: Required attribute 'value' not found
By simply adding 'toJSON' before 'writeStream' the problem was fixed. Maybe it will help someone. On Tue, Mar 3, 2020 at 6:02 PM Something Something wrote: > In a Stateful Spark Streaming application I am writing the 'OutputRow' in > the 'updateAcrossEvents' but I keep getting this error (*Required > attribute 'value' not found*) while it's trying to write to Kafka. I know > from the documentation that 'value' attribute needs to be set but how do I > do that in the 'Stateful Structured Streaming'? Where & how do I add this > 'value' attribute in the following code? *Note: I am using Spark 2.3.1* > > withEventTime > .as[R00tJsonObject] > .withWatermark("event_time", "5 minutes") > .groupByKey(row => (row.value.Id, row.value.time.toString, > row.value.cId)) > > .mapGroupsWithState(GroupStateTimeout.EventTimeTimeout)(updateAcrossEvents) > .writeStream > .format("kafka") > .option("kafka.bootstrap.servers", "localhost:9092") > .option("topic", "myTopic") > .option("checkpointLocation", "/Users/username/checkpointLocation") > .outputMode("update") > .start() > .awaitTermination() > >
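For anyone hitting the same error, a sketch of what that fix looks like in context (R00tJsonObject, OutputRow, and updateAcrossEvents are the names from the original post; everything else is a placeholder). The Kafka sink requires a string or binary column named "value", and Dataset.toJSON yields a Dataset[String] whose single column is named "value", which is why inserting it before writeStream satisfies the sink:

```scala
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.streaming.GroupStateTimeout

val output: Dataset[OutputRow] = withEventTime
  .as[R00tJsonObject]
  .withWatermark("event_time", "5 minutes")
  .groupByKey(row => (row.value.Id, row.value.time.toString, row.value.cId))
  .mapGroupsWithState(GroupStateTimeout.EventTimeTimeout)(updateAcrossEvents)

output.toJSON                                   // column is now named "value"
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "myTopic")
  .option("checkpointLocation", "/Users/username/checkpointLocation")
  .outputMode("update")
  .start()
```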
Stateful Spark Streaming: Required attribute 'value' not found
In a Stateful Spark Streaming application I am writing the 'OutputRow' in the 'updateAcrossEvents' but I keep getting this error (*Required attribute 'value' not found*) while it's trying to write to Kafka. I know from the documentation that the 'value' attribute needs to be set, but how do I do that in 'Stateful Structured Streaming'? Where & how do I add this 'value' attribute in the following code? *Note: I am using Spark 2.3.1*

withEventTime
  .as[R00tJsonObject]
  .withWatermark("event_time", "5 minutes")
  .groupByKey(row => (row.value.Id, row.value.time.toString, row.value.cId))
  .mapGroupsWithState(GroupStateTimeout.EventTimeTimeout)(updateAcrossEvents)
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "myTopic")
  .option("checkpointLocation", "/Users/username/checkpointLocation")
  .outputMode("update")
  .start()
  .awaitTermination()
Example of Stateful Spark Structured Streaming with Kafka
There are lots of examples on 'Stateful Structured Streaming' in 'The Definitive Guide' book BUT all of them read JSON from a 'path'. That's working for me. Now I need to read from Kafka. I Googled but I couldn't find any example. I am struggling to map the 'Value' of the Kafka message to my JSON. Any help would be appreciated. Here's what I am trying:

val query = withEventTime
  .as[R00tJsonObject]
  .withWatermark("event_time", "5 minutes")
  .groupByKey(row => (row.report.id, row.report.time.toString, row.report.cId))
  .mapGroupsWithState(GroupStateTimeout.EventTimeTimeout)(updateAcrossEvents)
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "myTopic")
  .option("checkpointLocation", "/Users/username/checkpointLocation")
  .outputMode("update")
  .start()
  .awaitTermination

This fails with:

cannot resolve 'arrivalTime' given input columns: [value, event_time];
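A hedged sketch of one common pattern for getting from the raw Kafka 'value' column to the typed Dataset the rest of the pipeline expects; the unresolved 'arrivalTime' suggests the needed columns are not being projected out of the Kafka payload yet. R00tJsonObject is the poster's type; servers, topic, and the sample-file fallback are placeholders:

```scala
import org.apache.spark.sql.Encoders
import org.apache.spark.sql.functions.from_json
import spark.implicits._

// derive the payload schema from the case class (or from a sample file)
val schema = Encoders.product[R00tJsonObject].schema   // or spark.read.json("sample.json").schema

val events = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "myTopic")
  .load()
  .select(from_json($"value".cast("string"), schema).as("data"))  // parse the Kafka value
  .select("data.*")                                               // expose the JSON fields as columns
  .as[R00tJsonObject]                                             // typed Dataset for groupByKey/mapGroupsWithState
```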
Re: Spark Streaming with mapGroupsWithState
I changed it to Tuple2 and that problem is solved. Any thoughts on this message *Unapplied methods are only converted to functions when a function type is expected.* *You can make this conversion explicit by writing `updateAcrossEvents _` or `updateAcrossEvents(_,_,_,_,_)` instead of `updateAcrossEvents`. .mapGroupsWithState(GroupStateTimeout.EventTimeTimeout())(updateAcrossEvents)* On Mon, Mar 2, 2020 at 5:12 PM lec ssmi wrote: > maybe you can combine the fields you want to use into one field > > Something Something 于2020年3月3日周二 上午6:37写道: > >> I am writing a Stateful Streaming application in which I am using >> mapGroupsWithState to create aggregates for Groups but I need to create >> *Groups >> based on more than one column in the Input Row*. All the examples in the >> 'Spark: The Definitive Guide' use only one column such as 'User' or >> 'Device'. I am using code similar to what's given below. *How do I >> specify more than one field in the 'groupByKey'?* >> >> There are other challenges as well. The book says we can use >> 'updateAcrossEvents' the way given below but I get compile time error >> saying: >> >> >> *Error:(43, 65) missing argument list for method updateAcrossEvents in >> object MainUnapplied methods are only converted to functions when a >> function type is expected.You can make this conversion explicit by writing >> `updateAcrossEvents _` or `updateAcrossEvents(_,_,_,_,_)` instead of >> `updateAcrossEvents`. >> .mapGroupsWithState(GroupStateTimeout.EventTimeTimeout())(updateAcrossEvents)* >> >> Another challenge: Compiler also complains about the my *MyReport*: >> *Error:(41, >> 12) Unable to find encoder for type stored in a Dataset. Primitive types >> (Int, String, etc) and Product types (case classes) are supported by >> importing spark.implicits._ Support for serializing other types will be >> added in future releases.* >> >> Help in resolving these errors would be greatly appreciated. Thanks in >> advance. >> >> >> withEventTime >> .as[MyReport] >> .groupByKey(_.getKeys.getKey1). // How do I add _.getKeys.getKey2? >> >> .mapGroupsWithState(GroupStateTimeout.EventTimeTimeout())(updateAcrossEvents) >> .writeStream >> .queryName("test_query") >> .format("memory") >> .outputMode("update") >> .start() >> >>
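For reference, a hedged sketch combining the two fixes discussed in this sub-thread; MyReport and updateAcrossEvents are the poster's names, the rest is assumed. MyReport must be a case class (with spark.implicits._ in scope) for the missing-encoder error to go away, the composite key is just a tuple, and the trailing underscore performs the method-to-function conversion the compiler asks for:

```scala
import org.apache.spark.sql.streaming.GroupStateTimeout

val query = withEventTime
  .as[MyReport]
  .groupByKey(r => (r.getKeys.getKey1, r.getKeys.getKey2))           // composite key as a Tuple2
  .mapGroupsWithState(GroupStateTimeout.EventTimeTimeout())(updateAcrossEvents _) // explicit eta-expansion
  .writeStream
  .queryName("test_query")
  .format("memory")
  .outputMode("update")
  .start()
```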
Spark Streaming with mapGroupsWithState
I am writing a Stateful Streaming application in which I am using mapGroupsWithState to create aggregates for Groups, but I need to create *Groups based on more than one column in the Input Row*. All the examples in 'Spark: The Definitive Guide' use only one column such as 'User' or 'Device'. I am using code similar to what's given below. *How do I specify more than one field in the 'groupByKey'?*

There are other challenges as well. The book says we can use 'updateAcrossEvents' the way given below, but I get a compile time error saying:

*Error:(43, 65) missing argument list for method updateAcrossEvents in object Main. Unapplied methods are only converted to functions when a function type is expected. You can make this conversion explicit by writing `updateAcrossEvents _` or `updateAcrossEvents(_,_,_,_,_)` instead of `updateAcrossEvents`.
.mapGroupsWithState(GroupStateTimeout.EventTimeTimeout())(updateAcrossEvents)*

Another challenge: the compiler also complains about my *MyReport*: *Error:(41, 12) Unable to find encoder for type stored in a Dataset. Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._ Support for serializing other types will be added in future releases.*

Help in resolving these errors would be greatly appreciated. Thanks in advance.

withEventTime
  .as[MyReport]
  .groupByKey(_.getKeys.getKey1) // How do I add _.getKeys.getKey2?
  .mapGroupsWithState(GroupStateTimeout.EventTimeTimeout())(updateAcrossEvents)
  .writeStream
  .queryName("test_query")
  .format("memory")
  .outputMode("update")
  .start()
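A sketch under assumptions of how both issues are usually resolved: a tuple key handles "more than one column" in groupByKey, and the encoder error goes away if MyReport is a case class (or if an explicit encoder such as Encoders.bean is registered for a Java-bean style class). The field names below are invented for illustration, and updateAcrossEvents is assumed to have the (key, values, state) signature.

```scala
import spark.implicits._   // supplies Encoders for case classes (Product types); 'spark' is the SparkSession

// Hypothetical case classes standing in for MyReport and its keys.
case class MyKeys(key1: String, key2: String)
case class MyReport(keys: MyKeys, payload: String)

val query = withEventTime
  .as[MyReport]
  .groupByKey(r => (r.keys.key1, r.keys.key2))   // a Tuple2 covers more than one column
  .mapGroupsWithState(GroupStateTimeout.EventTimeTimeout())(updateAcrossEvents _)
  .writeStream
  .queryName("test_query")
  .format("memory")
  .outputMode("update")
  .start()
```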
Aggregating values by a key field in Spark Streaming
Here's my use case: Messages are coming into a Kafka Topic for different 'Events'. Each event has a unique Event Id. I need to aggregate counts for each Event AFTER the event is completed. For now, we are thinking we can assume an event is completed if there are no more messages coming in for a period of X minutes. How do I do this using Spark Streaming? I am reading up on 'Stateful Transformations' with 'mapWithState'. Is that the right approach? Any sample code would be even more appreciated. Thanks.
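For reference, a minimal sketch of the 'Stateful Transformations' idea under assumptions about the input: mapWithState plus a StateSpec timeout matches the "no messages for X minutes means the event is complete" requirement, because the mapping function is invoked one last time with isTimingOut() when a key expires. The eventCounts stream and the 10-minute window are assumptions.

```scala
import org.apache.spark.streaming.{Minutes, State, StateSpec}

// eventCounts: DStream[(String, Long)] of (eventId, countInThisBatch) - assumed
// to be built from the Kafka topic elsewhere. Stateful streaming also needs
// ssc.checkpoint(...) to be set.
def trackEvent(eventId: String,
               count: Option[Long],
               state: State[Long]): Option[(String, Long)] = {
  if (state.isTimingOut()) {
    // No messages for this event within the timeout window: emit the final total.
    Some((eventId, state.get()))
  } else {
    state.update(state.getOption.getOrElse(0L) + count.getOrElse(0L))
    None // nothing emitted until the event is considered complete
  }
}

val completedEvents = eventCounts
  .mapWithState(StateSpec.function(trackEvent _).timeout(Minutes(10)))
  .flatMap(_.toSeq) // keep only the (eventId, total) pairs emitted at timeout
```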
Spark Streaming: Aggregating values across batches
We've a Spark Streaming job that calculates some values in each batch. What we need to do now is aggregate values across ALL batches. What is the best strategy to do this in Spark Streaming? Should we use 'Spark Accumulators' for this?
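A hedged sketch of the alternative usually suggested for this: keep the running aggregate in Spark's own streaming state (updateStateByKey) rather than in an accumulator, since accumulator updates made inside transformations can be applied more than once on task retries and are not restored from a checkpoint. The perBatchValues stream and the key/value types are assumptions.

```scala
// perBatchValues: DStream[(String, Double)] of (metricName, valueForThisBatch) - assumed.
ssc.checkpoint("/tmp/streaming-checkpoint")  // required for stateful operations; path is an example

val runningTotals = perBatchValues.updateStateByKey[Double] {
  (newValues: Seq[Double], current: Option[Double]) =>
    Some(current.getOrElse(0.0) + newValues.sum)   // aggregate across ALL batches so far
}

runningTotals.print()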
Creating Custom Receiver for Spark Streaming
Is it safe to assume that Spark will always create a single instance of Custom Receiver? Or would it create multiple instances on each node in a cluster? Wondering if I need to worry about receiving the same message on different nodes etc. Please help. Thanks.
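For context, a small sketch of how receiver parallelism is normally made explicit: a single receiverStream call creates exactly one receiver instance, scheduled on one executor (not one per node), so duplicate delivery across nodes is not a concern unless several streams are created deliberately. MyCustomReceiver is a stand-in name for the receiver class from this thread.

```scala
// One call = one receiver instance on one executor.
val single = ssc.receiverStream(new MyCustomReceiver())

// To receive in parallel, create several receiver streams explicitly and union them.
val parallel = ssc.union((1 to 4).map(_ => ssc.receiverStream(new MyCustomReceiver())))
```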
Storing object in spark streaming
In my custom receiver for Spark Streaming I've got code such as this:

messages.toArray().foreach(msg => {
  val m = msg.asInstanceOf[Message]
  store(m.getBody)
})

Instead of 'body', which is of type 'String', I would rather pass the entire Message object, but when I say store(m), I get a compiler error saying: "Cannot resolve reference store with such signature". But I see this method in 'Receiver.scala':

def store(dataItem: T) {
  executor.pushSingle(dataItem)
}

How do I store the entire object? Please help. Thanks.
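A minimal sketch of the usual fix, assuming Message is serializable: the type parameter on Receiver determines what store(...) accepts, so extending Receiver[Message] instead of Receiver[String] lets the whole object be stored. fetchMessages() below is a hypothetical stand-in for whatever actually polls the source.

```scala
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

class MessageReceiver extends Receiver[Message](StorageLevel.MEMORY_AND_DISK_2) {

  override def onStart(): Unit = {
    new Thread("message-receiver") {
      override def run(): Unit = {
        while (!isStopped()) {
          fetchMessages().foreach { msg =>
            // Because the receiver is typed as Receiver[Message], store(...)
            // accepts the whole object instead of only the String body.
            store(msg.asInstanceOf[Message])
          }
        }
      }
    }.start()
  }

  override def onStop(): Unit = {}

  private def fetchMessages(): Seq[AnyRef] = Seq.empty // hypothetical placeholder
}
```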
JavaKafkaWordCount not working under Spark Streaming
I am embarrassed to admit but I can't get a basic 'word count' to work under Kafka/Spark streaming. My code looks like this. I don't see any word counts in console output. Also, don't see any output in UI. Needless to say, I am newbie in both 'Spark' as well as 'Kafka'. Please help. Thanks. Here's the code:

public static void main(String[] args) {
    if (args.length < 4) {
        System.err.println("Usage: JavaKafkaWordCount <zkQuorum> <group> <topics> <numThreads>");
        System.exit(1);
    }

    //StreamingExamples.setStreamingLogLevels();
    //SparkConf sparkConf = new SparkConf().setAppName("JavaKafkaWordCount");

    // Location of the Spark directory
    String sparkHome = "/opt/mapr/spark/spark-1.0.2/";
    // URL of the Spark cluster
    String sparkUrl = "spark://mymachine:7077";
    // Location of the required JAR files
    String jarFiles = "./spark-streaming-kafka_2.10-1.1.0.jar,./DlSpark-1.0-SNAPSHOT.jar,./zkclient-0.3.jar,./kafka_2.10-0.8.1.1.jar,./metrics-core-2.2.0.jar";

    SparkConf sparkConf = new SparkConf();
    sparkConf.setAppName("JavaKafkaWordCount");
    sparkConf.setJars(new String[]{jarFiles});
    sparkConf.setMaster(sparkUrl);
    sparkConf.set("spark.ui.port", "2348");
    sparkConf.setSparkHome(sparkHome);

    Map<String, String> kafkaParams = new HashMap<String, String>();
    kafkaParams.put("zookeeper.connect", "myedgenode:2181");
    kafkaParams.put("group.id", "1");
    kafkaParams.put("metadata.broker.list", "myedgenode:9092");
    kafkaParams.put("serializer.class", "kafka.serializer.StringEncoder");
    kafkaParams.put("request.required.acks", "1");

    // Create the context with a 1 second batch size
    JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(2000));

    int numThreads = Integer.parseInt(args[3]);
    Map<String, Integer> topicMap = new HashMap<String, Integer>();
    String[] topics = args[2].split(",");
    for (String topic : topics) {
        topicMap.put(topic, numThreads);
    }

    //JavaPairReceiverInputDStream<String, String> messages =
    //        KafkaUtils.createStream(jssc, args[0], args[1], topicMap);
    JavaPairDStream<String, String> messages = KafkaUtils.createStream(jssc,
            String.class,
            String.class,
            StringDecoder.class,
            StringDecoder.class,
            kafkaParams,
            topicMap,
            StorageLevel.MEMORY_ONLY_SER());

    JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() {
        @Override
        public String call(Tuple2<String, String> tuple2) {
            return tuple2._2();
        }
    });

    JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
        @Override
        public Iterable<String> call(String x) {
            return Lists.newArrayList(SPACE.split(x));
        }
    });

    JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
            new PairFunction<String, String, Integer>() {
                @Override
                public Tuple2<String, Integer> call(String s) {
                    return new Tuple2<String, Integer>(s, 1);
                }
            }).reduceByKey(new Function2<Integer, Integer, Integer>() {
                @Override
                public Integer call(Integer i1, Integer i2) {
                    return i1 + i2;
                }
            });

    wordCounts.print();

    jssc.start();
    jssc.awaitTermination();
}
Re: JavaKafkaWordCount not working under Spark Streaming
I am not running locally. The Spark master is: spark://machine name:7077 On Mon, Nov 10, 2014 at 3:47 PM, Tathagata Das tathagata.das1...@gmail.com wrote: What is the Spark master that you are using. Use local[4], not local if you are running locally. On Mon, Nov 10, 2014 at 3:01 PM, Something Something mailinglist...@gmail.com wrote: I am embarrassed to admit but I can't get a basic 'word count' to work under Kafka/Spark streaming. My code looks like this. I don't see any word counts in console output. Also, don't see any output in UI. Needless to say, I am newbie in both 'Spark' as well as 'Kafka'. Please help. Thanks. Here's the code: public static void main(String[] args) { if (args.length 4) { System.err.println(Usage: JavaKafkaWordCount zkQuorum group topics numThreads); System.exit(1); } //StreamingExamples.setStreamingLogLevels(); //SparkConf sparkConf = new SparkConf().setAppName(JavaKafkaWordCount); // Location of the Spark directory String sparkHome = /opt/mapr/spark/spark-1.0.2/; // URL of the Spark cluster String sparkUrl = spark://mymachine:7077; // Location of the required JAR files String jarFiles = ./spark-streaming-kafka_2.10-1.1.0.jar,./DlSpark-1.0-SNAPSHOT.jar,./zkclient-0.3.jar,./kafka_2.10-0.8.1.1.jar,./metrics-core-2.2.0.jar; SparkConf sparkConf = new SparkConf(); sparkConf.setAppName(JavaKafkaWordCount); sparkConf.setJars(new String[]{jarFiles}); sparkConf.setMaster(sparkUrl); sparkConf.set(spark.ui.port, 2348); sparkConf.setSparkHome(sparkHome); MapString, String kafkaParams = new HashMapString, String(); kafkaParams.put(zookeeper.connect, myedgenode:2181); kafkaParams.put(group.id, 1); kafkaParams.put(metadata.broker.list, myedgenode:9092); kafkaParams.put(serializer.class, kafka.serializer.StringEncoder); kafkaParams.put(request.required.acks, 1); // Create the context with a 1 second batch size JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(2000)); int numThreads = Integer.parseInt(args[3]); MapString, Integer topicMap = new HashMapString, Integer(); String[] topics = args[2].split(,); for (String topic: topics) { topicMap.put(topic, numThreads); } //JavaPairReceiverInputDStreamString, String messages = //KafkaUtils.createStream(jssc, args[0], args[1], topicMap); JavaPairDStreamString, String messages = KafkaUtils.createStream(jssc, String.class, String.class, StringDecoder.class, StringDecoder.class, kafkaParams, topicMap, StorageLevel.MEMORY_ONLY_SER()); JavaDStreamString lines = messages.map(new FunctionTuple2String, String, String() { @Override public String call(Tuple2String, String tuple2) { return tuple2._2(); } }); JavaDStreamString words = lines.flatMap(new FlatMapFunctionString, String() { @Override public IterableString call(String x) { return Lists.newArrayList(SPACE.split(x)); } }); JavaPairDStreamString, Integer wordCounts = words.mapToPair( new PairFunctionString, String, Integer() { @Override public Tuple2String, Integer call(String s) { return new Tuple2String, Integer(s, 1); } }).reduceByKey(new Function2Integer, Integer, Integer() { @Override public Integer call(Integer i1, Integer i2) { return i1 + i2; } }); wordCounts.print(); jssc.start(); jssc.awaitTermination();
Re: Kafka Consumer in Spark Streaming
As suggested by Qiaou, looked at the UI: 1) Under 'Stages' the only 'active' stage is: runJob at ReceiverTracker.scala:275 2) Under 'Executors', there's only 1 active task, but I don't see any output (or logs) 3) Under 'Streaming', there's one receiver called, 'KafkaReciever-0', but 'Records in last batch' are 0. Honestly, I think it's not connecting to my Kafka topic - possibly because I need to pass the following parameter: metadata.broker.list - machine:9092 But I don't know how to pass this to KafkaUtils.createStream(...). Could that be the problem? On Tue, Nov 4, 2014 at 11:12 PM, Akhil Das ak...@sigmoidanalytics.com wrote: Your code doesn't trigger any action. How about the following? JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, new Duration(60 * 1 * 1000)); JavaPairReceiverInputDStream tweets = KafkaUtils.createStream(ssc, machine:2181, 1, map); JavaDStreamString statuses = tweets.map( new FunctionString, String() { public String call(String status) { System.out.println(status); return status; } } ); statuses.print() ; Or you could use foreachRDD instead of map() if your intention is just printing. Thanks Best Regards On Wed, Nov 5, 2014 at 12:35 PM, Something Something mailinglist...@gmail.com wrote: It's not local. My spark url is something like this: String sparkUrl = spark://host name:7077; On Tue, Nov 4, 2014 at 11:03 PM, Jain Rahul ja...@ivycomptech.com wrote: I think you are running it locally. Do you have local[1] here for master url? If yes change it to local[2] or more number of threads. It may be due to topic name mismatch also. sparkConf.setMaster(“local[1]); Regards, Rahul From: Something Something mailinglist...@gmail.com Date: Wednesday, November 5, 2014 at 12:23 PM To: Shao, Saisai saisai.s...@intel.com Cc: user@spark.apache.org user@spark.apache.org Subject: Re: Kafka Consumer in Spark Streaming Added foreach as follows. Still don't see any output on my console. Would this go to the worker logs as Jerry indicated? JavaPairReceiverInputDStream tweets = KafkaUtils.createStream(ssc, mymachine:2181, 1, map); JavaDStreamString statuses = tweets.map( new FunctionString, String() { public String call(String status) { return status; } } ); statuses.foreach(new FunctionJavaRDDString, Void() { @Override public Void call(JavaRDDString stringJavaRDD) throws Exception { for (String str: stringJavaRDD.take(10)) { System.out.println(Message: + str); } return null; } }); On Tue, Nov 4, 2014 at 10:32 PM, Shao, Saisai saisai.s...@intel.com wrote: If you’re running on a standalone mode, the log is under SPAR_HOME/work/ directory. I’m not sure for yarn or mesos, you can check the document of Spark to see the details. Thanks Jerry *From:* Something Something [mailto:mailinglist...@gmail.com] *Sent:* Wednesday, November 05, 2014 2:28 PM *To:* Shao, Saisai *Cc:* user@spark.apache.org *Subject:* Re: Kafka Consumer in Spark Streaming The Kafka broker definitely has messages coming in. But your #2 point is valid. Needless to say I am a newbie to Spark. I can't figure out where the 'executor' logs would be. How would I find them? 
All I see printed on my screen is this: 14/11/04 22:21:23 INFO Slf4jLogger: Slf4jLogger started 14/11/04 22:21:23 INFO Remoting: Starting remoting 14/11/04 22:21:24 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://spark@mymachie:60743] 14/11/04 22:21:24 INFO Remoting: Remoting now listens on addresses: [akka.tcp://spark@mymachine:60743] 14/11/04 22:21:24 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable 14/11/04 22:21:24 INFO JniBasedUnixGroupsMappingWithFallback: Falling back to shell based --- Time: 141516852 ms --- --- Time: 141516852 ms --- Keeps repeating this... On Tue, Nov 4, 2014 at 10:14 PM, Shao, Saisai saisai.s...@intel.com wrote: Hi, would you mind describing your problem a little more specific. 1. Is the Kafka broker currently has no data feed in? 2. This code will print the lines, but not in the driver side, the code is running in the executor side, so you can check the log in worker dir to see if there’s any printing logs under this folder. 3. Did you see any exceptions when running the app
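On the "how do I pass metadata.broker.list to KafkaUtils.createStream" part raised earlier in this thread, a sketch of the overload that takes a properties map (shown here with the Scala API; the topic name and group id are examples). Note that the receiver-based createStream consumes through ZooKeeper, so metadata.broker.list is only relevant to the direct-stream API, not this one.

```scala
import kafka.serializer.StringDecoder
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.kafka.KafkaUtils

// Consumer properties passed explicitly instead of positional arguments.
val kafkaParams = Map(
  "zookeeper.connect" -> "machine:2181",
  "group.id"          -> "my-consumer-group")

val messages = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, Map("myTopic" -> 1), StorageLevel.MEMORY_ONLY_SER)
```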
Kafka Consumer in Spark Streaming
I've the following code in my program. I don't get any error, but it's not consuming the messages either. Shouldn't the following code print the line in the 'call' method? What am I missing? Please help. Thanks.

JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, new Duration(60 * 1 * 1000));

JavaPairReceiverInputDStream tweets = KafkaUtils.createStream(ssc, "machine:2181", "1", map);

JavaDStream<String> statuses = tweets.map(
    new Function<String, String>() {
        public String call(String status) {
            System.out.println(status);
            return status;
        }
    }
);
Re: Kafka Consumer in Spark Streaming
The Kafka broker definitely has messages coming in. But your #2 point is valid. Needless to say I am a newbie to Spark. I can't figure out where the 'executor' logs would be. How would I find them? All I see printed on my screen is this: 14/11/04 22:21:23 INFO Slf4jLogger: Slf4jLogger started 14/11/04 22:21:23 INFO Remoting: Starting remoting 14/11/04 22:21:24 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://spark@mymachie:60743] 14/11/04 22:21:24 INFO Remoting: Remoting now listens on addresses: [akka.tcp://spark@mymachine:60743] 14/11/04 22:21:24 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable 14/11/04 22:21:24 INFO JniBasedUnixGroupsMappingWithFallback: Falling back to shell based --- Time: 141516852 ms --- --- Time: 141516852 ms --- Keeps repeating this... On Tue, Nov 4, 2014 at 10:14 PM, Shao, Saisai saisai.s...@intel.com wrote: Hi, would you mind describing your problem a little more specific. 1. Is the Kafka broker currently has no data feed in? 2. This code will print the lines, but not in the driver side, the code is running in the executor side, so you can check the log in worker dir to see if there’s any printing logs under this folder. 3. Did you see any exceptions when running the app, this will help to define the problem. Thanks Jerry *From:* Something Something [mailto:mailinglist...@gmail.com] *Sent:* Wednesday, November 05, 2014 1:57 PM *To:* user@spark.apache.org *Subject:* Kafka Consumer in Spark Streaming I've following code in my program. I don't get any error, but it's not consuming the messages either. Shouldn't the following code print the line in the 'call' method? What am I missing? Please help. Thanks. JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, new Duration(60 * 1 * 1000)); JavaPairReceiverInputDStream tweets = KafkaUtils.createStream(ssc, machine:2181, 1, map); JavaDStreamString statuses = tweets.map( new FunctionString, String() { public String call(String status) { System.out.println(status); return status; } } );
Re: Kafka Consumer in Spark Streaming
Added foreach as follows. Still don't see any output on my console. Would this go to the worker logs as Jerry indicated? JavaPairReceiverInputDStream tweets = KafkaUtils.createStream(ssc, mymachine:2181, 1, map); JavaDStreamString statuses = tweets.map( new FunctionString, String() { public String call(String status) { return status; } } ); statuses.foreach(new FunctionJavaRDDString, Void() { @Override public Void call(JavaRDDString stringJavaRDD) throws Exception { for (String str: stringJavaRDD.take(10)) { System.out.println(Message: + str); } return null; } }); On Tue, Nov 4, 2014 at 10:32 PM, Shao, Saisai saisai.s...@intel.com wrote: If you’re running on a standalone mode, the log is under SPAR_HOME/work/ directory. I’m not sure for yarn or mesos, you can check the document of Spark to see the details. Thanks Jerry *From:* Something Something [mailto:mailinglist...@gmail.com] *Sent:* Wednesday, November 05, 2014 2:28 PM *To:* Shao, Saisai *Cc:* user@spark.apache.org *Subject:* Re: Kafka Consumer in Spark Streaming The Kafka broker definitely has messages coming in. But your #2 point is valid. Needless to say I am a newbie to Spark. I can't figure out where the 'executor' logs would be. How would I find them? All I see printed on my screen is this: 14/11/04 22:21:23 INFO Slf4jLogger: Slf4jLogger started 14/11/04 22:21:23 INFO Remoting: Starting remoting 14/11/04 22:21:24 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://spark@mymachie:60743] 14/11/04 22:21:24 INFO Remoting: Remoting now listens on addresses: [akka.tcp://spark@mymachine:60743] 14/11/04 22:21:24 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable 14/11/04 22:21:24 INFO JniBasedUnixGroupsMappingWithFallback: Falling back to shell based --- Time: 141516852 ms --- --- Time: 141516852 ms --- Keeps repeating this... On Tue, Nov 4, 2014 at 10:14 PM, Shao, Saisai saisai.s...@intel.com wrote: Hi, would you mind describing your problem a little more specific. 1. Is the Kafka broker currently has no data feed in? 2. This code will print the lines, but not in the driver side, the code is running in the executor side, so you can check the log in worker dir to see if there’s any printing logs under this folder. 3. Did you see any exceptions when running the app, this will help to define the problem. Thanks Jerry *From:* Something Something [mailto:mailinglist...@gmail.com] *Sent:* Wednesday, November 05, 2014 1:57 PM *To:* user@spark.apache.org *Subject:* Kafka Consumer in Spark Streaming I've following code in my program. I don't get any error, but it's not consuming the messages either. Shouldn't the following code print the line in the 'call' method? What am I missing? Please help. Thanks. JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, new Duration(60 * 1 * 1000)); JavaPairReceiverInputDStream tweets = KafkaUtils.createStream(ssc, machine:2181, 1, map); JavaDStreamString statuses = tweets.map( new FunctionString, String() { public String call(String status) { System.out.println(status); return status; } } );
Re: Kafka Consumer in Spark Streaming
It's not local. My spark url is something like this: String sparkUrl = spark://host name:7077; On Tue, Nov 4, 2014 at 11:03 PM, Jain Rahul ja...@ivycomptech.com wrote: I think you are running it locally. Do you have local[1] here for master url? If yes change it to local[2] or more number of threads. It may be due to topic name mismatch also. sparkConf.setMaster(“local[1]); Regards, Rahul From: Something Something mailinglist...@gmail.com Date: Wednesday, November 5, 2014 at 12:23 PM To: Shao, Saisai saisai.s...@intel.com Cc: user@spark.apache.org user@spark.apache.org Subject: Re: Kafka Consumer in Spark Streaming Added foreach as follows. Still don't see any output on my console. Would this go to the worker logs as Jerry indicated? JavaPairReceiverInputDStream tweets = KafkaUtils.createStream(ssc, mymachine:2181, 1, map); JavaDStreamString statuses = tweets.map( new FunctionString, String() { public String call(String status) { return status; } } ); statuses.foreach(new FunctionJavaRDDString, Void() { @Override public Void call(JavaRDDString stringJavaRDD) throws Exception { for (String str: stringJavaRDD.take(10)) { System.out.println(Message: + str); } return null; } }); On Tue, Nov 4, 2014 at 10:32 PM, Shao, Saisai saisai.s...@intel.com wrote: If you’re running on a standalone mode, the log is under SPAR_HOME/work/ directory. I’m not sure for yarn or mesos, you can check the document of Spark to see the details. Thanks Jerry *From:* Something Something [mailto:mailinglist...@gmail.com] *Sent:* Wednesday, November 05, 2014 2:28 PM *To:* Shao, Saisai *Cc:* user@spark.apache.org *Subject:* Re: Kafka Consumer in Spark Streaming The Kafka broker definitely has messages coming in. But your #2 point is valid. Needless to say I am a newbie to Spark. I can't figure out where the 'executor' logs would be. How would I find them? All I see printed on my screen is this: 14/11/04 22:21:23 INFO Slf4jLogger: Slf4jLogger started 14/11/04 22:21:23 INFO Remoting: Starting remoting 14/11/04 22:21:24 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://spark@mymachie:60743] 14/11/04 22:21:24 INFO Remoting: Remoting now listens on addresses: [akka.tcp://spark@mymachine:60743] 14/11/04 22:21:24 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable 14/11/04 22:21:24 INFO JniBasedUnixGroupsMappingWithFallback: Falling back to shell based --- Time: 141516852 ms --- --- Time: 141516852 ms --- Keeps repeating this... On Tue, Nov 4, 2014 at 10:14 PM, Shao, Saisai saisai.s...@intel.com wrote: Hi, would you mind describing your problem a little more specific. 1. Is the Kafka broker currently has no data feed in? 2. This code will print the lines, but not in the driver side, the code is running in the executor side, so you can check the log in worker dir to see if there’s any printing logs under this folder. 3. Did you see any exceptions when running the app, this will help to define the problem. Thanks Jerry *From:* Something Something [mailto:mailinglist...@gmail.com] *Sent:* Wednesday, November 05, 2014 1:57 PM *To:* user@spark.apache.org *Subject:* Kafka Consumer in Spark Streaming I've following code in my program. I don't get any error, but it's not consuming the messages either. Shouldn't the following code print the line in the 'call' method? What am I missing? Please help. Thanks. 
JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, new Duration(60 * 1 * 1000)); JavaPairReceiverInputDStream tweets = KafkaUtils.createStream(ssc, machine:2181, 1, map); JavaDStreamString statuses = tweets.map( new FunctionString, String() { public String call(String status) { System.out.println(status); return status; } } );