Re: FW: Kafka Direct Stream - dynamic topic subscription
As it says in SPARK-10320 and in the docs at http://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html#consumerstrategies , you can use SubscribePattern. On Sun, Oct 29, 2017 at 3:56 PM, Ramanan, Buvana (Nokia - US/Murray Hill) <buvana.rama...@nokia-bell-labs.com> wrote: > Hello Cody, > > > > As the stakeholders of the JIRA SPARK-10320 issue, can you please explain the > purpose of dynamic topic subscription? Does it mean adapting the consumer to > read from the new partitions that might get created after the Spark Streaming > job begins? Is there a succinct write-up on the dynamic topic subscription > feature that you can share? > > > > Also, is there a way I can subscribe to topics whose name matches a regular > expression (some Kafka consumers, such as the kafka-python library, support > that)? > > > > I forward the email I sent to the Spark users group that contains a little more > background on my question. > > > > Thank you, > > Regards, > > Buvana > > > > From: Ramanan, Buvana (Nokia - US/Murray Hill) > [mailto:buvana.rama...@nokia-bell-labs.com] > Sent: Friday, October 27, 2017 10:46 PM > To: user@spark.apache.org > Subject: Kafka Direct Stream - dynamic topic subscription > > > > Hello, > > > > Using Spark 2.2.0. Interested in seeing dynamic topic subscription in action. > > > > Tried this example: streaming.DirectKafkaWordCount (which uses > org.apache.spark.streaming.kafka010). > > > > I start with 8 Kafka partitions in my topic and found that Spark Streaming > executes 8 tasks (one per partition), which is what is expected. While this > example process was going on, I increased the Kafka partitions to 16 and > started producing data to the new partitions as well. > > > > I expected that the Kafka consumer that Spark uses would detect this change > and spawn new tasks for the new partitions. But I find that it only reads > from the old partitions and does not read from the new partitions. When I do a > restart, it reads from all 16 partitions. > > > > Is this expected? > > > > What is meant by dynamic topic subscription? > > > > Does it apply only to topics with a name that matches a regular expression, > and not to dynamically growing partitions? > > > > Thanks, > > Buvana
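A minimal sketch of the SubscribePattern approach with the 0-10 integration (the broker list, group id, and topic regex are placeholders, and how quickly newly created topics and partitions are noticed depends on the consumer's metadata refresh settings):

import java.util.regex.Pattern
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.SubscribePattern

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "broker1:9092",               // placeholder broker list
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "example-group",                       // placeholder group id
  "auto.offset.reset" -> "latest",
  "enable.auto.commit" -> (false: java.lang.Boolean)
)

// Every topic whose name matches the regex is consumed; topics and partitions
// created later are picked up when the underlying consumer refreshes metadata.
val stream = KafkaUtils.createDirectStream[String, String](
  ssc,                                                 // an existing StreamingContext
  PreferConsistent,
  SubscribePattern[String, String](Pattern.compile("events.*"), kafkaParams)
)

stream.map(record => (record.key, record.value)).print()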
Kafka Direct Stream - dynamic topic subscription
Hello, Using Spark 2.2.0. Interested in seeing dynamic topic subscription in action. Tried this example: streaming.DirectKafkaWordCount (which uses org.apache.spark.streaming.kafka010). I start with 8 Kafka partitions in my topic and found that Spark Streaming executes 8 tasks (one per partition), which is what is expected. While this example process was going on, I increased the Kafka partitions to 16 and started producing data to the new partitions as well. I expected that the Kafka consumer that Spark uses would detect this change and spawn new tasks for the new partitions. But I find that it only reads from the old partitions and does not read from the new partitions. When I do a restart, it reads from all 16 partitions. Is this expected? What is meant by dynamic topic subscription? Does it apply only to topics with a name that matches a regular expression, and not to dynamically growing partitions? Thanks, Buvana
Re: ConcurrentModificationException using Kafka Direct Stream
Thanks Cody. It worked for me by keeping the number of executors, each with 1 core, equal to the number of Kafka partitions. On Mon, Sep 18, 2017 at 8:47 PM Cody Koeninger wrote: > Have you searched in jira, e.g. > > https://issues.apache.org/jira/browse/SPARK-19185 > > On Mon, Sep 18, 2017 at 1:56 AM, HARSH TAKKAR > wrote: > > Hi > > > > Changing spark version if my last resort, is there any other workaround > for > > this problem. > > > > > > On Mon, Sep 18, 2017 at 11:43 AM pandees waran > wrote: > >> > >> All, May I know what exactly changed in 2.1.1 which solved this problem? > >> > >> Sent from my iPhone > >> > >> On Sep 17, 2017, at 11:08 PM, Anastasios Zouzias > >> wrote: > >> > >> Hi, > >> > >> I had a similar issue using 2.1.0 but not with Kafka. Updating to 2.1.1 > >> solved my issue. Can you try with 2.1.1 as well and report back? > >> > >> Best, > >> Anastasios > >> > >> On 17.09.2017 at 16:48, "HARSH TAKKAR" wrote: > >> > >> > >> Hi > >> > >> I am using spark 2.1.0 with scala 2.11.8, and while iterating over the > >> partitions of each rdd in a dStream formed using KafkaUtils, i am > getting > >> the below exception, please suggest a fix. > >> > >> I have following config > >> > >> kafka : > >> enable.auto.commit:"true", > >> auto.commit.interval.ms:"1000", > >> session.timeout.ms:"3", > >> > >> Spark: > >> > >> spark.streaming.backpressure.enabled=true > >> > >> spark.streaming.kafka.maxRatePerPartition=200 > >> > >> > >> Exception in task 0.2 in stage 3236.0 (TID 77795) > >> java.util.ConcurrentModificationException: KafkaConsumer is not safe for > >> multi-threaded access > >> > >> -- > >> Kind Regards > >> Harsh > >> > >> > > >
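A sketch of the layout described above, assuming an 8-partition topic and a YARN-style deployment (the app name and numbers are placeholders, not anything prescribed by Spark):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("kafka-direct-example")      // placeholder
  .set("spark.executor.instances", "8")    // one executor per Kafka partition (8 here)
  .set("spark.executor.cores", "1")        // a single core per executor, as described above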
Re: ConcurrentModificationException using Kafka Direct Stream
Have you searched in jira, e.g. https://issues.apache.org/jira/browse/SPARK-19185 On Mon, Sep 18, 2017 at 1:56 AM, HARSH TAKKAR wrote: > Hi > > Changing spark version if my last resort, is there any other workaround for > this problem. > > > On Mon, Sep 18, 2017 at 11:43 AM pandees waran wrote: >> >> All, May I know what exactly changed in 2.1.1 which solved this problem? >> >> Sent from my iPhone >> >> On Sep 17, 2017, at 11:08 PM, Anastasios Zouzias >> wrote: >> >> Hi, >> >> I had a similar issue using 2.1.0 but not with Kafka. Updating to 2.1.1 >> solved my issue. Can you try with 2.1.1 as well and report back? >> >> Best, >> Anastasios >> >> On 17.09.2017 at 16:48, "HARSH TAKKAR" wrote: >> >> >> Hi >> >> I am using spark 2.1.0 with scala 2.11.8, and while iterating over the >> partitions of each rdd in a dStream formed using KafkaUtils, i am getting >> the below exception, please suggest a fix. >> >> I have following config >> >> kafka : >> enable.auto.commit:"true", >> auto.commit.interval.ms:"1000", >> session.timeout.ms:"3", >> >> Spark: >> >> spark.streaming.backpressure.enabled=true >> >> spark.streaming.kafka.maxRatePerPartition=200 >> >> >> Exception in task 0.2 in stage 3236.0 (TID 77795) >> java.util.ConcurrentModificationException: KafkaConsumer is not safe for >> multi-threaded access >> >> -- >> Kind Regards >> Harsh >> >>
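One workaround that is often discussed for this exception (not necessarily the root-cause fix tracked in the JIRA) is disabling Spark's cached Kafka consumers, so that concurrent tasks on one executor do not end up sharing a KafkaConsumer instance — a minimal sketch, trading some throughput for isolation:

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("kafka-direct-example")                            // placeholder
  .set("spark.streaming.kafka.consumer.cache.enabled", "false")  // bypass the per-executor consumer cache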
Re: ConcurrentModificationException using Kafka Direct Stream
Hi, Changing the Spark version is my last resort; is there any other workaround for this problem? On Mon, Sep 18, 2017 at 11:43 AM pandees waran wrote: > All, May I know what exactly changed in 2.1.1 which solved this problem? > > Sent from my iPhone > > On Sep 17, 2017, at 11:08 PM, Anastasios Zouzias > wrote: > > Hi, > > I had a similar issue using 2.1.0 but not with Kafka. Updating to 2.1.1 > solved my issue. Can you try with 2.1.1 as well and report back? > > Best, > Anastasios > > On 17.09.2017 at 16:48, "HARSH TAKKAR" wrote: > > > Hi > > I am using spark 2.1.0 with scala 2.11.8, and while iterating over the > partitions of each rdd in a dStream formed using KafkaUtils, i am getting > the below exception, please suggest a fix. > > I have following config > > kafka : > enable.auto.commit:"true", > auto.commit.interval.ms:"1000", > session.timeout.ms:"3", > > Spark: > > spark.streaming.backpressure.enabled=true > > spark.streaming.kafka.maxRatePerPartition=200 > > > Exception in task 0.2 in stage 3236.0 (TID 77795) > java.util.ConcurrentModificationException: KafkaConsumer is not safe for > multi-threaded access > > -- > Kind Regards > Harsh > > >
Re: ConcurrentModificationException using Kafka Direct Stream
All, May I know what exactly changed in 2.1.1 which solved this problem? Sent from my iPhone > On Sep 17, 2017, at 11:08 PM, Anastasios Zouzias wrote: > > Hi, > > I had a similar issue using 2.1.0 but not with Kafka. Updating to 2.1.1 > solved my issue. Can you try with 2.1.1 as well and report back? > > Best, > Anastasios > > On 17.09.2017 at 16:48, "HARSH TAKKAR" wrote: > > Hi > > I am using spark 2.1.0 with scala 2.11.8, and while iterating over the > partitions of each rdd in a dStream formed using KafkaUtils, i am getting the > below exception, please suggest a fix. > > I have following config > > kafka : > enable.auto.commit:"true", > auto.commit.interval.ms:"1000", > session.timeout.ms:"3", > > Spark: > spark.streaming.backpressure.enabled=true > > spark.streaming.kafka.maxRatePerPartition=200 > > > > Exception in task 0.2 in stage 3236.0 (TID 77795) > java.util.ConcurrentModificationException: KafkaConsumer is not safe for > multi-threaded access > > -- > Kind Regards > Harsh >
Re: ConcurrentModificationException using Kafka Direct Stream
Hi, I had a similar issue using 2.1.0 but not with Kafka. Updating to 2.1.1 solved my issue. Can you try with 2.1.1 as well and report back? Best, Anastasios On 17.09.2017 at 16:48, "HARSH TAKKAR" wrote: Hi I am using spark 2.1.0 with scala 2.11.8, and while iterating over the partitions of each rdd in a dStream formed using KafkaUtils, i am getting the below exception, please suggest a fix. I have following config kafka : enable.auto.commit:"true", auto.commit.interval.ms:"1000", session.timeout.ms:"3", Spark: spark.streaming.backpressure.enabled=true spark.streaming.kafka.maxRatePerPartition=200 Exception in task 0.2 in stage 3236.0 (TID 77795) java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access -- Kind Regards Harsh
Re: ConcurrentModificationException using Kafka Direct Stream
You should paste some code. ConcurrentModificationException normally happens when you modify a list or any non-thread-safe data structure while you are iterating over it. On Sun, Sep 17, 2017 at 10:25 PM, HARSH TAKKAR wrote: > Hi, > > No we are not creating any thread for kafka DStream > however, we have a single thread for refreshing a resource cache on > driver, but that is totally separate to this connection. > > On Mon, Sep 18, 2017 at 12:29 AM kant kodali wrote: > >> Are you creating threads in your application? >> >> On Sun, Sep 17, 2017 at 7:48 AM, HARSH TAKKAR >> wrote: >> >>> >>> Hi >>> >>> I am using spark 2.1.0 with scala 2.11.8, and while iterating over the >>> partitions of each rdd in a dStream formed using KafkaUtils, i am getting >>> the below exception, please suggest a fix. >>> >>> I have following config >>> >>> kafka : >>> enable.auto.commit:"true", >>> auto.commit.interval.ms:"1000", >>> session.timeout.ms:"3", >>> >>> Spark: >>> >>> spark.streaming.backpressure.enabled=true >>> >>> spark.streaming.kafka.maxRatePerPartition=200 >>> >>> >>> Exception in task 0.2 in stage 3236.0 (TID 77795) >>> java.util.ConcurrentModificationException: KafkaConsumer is not safe >>> for multi-threaded access >>> >>> -- >>> Kind Regards >>> Harsh >>> >> >>
Re: ConcurrentModificationException using Kafka Direct Stream
Hi, No, we are not creating any thread for the Kafka DStream; however, we have a single thread for refreshing a resource cache on the driver, but that is totally separate from this connection. On Mon, Sep 18, 2017 at 12:29 AM kant kodali wrote: > Are you creating threads in your application? > > On Sun, Sep 17, 2017 at 7:48 AM, HARSH TAKKAR > wrote: > >> >> Hi >> >> I am using spark 2.1.0 with scala 2.11.8, and while iterating over the >> partitions of each rdd in a dStream formed using KafkaUtils, i am getting >> the below exception, please suggest a fix. >> >> I have following config >> >> kafka : >> enable.auto.commit:"true", >> auto.commit.interval.ms:"1000", >> session.timeout.ms:"3", >> >> Spark: >> >> spark.streaming.backpressure.enabled=true >> >> spark.streaming.kafka.maxRatePerPartition=200 >> >> >> Exception in task 0.2 in stage 3236.0 (TID 77795) >> java.util.ConcurrentModificationException: KafkaConsumer is not safe for >> multi-threaded access >> >> -- >> Kind Regards >> Harsh >> > >
Re: ConcurrentModificationException using Kafka Direct Stream
Are you creating threads in your application? On Sun, Sep 17, 2017 at 7:48 AM, HARSH TAKKAR wrote: > > Hi > > I am using spark 2.1.0 with scala 2.11.8, and while iterating over the > partitions of each rdd in a dStream formed using KafkaUtils, i am getting > the below exception, please suggest a fix. > > I have following config > > kafka : > enable.auto.commit:"true", > auto.commit.interval.ms:"1000", > session.timeout.ms:"3", > > Spark: > > spark.streaming.backpressure.enabled=true > > spark.streaming.kafka.maxRatePerPartition=200 > > > Exception in task 0.2 in stage 3236.0 (TID 77795) > java.util.ConcurrentModificationException: KafkaConsumer is not safe for > multi-threaded access > > -- > Kind Regards > Harsh >
ConcurrentModificationException using Kafka Direct Stream
Hi, I am using spark 2.1.0 with scala 2.11.8, and while iterating over the partitions of each rdd in a dStream formed using KafkaUtils, I am getting the below exception; please suggest a fix. I have the following config: kafka : enable.auto.commit:"true", auto.commit.interval.ms:"1000", session.timeout.ms:"3", Spark: spark.streaming.backpressure.enabled=true spark.streaming.kafka.maxRatePerPartition=200 Exception in task 0.2 in stage 3236.0 (TID 77795) java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access -- Kind Regards Harsh
Re: Spark Streaming Checkpoint and Exactly Once Guarantee on Kafka Direct Stream
In either case, end to end exactly once guarantee can only be ensured only if the output sink is updated transactionally. The engine has to re execute data on failure. Exactly once guarantee means that the external storage is updated as if each data record was computed exactly once. That's why you need to update them transactionally to handle possible recomputations. This is true for both spark streaming and structured streaming. Hope this helps. On Jun 6, 2017 5:56 AM, "ALunar Beach" <alunarbe...@gmail.com> wrote: > Thanks TD. > In pre-structured streaming, exactly once guarantee on input is not > guaranteed. is it? > > On Tue, Jun 6, 2017 at 4:30 AM, Tathagata Das <tathagata.das1...@gmail.com > > wrote: > >> This is the expected behavior. There are some confusing corner cases. >> If you are starting to play with Spark Streaming, i highly recommend >> learning Structured Streaming >> <http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html> >> instead. >> >> On Mon, Jun 5, 2017 at 11:16 AM, anbucheeralan <alunarbe...@gmail.com> >> wrote: >> >>> I am using Spark Streaming Checkpoint and Kafka Direct Stream. >>> It uses a 30 sec batch duration and normally the job is successful in >>> 15-20 sec. >>> >>> If the spark application fails after the successful completion >>> (149668428ms in the log below) and restarts, it's duplicating the last >>> batch again. >>> >>> Is this the expected behavior? I was expecting this to start a new batch >>> window. >>> >>> >>> Here are some logs: >>> >>> Last successful run: >>> 17/06/05 13:38:00 INFO JobScheduler: Total delay: 0.040 s for time >>> 149668428 ms (execution: 0.029 s) >>> 17/06/05 13:38:00 INFO KafkaRDD: Removing RDD 0 from persistence list >>> 17/06/05 13:38:00 INFO BlockManager: Removing RDD 0 >>> 17/06/05 13:38:00 INFO JobGenerator: Checkpointing graph for time >>> 149668428 ms >>> 17/06/05 13:38:00 INFO DStreamGraph: Updating checkpoint data for time >>> 149668428 ms >>> 17/06/05 13:38:00 INFO DStreamGraph: Updated checkpoint data for time >>> 149668428 ms >>> 17/06/05 13:38:00 INFO CheckpointWriter: Submitted checkpoint of time >>> 149668428 ms to writer queue >>> 17/06/05 13:38:00 INFO CheckpointWriter: Saving checkpoint for time >>> 149668428 ms to file 'file:/Users/anbucheeralan/Ide >>> aProjects/Spark2Example/ckpt/checkpoint-149668428' >>> 17/06/05 13:38:00 INFO CheckpointWriter: *Checkpoint for time >>> 149668428 ms saved to file >>> 'file:/Users/anbucheeralan/IdeaProjects/Spark2Example/ckpt/checkpoint-149668428', >>> took 4032 bytes and 9 ms* >>> 17/06/05 13:38:00 INFO DStreamGraph: Clearing checkpoint data for time >>> 149668428 ms >>> 17/06/05 13:38:00 INFO DStreamGraph: Cleared checkpoint data for time >>> 149668428 ms >>> >>> After the restart, >>> >>> 17/06/05 13:42:31 INFO DirectKafkaInputDStream$Direct >>> KafkaInputDStreamCheckpointData: Restoring KafkaRDD for time >>> 149668428 ms [(my_test,0,2000,2000)] >>> 17/06/05 13:42:31 INFO DirectKafkaInputDStream: Restored checkpoint data >>> *17/06/05 13:42:31 INFO JobGenerator: Batches during down time (10 >>> batches): 149668428 ms, 149668431 ms, 149668434 ms, >>> 149668437 ms, 149668440 ms, 149668443 ms, 149668446 ms, >>> 149668449 ms, 149668452 ms, 149668455 ms* >>> *17/06/05 13:42:31 INFO JobGenerator: Batches pending processing (0 >>> batches): * >>> *17/06/05 13:42:31 INFO JobGenerator: Batches to reschedule (10 >>> batches): *149668428 ms, 149668431 ms, 149668434 ms, >>> 149668437 ms, 149668440 ms, 149668443 ms, 149668446 ms, >>> 149668449 ms, 
149668452 ms, 149668455 ms >>> 17/06/05 13:42:31 INFO JobScheduler: Added jobs for time 149668428 ms >>> 17/06/05 13:42:31 INFO JobScheduler: Starting job streaming job >>> 149668428 ms.0 from job set of time 149668428 ms >>> >>> >>> >>> -- >>> View this message in context: Fwd: Spark Streaming Checkpoint and >>> Exactly Once Guarantee on Kafka Direct Stream >>> <http://apache-spark-user-list.1001560.n3.nabble.com/Fwd-Spark-Streaming-Checkpoint-and-Exactly-Once-Guarantee-on-Kafka-Direct-Stream-tp28743.html> >>> Sent from the Apache Spark User List mailing list archive >>> <http://apache-spark-user-list.1001560.n3.nabble.com/> at Nabble.com. >>> >> >> >
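To make "updated transactionally" concrete, a rough sketch in the DStream API (shown with the kafka010 package; the older kafka package has the same HasOffsetRanges trait, and `stream` is assumed to be the direct Kafka stream). withTransaction and upsert are hypothetical helpers for whatever store is used; the point is only that a batch's results and its offsets are committed atomically, so a re-executed batch overwrites rather than duplicates:

import org.apache.spark.streaming.kafka010.{HasOffsetRanges, OffsetRange}

stream.foreachRDD { rdd =>
  // The offset ranges are attached to the RDD the direct stream hands you,
  // so grab them before any transformation breaks the 1:1 partition mapping.
  val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

  // A deliberately small result so it can be collected to the driver for this sketch.
  val counts = rdd.map(record => (record.value, 1L)).reduceByKey(_ + _).collect()

  withTransaction { conn =>                        // hypothetical: open one DB transaction
    counts.foreach { case (key, n) =>
      upsert(conn, "word_counts", key, n)          // hypothetical idempotent write of the results
    }
    offsetRanges.foreach { o =>
      upsert(conn, "stream_offsets", s"${o.topic}-${o.partition}", o.untilOffset)
    }
  }                                                // results and offsets commit together, or not at all
}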
Re: Spark Streaming Checkpoint and Exactly Once Guarantee on Kafka Direct Stream
Thanks TD. In pre-structured streaming, exactly once guarantee on input is not guaranteed. is it? On Tue, Jun 6, 2017 at 4:30 AM, Tathagata Das <tathagata.das1...@gmail.com> wrote: > This is the expected behavior. There are some confusing corner cases. > If you are starting to play with Spark Streaming, i highly recommend > learning Structured Streaming > <http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html> > instead. > > On Mon, Jun 5, 2017 at 11:16 AM, anbucheeralan <alunarbe...@gmail.com> > wrote: > >> I am using Spark Streaming Checkpoint and Kafka Direct Stream. >> It uses a 30 sec batch duration and normally the job is successful in >> 15-20 sec. >> >> If the spark application fails after the successful completion >> (149668428ms in the log below) and restarts, it's duplicating the last >> batch again. >> >> Is this the expected behavior? I was expecting this to start a new batch >> window. >> >> >> Here are some logs: >> >> Last successful run: >> 17/06/05 13:38:00 INFO JobScheduler: Total delay: 0.040 s for time >> 149668428 ms (execution: 0.029 s) >> 17/06/05 13:38:00 INFO KafkaRDD: Removing RDD 0 from persistence list >> 17/06/05 13:38:00 INFO BlockManager: Removing RDD 0 >> 17/06/05 13:38:00 INFO JobGenerator: Checkpointing graph for time >> 149668428 ms >> 17/06/05 13:38:00 INFO DStreamGraph: Updating checkpoint data for time >> 149668428 ms >> 17/06/05 13:38:00 INFO DStreamGraph: Updated checkpoint data for time >> 149668428 ms >> 17/06/05 13:38:00 INFO CheckpointWriter: Submitted checkpoint of time >> 149668428 ms to writer queue >> 17/06/05 13:38:00 INFO CheckpointWriter: Saving checkpoint for time >> 149668428 ms to file 'file:/Users/anbucheeralan/Ide >> aProjects/Spark2Example/ckpt/checkpoint-149668428' >> 17/06/05 13:38:00 INFO CheckpointWriter: *Checkpoint for time >> 149668428 ms saved to file >> 'file:/Users/anbucheeralan/IdeaProjects/Spark2Example/ckpt/checkpoint-149668428', >> took 4032 bytes and 9 ms* >> 17/06/05 13:38:00 INFO DStreamGraph: Clearing checkpoint data for time >> 149668428 ms >> 17/06/05 13:38:00 INFO DStreamGraph: Cleared checkpoint data for time >> 149668428 ms >> >> After the restart, >> >> 17/06/05 13:42:31 INFO DirectKafkaInputDStream$Direct >> KafkaInputDStreamCheckpointData: Restoring KafkaRDD for time >> 149668428 ms [(my_test,0,2000,2000)] >> 17/06/05 13:42:31 INFO DirectKafkaInputDStream: Restored checkpoint data >> *17/06/05 13:42:31 INFO JobGenerator: Batches during down time (10 >> batches): 149668428 ms, 149668431 ms, 149668434 ms, >> 149668437 ms, 149668440 ms, 149668443 ms, 149668446 ms, >> 149668449 ms, 149668452 ms, 149668455 ms* >> *17/06/05 13:42:31 INFO JobGenerator: Batches pending processing (0 >> batches): * >> *17/06/05 13:42:31 INFO JobGenerator: Batches to reschedule (10 batches): >> *149668428 ms, 149668431 ms, 149668434 ms, 149668437 ms, >> 149668440 ms, 149668443 ms, 149668446 ms, 149668449 ms, >> 1496684520000 ms, 149668455 ms >> 17/06/05 13:42:31 INFO JobScheduler: Added jobs for time 149668428 ms >> 17/06/05 13:42:31 INFO JobScheduler: Starting job streaming job >> 149668428 ms.0 from job set of time 149668428 ms >> >> >> >> -- >> View this message in context: Fwd: Spark Streaming Checkpoint and >> Exactly Once Guarantee on Kafka Direct Stream >> <http://apache-spark-user-list.1001560.n3.nabble.com/Fwd-Spark-Streaming-Checkpoint-and-Exactly-Once-Guarantee-on-Kafka-Direct-Stream-tp28743.html> >> Sent from the Apache Spark User List mailing list archive >> 
<http://apache-spark-user-list.1001560.n3.nabble.com/> at Nabble.com. >> > >
Re: Spark Streaming Checkpoint and Exactly Once Guarantee on Kafka Direct Stream
This is the expected behavior. There are some confusing corner cases. If you are starting to play with Spark Streaming, i highly recommend learning Structured Streaming <http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html> instead. On Mon, Jun 5, 2017 at 11:16 AM, anbucheeralan <alunarbe...@gmail.com> wrote: > I am using Spark Streaming Checkpoint and Kafka Direct Stream. > It uses a 30 sec batch duration and normally the job is successful in > 15-20 sec. > > If the spark application fails after the successful completion > (149668428ms in the log below) and restarts, it's duplicating the last > batch again. > > Is this the expected behavior? I was expecting this to start a new batch > window. > > > Here are some logs: > > Last successful run: > 17/06/05 13:38:00 INFO JobScheduler: Total delay: 0.040 s for time > 149668428 ms (execution: 0.029 s) > 17/06/05 13:38:00 INFO KafkaRDD: Removing RDD 0 from persistence list > 17/06/05 13:38:00 INFO BlockManager: Removing RDD 0 > 17/06/05 13:38:00 INFO JobGenerator: Checkpointing graph for time > 149668428 ms > 17/06/05 13:38:00 INFO DStreamGraph: Updating checkpoint data for time > 149668428 ms > 17/06/05 13:38:00 INFO DStreamGraph: Updated checkpoint data for time > 149668428 ms > 17/06/05 13:38:00 INFO CheckpointWriter: Submitted checkpoint of time > 149668428 ms to writer queue > 17/06/05 13:38:00 INFO CheckpointWriter: Saving checkpoint for time > 149668428 ms to file 'file:/Users/anbucheeralan/Ide > aProjects/Spark2Example/ckpt/checkpoint-149668428' > 17/06/05 13:38:00 INFO CheckpointWriter: *Checkpoint for time > 149668428 ms saved to file > 'file:/Users/anbucheeralan/IdeaProjects/Spark2Example/ckpt/checkpoint-149668428', > took 4032 bytes and 9 ms* > 17/06/05 13:38:00 INFO DStreamGraph: Clearing checkpoint data for time > 149668428 ms > 17/06/05 13:38:00 INFO DStreamGraph: Cleared checkpoint data for time > 149668428 ms > > After the restart, > > 17/06/05 13:42:31 INFO DirectKafkaInputDStream$Direct > KafkaInputDStreamCheckpointData: Restoring KafkaRDD for time > 149668428 ms [(my_test,0,2000,2000)] > 17/06/05 13:42:31 INFO DirectKafkaInputDStream: Restored checkpoint data > *17/06/05 13:42:31 INFO JobGenerator: Batches during down time (10 > batches): 149668428 ms, 149668431 ms, 149668434 ms, > 149668437 ms, 149668440 ms, 149668443 ms, 149668446 ms, > 149668449 ms, 149668452 ms, 149668455 ms* > *17/06/05 13:42:31 INFO JobGenerator: Batches pending processing (0 > batches): * > *17/06/05 13:42:31 INFO JobGenerator: Batches to reschedule (10 batches): > *149668428 > ms, 149668431 ms, 149668434 ms, 149668437 ms, 149668440 ms, > 149668443 ms, 149668446 ms, 149668449 ms, 149668452 ms, > 149668455 ms > 17/06/05 13:42:31 INFO JobScheduler: Added jobs for time 149668428 ms > 17/06/05 13:42:31 INFO JobScheduler: Starting job streaming job > 149668428 ms.0 from job set of time 149668428 ms > > > > -- > View this message in context: Fwd: Spark Streaming Checkpoint and Exactly > Once Guarantee on Kafka Direct Stream > <http://apache-spark-user-list.1001560.n3.nabble.com/Fwd-Spark-Streaming-Checkpoint-and-Exactly-Once-Guarantee-on-Kafka-Direct-Stream-tp28743.html> > Sent from the Apache Spark User List mailing list archive > <http://apache-spark-user-list.1001560.n3.nabble.com/> at Nabble.com. >
Fwd: Spark Streaming Checkpoint and Exactly Once Guarantee on Kafka Direct Stream
I am using Spark Streaming Checkpoint and Kafka Direct Stream. It uses a 30 sec batch duration and normally the job is successful in 15-20 sec. If the spark application fails after the successful completion (149668428ms in the log below) and restarts, it's duplicating the last batch again. Is this the expected behavior? I was expecting this to start a new batch window. Here are some logs: Last successful run: 17/06/05 13:38:00 INFO JobScheduler: Total delay: 0.040 s for time 149668428 ms (execution: 0.029 s) 17/06/05 13:38:00 INFO KafkaRDD: Removing RDD 0 from persistence list 17/06/05 13:38:00 INFO BlockManager: Removing RDD 0 17/06/05 13:38:00 INFO JobGenerator: Checkpointing graph for time 149668428 ms 17/06/05 13:38:00 INFO DStreamGraph: Updating checkpoint data for time 149668428 ms 17/06/05 13:38:00 INFO DStreamGraph: Updated checkpoint data for time 149668428 ms 17/06/05 13:38:00 INFO CheckpointWriter: Submitted checkpoint of time 149668428 ms to writer queue 17/06/05 13:38:00 INFO CheckpointWriter: Saving checkpoint for time 149668428 ms to file 'file:/Users/anbucheeralan/ IdeaProjects/Spark2Example/ckpt/checkpoint-149668428' 17/06/05 13:38:00 INFO CheckpointWriter: *Checkpoint for time 149668428 ms saved to file 'file:/Users/anbucheeralan/IdeaProjects/Spark2Example/ckpt/checkpoint-149668428', took 4032 bytes and 9 ms* 17/06/05 13:38:00 INFO DStreamGraph: Clearing checkpoint data for time 149668428 ms 17/06/05 13:38:00 INFO DStreamGraph: Cleared checkpoint data for time 149668428 ms After the restart, 17/06/05 13:42:31 INFO DirectKafkaInputDStream$ DirectKafkaInputDStreamCheckpointData: Restoring KafkaRDD for time 149668428 ms [(my_test,0,2000,2000)] 17/06/05 13:42:31 INFO DirectKafkaInputDStream: Restored checkpoint data *17/06/05 13:42:31 INFO JobGenerator: Batches during down time (10 batches): 149668428 ms, 149668431 ms, 149668434 ms, 149668437 ms, 149668440 ms, 149668443 ms, 149668446 ms, 149668449 ms, 149668452 ms, 149668455 ms* *17/06/05 13:42:31 INFO JobGenerator: Batches pending processing (0 batches): * *17/06/05 13:42:31 INFO JobGenerator: Batches to reschedule (10 batches): *149668428 ms, 149668431 ms, 149668434 ms, 149668437 ms, 149668440 ms, 149668443 ms, 149668446 ms, 149668449 ms, 149668452 ms, 149668455 ms 17/06/05 13:42:31 INFO JobScheduler: Added jobs for time 149668428 ms 17/06/05 13:42:31 INFO JobScheduler: Starting job streaming job 149668428 ms.0 from job set of time 149668428 ms -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Fwd-Spark-Streaming-Checkpoint-and-Exactly-Once-Guarantee-on-Kafka-Direct-Stream-tp28743.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Spark Streaming Checkpoint and Exactly Once Guarantee on Kafka Direct Stream
I am using Spark Streaming Checkpoint and Kafka Direct Stream. It uses a 30 sec batch duration and normally the job is successful in 15-20 sec. If the spark application fails after the successful completion (149668428ms in the log below) and restarts, it's duplicating the last batch again. Is this the expected behavior? I was expecting this to start a new batch window. Here are some logs: Last successful run: 17/06/05 13:38:00 INFO JobScheduler: Total delay: 0.040 s for time 149668428 ms (execution: 0.029 s) 17/06/05 13:38:00 INFO KafkaRDD: Removing RDD 0 from persistence list 17/06/05 13:38:00 INFO BlockManager: Removing RDD 0 17/06/05 13:38:00 INFO JobGenerator: Checkpointing graph for time 149668428 ms 17/06/05 13:38:00 INFO DStreamGraph: Updating checkpoint data for time 149668428 ms 17/06/05 13:38:00 INFO DStreamGraph: Updated checkpoint data for time 149668428 ms 17/06/05 13:38:00 INFO CheckpointWriter: Submitted checkpoint of time 149668428 ms to writer queue 17/06/05 13:38:00 INFO CheckpointWriter: Saving checkpoint for time 149668428 ms to file 'file:/Users/anbucheeralan/IdeaProjects/Spark2Example/ckpt/checkpoint-149668428' 17/06/05 13:38:00 INFO CheckpointWriter: *Checkpoint for time 149668428 ms saved to file 'file:/Users/anbucheeralan/IdeaProjects/Spark2Example/ckpt/checkpoint-149668428', took 4032 bytes and 9 ms* 17/06/05 13:38:00 INFO DStreamGraph: Clearing checkpoint data for time 149668428 ms 17/06/05 13:38:00 INFO DStreamGraph: Cleared checkpoint data for time 149668428 ms After the restart, 17/06/05 13:42:31 INFO DirectKafkaInputDStream$DirectKafkaInputDStreamCheckpointData: Restoring KafkaRDD for time 149668428 ms [(my_test,0,2000,2000)] 17/06/05 13:42:31 INFO DirectKafkaInputDStream: Restored checkpoint data *17/06/05 13:42:31 INFO JobGenerator: Batches during down time (10 batches): 149668428 ms, 149668431 ms, 149668434 ms, 149668437 ms, 149668440 ms, 149668443 ms, 149668446 ms, 149668449 ms, 149668452 ms, 149668455 ms* *17/06/05 13:42:31 INFO JobGenerator: Batches pending processing (0 batches): * *17/06/05 13:42:31 INFO JobGenerator: Batches to reschedule (10 batches): *149668428 ms, 149668431 ms, 149668434 ms, 149668437 ms, 149668440 ms, 149668443 ms, 149668446 ms, 149668449 ms, 149668452 ms, 149668455 ms 17/06/05 13:42:31 INFO JobScheduler: Added jobs for time 149668428 ms 17/06/05 13:42:31 INFO JobScheduler: Starting job streaming job 149668428 ms.0 from job set of time 149668428 ms
Re: Spark 2 Kafka Direct Stream Consumer Issue
Could any of the experts kindly advise? On Fri, May 19, 2017 at 6:00 PM, Jayadeep J <jayade...@gmail.com> wrote: > Hi, > > I would appreciate some advice regarding an issue we are facing in > Streaming Kafka Direct Consumer. > > We have recently upgraded our application with Kafka Direct Stream to > Spark 2 (spark-streaming-kafka-0-10 - 2.1.0) with Kafka version (0.10.0.0). > We find abnormal delays after the application has run for a couple of > hours & completed consumption of ~10 million records. There is a sudden > jump in the processing time from ~15 seconds (usual for our app) to ~3 > minutes & from then on the processing time keeps degrading throughout, > without any failure though. > > We have seen that the delay is due to certain tasks taking the exact time > duration of the configured 'request.timeout.ms' for the Kafka consumer. > We have tested this by varying the timeout property to different values. Looks > like the get(offset: Long, timeout: Long): ConsumerRecord[K, V] & > subsequent poll(timeout) method in CachedKafkaConsumer.scala is actually > timing out on some of the partitions without reading the data. But the > executor logs it as successfully completed after the exact timeout > duration. Note that most other tasks are completing successfully with > millisecond duration. We found the DEBUG logs to contain > "org.apache.kafka.common.errors.DisconnectException" without any actual > failure. The Kafka issue logged as 'KafkaConsumer susceptible to > FetchResponse starvation' [KAFKA-4753] seems to be the underlying cause. > > Could anyone kindly suggest if this is normal behaviour for > Spark? Shouldn't Spark throw a Timeout error or maybe fail the tasks in such > cases? Currently the tasks seem to be successful & the job appears to > progress at a really slow speed. Thanks for your help. > > Thanks > Jay >
Re: Kafka Direct Stream: Offset Managed Manually (Exactly Once)
DStream checkpoints have all kinds of other difficulties, biggest one being you can't use a checkpoint if your app code has been updated. If you can avoid checkpoints in general, I would. On Fri, Oct 21, 2016 at 11:17 AM, Erwan ALLAINwrote: > Thanks for the fast answer ! > > I just feel annoyed and frustrated not to be able to use spark checkpointing > because I believe that there mechanism has been correctly tested. > I'm afraid that reinventing the wheel can lead to side effects that I don't > see now ... > > Anyway thanks again, I know what I have to do :) > > On Fri, Oct 21, 2016 at 5:05 PM, Cody Koeninger wrote: >> >> 0. If your processing time is regularly greater than your batch >> interval you're going to have problems anyway. Investigate this more, >> set maxRatePerPartition, something. >> 1. That's personally what I tend to do. >> 2. Why are you relying on checkpoints if you're storing offset state >> in the database? Just restart from the offsets in the database. I >> think your solution of map of batchtime to offset ranges would work >> fine in that case, no? (make sure to expire items from the map) >> >> >> >> On Fri, Oct 21, 2016 at 3:32 AM, Erwan ALLAIN >> wrote: >> > Hi, >> > >> > I'm currently implementing an exactly once mechanism based on the >> > following >> > example: >> > >> > >> > https://github.com/koeninger/kafka-exactly-once/blob/master/src/main/scala/example/TransactionalPerBatch.scala >> > >> > the pseudo code is as follow: >> > >> > dstream.transform (store offset in a variable on driver side ) >> > dstream.map >> > dstream.foreachRdd( action + save offset in db) >> > >> > this code doesn't work if the processing time is greater than batch >> > interval >> > (same problem as windowed >> > >> > (https://github.com/koeninger/kafka-exactly-once/blob/master/src/main/scala/example/Windowed.scala) >> > >> > Indeed, at each batch interval a new rdd is created and stacked, thus >> > method >> > transform is called several times and update the global variable and at >> > last >> > when we perform saving the offset range does not correspond to the one >> > processed. >> > >> > 1) Do I need to work at the RDD level (inside a big forEachRDD like in >> > the >> > first example) instead of dstream ? >> > >> > 2) I can use a map[BatchTime, OffsetRange] as a global variable but in >> > case >> > of crash this map will not reflect anymore the generatedRdds (restored >> > from >> > checkpoint, RDD prepared but not executed) >> > 2.1 ) Do I need to store this map elsewhere (cassandra) ? >> > 2.2) Is there a way to retrieve offset range restored ? (transform >> > method >> > is not called anymore for the checkpointed rdd) >> > 2.3) Is possible to store some context along the RDD to be serialized >> > ? >> > >> > Lots of questions, let me kow if it's not clear ! >> > > > - To unsubscribe e-mail: user-unsubscr...@spark.apache.org
Re: Kafka Direct Stream: Offset Managed Manually (Exactly Once)
Thanks for the fast answer! I just feel annoyed and frustrated not to be able to use Spark checkpointing, because I believe that mechanism has been correctly tested. I'm afraid that reinventing the wheel can lead to side effects that I don't see now ... Anyway thanks again, I know what I have to do :) On Fri, Oct 21, 2016 at 5:05 PM, Cody Koeninger wrote: > 0. If your processing time is regularly greater than your batch > interval you're going to have problems anyway. Investigate this more, > set maxRatePerPartition, something. > 1. That's personally what I tend to do. > 2. Why are you relying on checkpoints if you're storing offset state > in the database? Just restart from the offsets in the database. I > think your solution of map of batchtime to offset ranges would work > fine in that case, no? (make sure to expire items from the map) > > > > On Fri, Oct 21, 2016 at 3:32 AM, Erwan ALLAIN > wrote: > > Hi, > > > > I'm currently implementing an exactly once mechanism based on the > following > > example: > > > > https://github.com/koeninger/kafka-exactly-once/blob/ > master/src/main/scala/example/TransactionalPerBatch.scala > > > > the pseudo code is as follow: > > > > dstream.transform (store offset in a variable on driver side ) > > dstream.map > > dstream.foreachRdd( action + save offset in db) > > > > this code doesn't work if the processing time is greater than batch > interval > > (same problem as windowed > > (https://github.com/koeninger/kafka-exactly-once/blob/ > master/src/main/scala/example/Windowed.scala) > > > > Indeed, at each batch interval a new rdd is created and stacked, thus > method > > transform is called several times and update the global variable and at > last > > when we perform saving the offset range does not correspond to the one > > processed. > > > > 1) Do I need to work at the RDD level (inside a big forEachRDD like in > the > > first example) instead of dstream ? > > > > 2) I can use a map[BatchTime, OffsetRange] as a global variable but in > case > > of crash this map will not reflect anymore the generatedRdds (restored > from > > checkpoint, RDD prepared but not executed) > > 2.1 ) Do I need to store this map elsewhere (cassandra) ? > > 2.2) Is there a way to retrieve offset range restored ? (transform > method > > is not called anymore for the checkpointed rdd) > > 2.3) Is possible to store some context along the RDD to be serialized ? > > > > Lots of questions, let me kow if it's not clear ! > > >
Re: Kafka Direct Stream: Offset Managed Manually (Exactly Once)
0. If your processing time is regularly greater than your batch interval you're going to have problems anyway. Investigate this more, set maxRatePerPartition, something. 1. That's personally what I tend to do. 2. Why are you relying on checkpoints if you're storing offset state in the database? Just restart from the offsets in the database. I think your solution of map of batchtime to offset ranges would work fine in that case, no? (make sure to expire items from the map) On Fri, Oct 21, 2016 at 3:32 AM, Erwan ALLAIN wrote: > Hi, > > I'm currently implementing an exactly once mechanism based on the following > example: > > https://github.com/koeninger/kafka-exactly-once/blob/master/src/main/scala/example/TransactionalPerBatch.scala > > the pseudo code is as follow: > > dstream.transform (store offset in a variable on driver side ) > dstream.map > dstream.foreachRdd( action + save offset in db) > > this code doesn't work if the processing time is greater than batch interval > (same problem as windowed > (https://github.com/koeninger/kafka-exactly-once/blob/master/src/main/scala/example/Windowed.scala) > > Indeed, at each batch interval a new rdd is created and stacked, thus method > transform is called several times and update the global variable and at last > when we perform saving the offset range does not correspond to the one > processed. > > 1) Do I need to work at the RDD level (inside a big forEachRDD like in the > first example) instead of dstream ? > > 2) I can use a map[BatchTime, OffsetRange] as a global variable but in case > of crash this map will not reflect anymore the generatedRdds (restored from > checkpoint, RDD prepared but not executed) > 2.1 ) Do I need to store this map elsewhere (cassandra) ? > 2.2) Is there a way to retrieve offset range restored ? (transform method > is not called anymore for the checkpointed rdd) > 2.3) Is possible to store some context along the RDD to be serialized ? > > Lots of questions, let me kow if it's not clear !
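For completeness, a sketch of "just restart from the offsets in the database" using the 0-10 API; loadOffsetsFromDb is a hypothetical helper returning whatever was persisted at the end of each successful batch, and kafkaParams is assumed to be the usual consumer config map:

import org.apache.kafka.common.TopicPartition
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Assign

// Hypothetical helper: last committed offset per partition, read from your own store,
// e.g. Map(new TopicPartition("events", 0) -> 42L, new TopicPartition("events", 1) -> 17L)
val fromOffsets: Map[TopicPartition, Long] = loadOffsetsFromDb()

val stream = KafkaUtils.createDirectStream[String, String](
  ssc,
  PreferConsistent,
  // Start each assigned partition exactly where the previous run committed.
  Assign[String, String](fromOffsets.keys.toList, kafkaParams, fromOffsets)
)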
Kafka Direct Stream: Offset Managed Manually (Exactly Once)
Hi, I'm currently implementing an exactly once mechanism based on the following example: https://github.com/koeninger/kafka-exactly-once/blob/master/src/main/scala/example/TransactionalPerBatch.scala The pseudo code is as follows: dstream.transform (store offset in a variable on driver side ) dstream.map dstream.foreachRdd( action + save offset in db) This code doesn't work if the processing time is greater than the batch interval (same problem as the windowed example: https://github.com/koeninger/kafka-exactly-once/blob/master/src/main/scala/example/Windowed.scala ). Indeed, at each batch interval a new rdd is created and stacked, thus the transform method is called several times and updates the global variable, and in the end the offset range we save does not correspond to the one processed. 1) Do I need to work at the RDD level (inside a big forEachRDD like in the first example) instead of the dstream? 2) I can use a map[BatchTime, OffsetRange] as a global variable, but in case of crash this map will no longer reflect the generated RDDs (restored from checkpoint, RDD prepared but not executed). 2.1) Do I need to store this map elsewhere (Cassandra)? 2.2) Is there a way to retrieve the restored offset range? (The transform method is not called anymore for the checkpointed rdd.) 2.3) Is it possible to store some context along with the RDD to be serialized? Lots of questions, let me know if it's not clear!
Re: Use cases for kafka direct stream messageHandler
Yeah, to be clear, I'm talking about having only one constructor for a direct stream, that will give you a stream of ConsumerRecord. Different needs for topic subscription, starting offsets, etc could be handled by calling appropriate methods after construction but before starting the stream. On Wed, Mar 9, 2016 at 1:19 PM, Alan Braithwaitewrote: > I'd probably prefer to keep it the way it is, unless it's becoming more like > the function without the messageHandler argument. > > Right now I have code like this, but I wish it were more similar looking: > > if (parsed.partitions.isEmpty()) { > JavaPairInputDStream kvstream = KafkaUtils > .createDirectStream(jssc, String.class, MessageWrapper.class, > StringDecoder.class, > MessageDecoder.class, kafkaArgs(parsed), topicSet); > requests = kvstream.map((Function , > MessageWrapper>) Tuple2::_2); > } else { > requests = KafkaUtils.createDirectStream(jssc, String.class, > MessageWrapper.class, StringDecoder.class, MessageDecoder.class, > MessageWrapper.class, > kafkaArgs(parsed), parsed.partitions, > (Function , > MessageWrapper>) MessageAndMetadata::message); > } > > Of course, this is in the Java API so it may not have relevance to what > you're talking about. > > Perhaps if both functions (the one with partitions arg and the one without) > returned just ConsumerRecord, I would like that more. > > - Alan > > On Tue, Mar 8, 2016 at 6:49 AM, Cody Koeninger wrote: >> >> No, looks like you'd have to catch them in the serializer and have the >> serializer return option or something. The new consumer builds a buffer full >> of records, not one at a time. >> >> On Mar 8, 2016 4:43 AM, "Marius Soutier" wrote: >>> >>> >>> > On 04.03.2016, at 22:39, Cody Koeninger wrote: >>> > >>> > The only other valid use of messageHandler that I can think of is >>> > catching serialization problems on a per-message basis. But with the >>> > new Kafka consumer library, that doesn't seem feasible anyway, and >>> > could be handled with a custom (de)serializer. >>> >>> What do you mean, that doesn't seem feasible? You mean when using a >>> custom deserializer? Right now I'm catching serialization problems in the >>> message handler, after your proposed change I'd catch them in `map()`. >>> > - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Use cases for kafka direct stream messageHandler
I'd probably prefer to keep it the way it is, unless it's becoming more like the function without the messageHandler argument. Right now I have code like this, but I wish it were more similar looking: if (parsed.partitions.isEmpty()) { JavaPairInputDStream<String, MessageWrapper> kvstream = KafkaUtils .createDirectStream(jssc, String.class, MessageWrapper.class, StringDecoder.class, MessageDecoder.class, kafkaArgs(parsed), topicSet); requests = kvstream.map((Function<Tuple2<String, MessageWrapper>, MessageWrapper>) Tuple2::_2); } else { requests = KafkaUtils.createDirectStream(jssc, String.class, MessageWrapper.class, StringDecoder.class, MessageDecoder.class, MessageWrapper.class, kafkaArgs(parsed), parsed.partitions, (Function<MessageAndMetadata<String, MessageWrapper>, MessageWrapper>) MessageAndMetadata::message); } Of course, this is in the Java API so it may not have relevance to what you're talking about. Perhaps if both functions (the one with partitions arg and the one without) returned just ConsumerRecord, I would like that more. - Alan On Tue, Mar 8, 2016 at 6:49 AM, Cody Koeninger wrote: > No, looks like you'd have to catch them in the serializer and have the > serializer return option or something. The new consumer builds a buffer > full of records, not one at a time. > On Mar 8, 2016 4:43 AM, "Marius Soutier" wrote: > >> >> > On 04.03.2016, at 22:39, Cody Koeninger wrote: >> > >> > The only other valid use of messageHandler that I can think of is >> > catching serialization problems on a per-message basis. But with the >> > new Kafka consumer library, that doesn't seem feasible anyway, and >> > could be handled with a custom (de)serializer. >> >> What do you mean, that doesn't seem feasible? You mean when using a >> custom deserializer? Right now I'm catching serialization problems in the >> message handler, after your proposed change I'd catch them in `map()`. >> >>
Re: Use cases for kafka direct stream messageHandler
No, looks like you'd have to catch them in the serializer and have the serializer return option or something. The new consumer builds a buffer full of records, not one at a time. On Mar 8, 2016 4:43 AM, "Marius Soutier" wrote: > > > On 04.03.2016, at 22:39, Cody Koeninger wrote: > > > > The only other valid use of messageHandler that I can think of is > > catching serialization problems on a per-message basis. But with the > > new Kafka consumer library, that doesn't seem feasible anyway, and > > could be handled with a custom (de)serializer. > > What do you mean, that doesn't seem feasible? You mean when using a custom > deserializer? Right now I'm catching serialization problems in the message > handler, after your proposed change I'd catch them in `map()`. > >
Re: Use cases for kafka direct stream messageHandler
> On 04.03.2016, at 22:39, Cody Koeninger wrote: > > The only other valid use of messageHandler that I can think of is > catching serialization problems on a per-message basis. But with the > new Kafka consumer library, that doesn't seem feasible anyway, and > could be handled with a custom (de)serializer. What do you mean, that doesn't seem feasible? You mean when using a custom deserializer? Right now I'm catching serialization problems in the message handler, after your proposed change I'd catch them in `map()`.
Use cases for kafka direct stream messageHandler
Wanted to survey what people are using the direct stream messageHandler for, besides just extracting key / value / offset. Would your use case still work if that argument was removed, and the stream just contained ConsumerRecord objects (http://kafka.apache.org/090/javadoc/org/apache/kafka/clients/consumer/ConsumerRecord.html) which you could then use normal map transformations to access? The only other valid use of messageHandler that I can think of is catching serialization problems on a per-message basis. But with the new Kafka consumer library, that doesn't seem feasible anyway, and could be handled with a custom (de)serializer.
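As an illustration of the proposal, the common messageHandler uses reduce to an ordinary map over ConsumerRecord, and per-message deserialization failures can be absorbed in a custom deserializer. A sketch, where `stream` is assumed to be a direct stream of ConsumerRecord and the Option-returning deserializer is an assumption rather than anything shipped with Spark or Kafka:

import org.apache.kafka.common.serialization.Deserializer

// What messageHandler = mm => (mm.key, mm.value) becomes with ConsumerRecord:
val pairs = stream.map(record => (record.key, record.value))

// Swallow bad payloads instead of failing the task: a deserializer that yields
// None for records it cannot decode (hypothetical, for illustration only).
class SafeStringDeserializer extends Deserializer[Option[String]] {
  override def configure(configs: java.util.Map[String, _], isKey: Boolean): Unit = ()
  override def deserialize(topic: String, data: Array[Byte]): Option[String] =
    try Option(data).map(new String(_, "UTF-8")) catch { case _: Exception => None }
  override def close(): Unit = ()
}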
Re: How does Spark streaming's Kafka direct stream survive from worker node failure?
Thanks a lot. Sent from WPS Mail. On Feb 27, 2016 at 1:02 AM, Cody Koeninger wrote: Yes. On Thu, Feb 25, 2016 at 9:45 PM, yuhang.chenn wrote: Thanks a lot. And I got another question: What would happen if I didn't set "spark.streaming.kafka.maxRatePerPartition"? Will Spark Streaming try to consume all the messages in Kafka? Sent from WPS Mail. On Feb 25, 2016 at 11:58 AM, Cody Koeninger wrote: The per partition offsets are part of the rdd as defined on the driver. Have you read https://github.com/koeninger/kafka-exactly-once/blob/master/blogpost.md and/or watched https://www.youtube.com/watch?v=fXnNEq1v3VA On Wed, Feb 24, 2016 at 9:05 PM, Yuhang Chen wrote: Hi, as far as I know, there is a 1:1 mapping between Spark partition and Kafka partition, and in Spark's fault-tolerance mechanism, if a partition failed, another partition will be used to recompute those data. And my questions are below: When a partition (worker node) fails in Spark Streaming, 1. Is its computation passed to another partition, or just waits for the failed partition to restart? 2. How does the restarted partition know the offset range it should consume from Kafka? It should consume the same data as the before-failed one, right?
Re: How does Spark streaming's Kafka direct stream survive from worker node failure?
Yes. On Thu, Feb 25, 2016 at 9:45 PM, yuhang.chenn wrote: > Thanks a lot. > And I got another question: What would happen if I didn't set > "spark.streaming.kafka.maxRatePerPartition"? Will Spark Streaming try to > consume all the messages in Kafka? > > Sent from WPS Mail > On Feb 25, 2016 at 11:58 AM, Cody Koeninger wrote: > > The per partition offsets are part of the rdd as defined on the driver. > Have you read > > https://github.com/koeninger/kafka-exactly-once/blob/master/blogpost.md > > and/or watched > > https://www.youtube.com/watch?v=fXnNEq1v3VA > > On Wed, Feb 24, 2016 at 9:05 PM, Yuhang Chen > wrote: > >> Hi, as far as I know, there is a 1:1 mapping between Spark partition and >> Kafka partition, and in Spark's fault-tolerance mechanism, if a partition >> failed, another partition will be used to recompute those data. And my >> questions are below: >> >> When a partition (worker node) fails in Spark Streaming, >> 1. Is its computation passed to another partition, or just waits for the >> failed partition to restart? >> 2. How does the restarted partition know the offset range it should >> consume from Kafka? It should consume the same data as the before-failed >> one, right? >> > >
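In other words, without the cap the first batch after startup (or after a long pause) will try to pull everything available in each partition. A sketch of bounding it (the numbers are arbitrary): the setting is records per partition per second, so with 30-second batches a value of 200 caps each batch at 200 * 30 records per partition, and backpressure then adapts the rate from there.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf()
  .setAppName("kafka-direct-example")                        // placeholder
  .set("spark.streaming.kafka.maxRatePerPartition", "200")   // max records per partition per second
  .set("spark.streaming.backpressure.enabled", "true")       // adjust the rate after the first batches

val ssc = new StreamingContext(conf, Seconds(30))            // 30 s batches => at most 200 * 30 records per partition per batch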
Re: How does Spark streaming's Kafka direct stream survive from worker node failure?
Thanks a lot. And I got another question: What would happen if I didn't set "spark.streaming.kafka.maxRatePerPartition"? Will Spark Streaming try to consume all the messages in Kafka? Sent from WPS Mail. On Feb 25, 2016 at 11:58 AM, Cody Koeninger wrote: The per partition offsets are part of the rdd as defined on the driver. Have you read https://github.com/koeninger/kafka-exactly-once/blob/master/blogpost.md and/or watched https://www.youtube.com/watch?v=fXnNEq1v3VA On Wed, Feb 24, 2016 at 9:05 PM, Yuhang Chen wrote: Hi, as far as I know, there is a 1:1 mapping between Spark partition and Kafka partition, and in Spark's fault-tolerance mechanism, if a partition failed, another partition will be used to recompute those data. And my questions are below: When a partition (worker node) fails in Spark Streaming, 1. Is its computation passed to another partition, or just waits for the failed partition to restart? 2. How does the restarted partition know the offset range it should consume from Kafka? It should consume the same data as the before-failed one, right?
Re: How does Spark streaming's Kafka direct stream survive from worker node failure?
The per partition offsets are part of the rdd as defined on the driver. Have you read https://github.com/koeninger/kafka-exactly-once/blob/master/blogpost.md and/or watched https://www.youtube.com/watch?v=fXnNEq1v3VA On Wed, Feb 24, 2016 at 9:05 PM, Yuhang Chen wrote: > Hi, as far as I know, there is a 1:1 mapping between Spark partition and > Kafka partition, and in Spark's fault-tolerance mechanism, if a partition > failed, another partition will be used to recompute those data. And my > questions are below: > > When a partition (worker node) fails in Spark Streaming, > 1. Is its computation passed to another partition, or just waits for the > failed partition to restart? > 2. How does the restarted partition know the offset range it should > consume from Kafka? It should consume the same data as the before-failed > one, right? >
How does Spark streaming's Kafka direct stream survive from worker node failure?
Hi, as far as I know, there is a 1:1 mapping between Spark partition and Kafka partition, and in Spark's fault-tolerance mechanism, if a partition failed, another partition will be used to recompute those data. And my questions are below: When a partition (worker node) fails in Spark Streaming, 1. Is its computation passed to another partition, or does it just wait for the failed partition to restart? 2. How does the restarted partition know the offset range it should consume from Kafka? It should consume the same data as the before-failed one, right?
Re: Sporadic error after moving from kafka receiver to kafka direct stream
That sounds like a networking issue to me. Stuff to try - make sure every executor node can talk to every kafka broker on relevant ports - look at firewalls / network config. Even if you can make the initial connection, something may be happening after a while (we've seen ... "interesting"... issues with aws networking for instance) - look at kafka error logs - look at lsof or even tcpdump to see whats happening with the relevant ports when this occurs On Thu, Oct 22, 2015 at 9:00 AM, Conor Fennell <conorapa...@gmail.com> wrote: > Hi, > > Firstly want to say a big thanks to Cody for contributing the kafka > direct stream. > > I have been using the receiver based approach for months but the > direct stream is a much better solution for my use case. > > The job in question is now ported over to the direct stream doing > idempotent outputs to Cassandra and outputting to kafka. > I am also saving the offsets to Cassandra. > > But unfortunately I am sporadically getting the error below. > It recovers and continues but gives a large spike in the processing > delay. And it can happen in every 3 or 4 batches. > I still have other receiver jobs running and they never throw these > exceptions. > > I would be very appreciative for any direction and I can happily > provide more detail. > > Thanks, > Conor > > 15/10/22 13:30:00 INFO spark.CacheManager: Partition rdd_1528_0 not > found, computing it > 15/10/22 13:30:00 INFO kafka.KafkaRDD: Computing topic events, > partition 0 offsets 13630747 -> 13633001 > 15/10/22 13:30:00 INFO utils.VerifiableProperties: Verifying properties > 15/10/22 13:30:00 INFO utils.VerifiableProperties: Property group.id > is overridden to > 15/10/22 13:30:00 INFO utils.VerifiableProperties: Property > zookeeper.connect is overridden to > 15/10/22 13:30:30 INFO consumer.SimpleConsumer: Reconnect due to > socket error: java.nio.channels.ClosedChannelException > 15/10/22 13:31:00 ERROR executor.Executor: Exception in task 0.0 in > stage 654.0 (TID 5242) > java.nio.channels.ClosedChannelException > at kafka.network.BlockingChannel.send(BlockingChannel.scala:100) > at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:78) > at > kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:68) > at > kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:112) > at > kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:112) > at > kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:112) > at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33) > at > kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:111) > at > kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:111) > at > kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:111) > at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33) > at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:110) > at > org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.fetchBatch(KafkaRDD.scala:192) > at > org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.getNext(KafkaRDD.scala:208) > at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71) > at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) > at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) > at org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:277) > at 
org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:171) > at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:78) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:262) > at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:264) > at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:264) > at > org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73) > at > org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) > at org.apache.spark.scheduler.Task.run(Task.scala:88) > at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) > at java.lang.Thread.run(Thread.java:745) > 15/10/22 13:31:00 INFO executor.CoarseGrainedExec
Sporadic error after moving from kafka receiver to kafka direct stream
Hi, Firstly want to say a big thanks to Cody for contributing the kafka direct stream. I have been using the receiver based approach for months but the direct stream is a much better solution for my use case. The job in question is now ported over to the direct stream doing idempotent outputs to Cassandra and outputting to kafka. I am also saving the offsets to Cassandra. But unfortunately I am sporadically getting the error below. It recovers and continues but gives a large spike in the processing delay. And it can happen in every 3 or 4 batches. I still have other receiver jobs running and they never throw these exceptions. I would be very appreciative for any direction and I can happily provide more detail. Thanks, Conor 15/10/22 13:30:00 INFO spark.CacheManager: Partition rdd_1528_0 not found, computing it 15/10/22 13:30:00 INFO kafka.KafkaRDD: Computing topic events, partition 0 offsets 13630747 -> 13633001 15/10/22 13:30:00 INFO utils.VerifiableProperties: Verifying properties 15/10/22 13:30:00 INFO utils.VerifiableProperties: Property group.id is overridden to 15/10/22 13:30:00 INFO utils.VerifiableProperties: Property zookeeper.connect is overridden to 15/10/22 13:30:30 INFO consumer.SimpleConsumer: Reconnect due to socket error: java.nio.channels.ClosedChannelException 15/10/22 13:31:00 ERROR executor.Executor: Exception in task 0.0 in stage 654.0 (TID 5242) java.nio.channels.ClosedChannelException at kafka.network.BlockingChannel.send(BlockingChannel.scala:100) at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:78) at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:68) at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:112) at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:112) at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:112) at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33) at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:111) at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:111) at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:111) at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33) at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:110) at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.fetchBatch(KafkaRDD.scala:192) at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.getNext(KafkaRDD.scala:208) at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) at org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:277) at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:171) at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:78) at org.apache.spark.rdd.RDD.iterator(RDD.scala:262) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297) at org.apache.spark.rdd.RDD.iterator(RDD.scala:264) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297) at org.apache.spark.rdd.RDD.iterator(RDD.scala:264) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73) at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) at org.apache.spark.scheduler.Task.run(Task.scala:88) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) 15/10/22 13:31:00 INFO executor.CoarseGrainedExecutorBackend: Got assigned task 5243 15/10/22 13:31:00 INFO executor.Executor: Running task 1.0 in stage 654.0 (TID 5243) - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Sporadic error after moving from kafka receiver to kafka direct stream
Hi, Firstly want to say a big thanks to Cody for contributing the kafka direct stream. I have been using the receiver based approach for months but the direct stream is a much better solution for my use case. The job in question is now ported over to the direct stream doing idempotent outputs to Cassandra and outputting to kafka. I am also saving the offsets to Cassandra. But unfortunately I am sporadically getting the error below. It recovers and continues but gives a large spike in the processing delay. And it can happen in every 3 or 4 batches. I still have other receiver jobs running and they never throw these exceptions. I would be very appreciative for any direction and I can happily provide more detail. Thanks, Conor 15/10/21 23:30:31 INFO consumer.SimpleConsumer: Reconnect due to socket error: java.nio.channels.ClosedChannelException 15/10/21 23:31:01 ERROR executor.Executor: Exception in task 6.0 in stage 66.0 (TID 406) java.nio.channels.ClosedChannelException at kafka.network.BlockingChannel.send(BlockingChannel.scala:100) at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:78) at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:68) at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:112) at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:112) at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:112) at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33) at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:111) at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:111) at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:111) at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33) at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:110) at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.fetchBatch(KafkaRDD.scala:192) at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.getNext(KafkaRDD.scala:208) at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:209) at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:73) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) at org.apache.spark.scheduler.Task.run(Task.scala:88) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) 15/10/21 23:31:01 INFO executor.CoarseGrainedExecutorBackend: Got assigned task 407
RE: Node affinity for Kafka-Direct Stream
Hi, Another point is the in the receiver based approach, all the data from kafka first goes to the Worker where the receiver runs https://github.com/koeninger/kafka-exactly-once/blob/master/blogpost.md Also if you create one stream (which is the normal case), and you have many worker instances, only one worker does all the reading. Once that worker reads, the data can be “repartitioned” to distribute the load. This repartitioning is a data movement overhead in the receiver based approach. http://spark.apache.org/docs/latest/streaming-kafka-integration.html { In Receiver approach: Multiple Kafka input DStreams can be created with different groups and topics for parallel receiving of data using multiple receivers. In Direct approach: Simplified Parallelism: No need to create multiple input Kafka streams and union them. } Prajod From: Gerard Maas [mailto:gerard.m...@gmail.com] Sent: 14 October 2015 18:53 To: Saisai Shao <sai.sai.s...@gmail.com> Cc: Rishitesh Mishra <rmis...@snappydata.io>; spark users <user@spark.apache.org> Subject: Re: Node afinity for Kafka-Direct Stream Thanks Saisai, Mishra, Indeed, that hint will only work on a case where the Spark executor is co-located with the Kafka broker. I think the answer to my question as stated is that there's no warranty of where the task will execute as it will depend on the scheduler and cluster resources available (Mesos in our case). Therefore, any assumptions made about data locality using the consumer-based approach need to be reconsidered when migrating to the direct stream. ((In our case, we were using local caches to decide when a given secondary index for a record should be produced and written.)) -kr, Gerard. On Wed, Oct 14, 2015 at 2:58 PM, Saisai Shao <sai.sai.s...@gmail.com<mailto:sai.sai.s...@gmail.com>> wrote: This preferred locality is a hint to spark to schedule Kafka tasks on the preferred nodes, if Kafka and Spark are two separate cluster, obviously this locality hint takes no effect, and spark will schedule tasks following node-local -> rack-local -> any pattern, like any other spark tasks. On Wed, Oct 14, 2015 at 8:10 PM, Rishitesh Mishra <rmis...@snappydata.io<mailto:rmis...@snappydata.io>> wrote: Hi Gerard, I am also trying to understand the same issue. Whatever code I have seen it looks like once Kafka RDD is constructed the execution of that RDD is upto the task scheduler and it can schedule the partitions based on the load on nodes. There is preferred node specified in Kafks RDD. But ASFIK it maps to the Kafka partitions host . So if Kafka and Spark are co hosted probably this will work. If not, I am not sure how to get data locality for a partition. Others, correct me if there is a way. On Wed, Oct 14, 2015 at 3:08 PM, Gerard Maas <gerard.m...@gmail.com<mailto:gerard.m...@gmail.com>> wrote: In the receiver-based kafka streaming model, given that each receiver starts as a long-running task, one can rely in a certain degree of data locality based on the kafka partitioning: Data published on a given topic/partition will land on the same spark streaming receiving node until the receiver dies and needs to be restarted somewhere else. As I understand, the direct-kafka streaming model just computes offsets and relays the work to a KafkaRDD. How is the execution locality compared to the receiver-based approach? thanks, Gerard. -- Regards, Rishitesh Mishra, SnappyData . 
(http://www.snappydata.io/) https://in.linkedin.com/in/rishiteshmishra
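The contrast Prajod quotes can be seen in a short sketch. The snippet below is illustrative only (ZooKeeper, broker, group, and topic names are made up): the receiver approach creates several streams, unions them, and typically repartitions, while the direct approach gets one Spark partition per Kafka partition from a single stream.

import kafka.serializer.StringDecoder

import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka.KafkaUtils

object ReceiverVsDirect {
  // Receiver approach: several receivers created explicitly, unioned, and then
  // usually repartitioned to spread the work beyond the receiver nodes.
  def receiverBased(ssc: StreamingContext): DStream[String] = {
    val streams = (1 to 4).map { _ =>
      KafkaUtils.createStream(ssc, "zk-host:2181", "my-group", Map("events" -> 2))
    }
    ssc.union(streams).repartition(16).map(_._2)
  }

  // Direct approach: one stream, one Spark partition per Kafka partition,
  // no receivers and no extra repartition needed for read parallelism.
  def directBased(ssc: StreamingContext): DStream[String] =
    KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc,
      Map("metadata.broker.list" -> "kafka-1:9092,kafka-2:9092"),
      Set("events")
    ).map(_._2)
}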
Re: Node affinity for Kafka-Direct Stream
Assumptions about locality in spark are not very reliable, regardless of what consumer you use. Even if you have locality preferences, and locality wait turned up really high, you still have to account for losing executors. On Wed, Oct 14, 2015 at 8:23 AM, Gerard Maaswrote: > Thanks Saisai, Mishra, > > Indeed, that hint will only work on a case where the Spark executor is > co-located with the Kafka broker. > I think the answer to my question as stated is that there's no warranty > of where the task will execute as it will depend on the scheduler and > cluster resources available (Mesos in our case). > Therefore, any assumptions made about data locality using the > consumer-based approach need to be reconsidered when migrating to the > direct stream. > > ((In our case, we were using local caches to decide when a given secondary > index for a record should be produced and written.)) > > -kr, Gerard. > > > > > On Wed, Oct 14, 2015 at 2:58 PM, Saisai Shao > wrote: > >> This preferred locality is a hint to spark to schedule Kafka tasks on the >> preferred nodes, if Kafka and Spark are two separate cluster, obviously >> this locality hint takes no effect, and spark will schedule tasks following >> node-local -> rack-local -> any pattern, like any other spark tasks. >> >> On Wed, Oct 14, 2015 at 8:10 PM, Rishitesh Mishra >> wrote: >> >>> Hi Gerard, >>> I am also trying to understand the same issue. Whatever code I have seen >>> it looks like once Kafka RDD is constructed the execution of that RDD is >>> upto the task scheduler and it can schedule the partitions based on the >>> load on nodes. There is preferred node specified in Kafks RDD. But ASFIK it >>> maps to the Kafka partitions host . So if Kafka and Spark are co hosted >>> probably this will work. If not, I am not sure how to get data locality for >>> a partition. >>> Others, >>> correct me if there is a way. >>> >>> On Wed, Oct 14, 2015 at 3:08 PM, Gerard Maas >>> wrote: >>> In the receiver-based kafka streaming model, given that each receiver starts as a long-running task, one can rely in a certain degree of data locality based on the kafka partitioning: Data published on a given topic/partition will land on the same spark streaming receiving node until the receiver dies and needs to be restarted somewhere else. As I understand, the direct-kafka streaming model just computes offsets and relays the work to a KafkaRDD. How is the execution locality compared to the receiver-based approach? thanks, Gerard. >>> >>> >>> >>> -- >>> >>> Regards, >>> Rishitesh Mishra, >>> SnappyData . (http://www.snappydata.io/) >>> >>> https://in.linkedin.com/in/rishiteshmishra >>> >> >> >
Node affinity for Kafka-Direct Stream
In the receiver-based kafka streaming model, given that each receiver starts as a long-running task, one can rely in a certain degree of data locality based on the kafka partitioning: Data published on a given topic/partition will land on the same spark streaming receiving node until the receiver dies and needs to be restarted somewhere else. As I understand, the direct-kafka streaming model just computes offsets and relays the work to a KafkaRDD. How is the execution locality compared to the receiver-based approach? thanks, Gerard.
Re: Node affinity for Kafka-Direct Stream
Hi Cody, I think that I misused the term 'data locality'. I think I should better call it "node affinity" instead, as this is what I would like to have: For as long as an executor is available, I would like to have the same kafka partition processed by the same node in order to take advantage of local in-memory structures. In the receiver-based mode this was a given. Any ideas how to achieve that with the direct stream approach? -greetz, Gerard. On Wed, Oct 14, 2015 at 4:31 PM, Cody Koeningerwrote: > Assumptions about locality in spark are not very reliable, regardless of > what consumer you use. Even if you have locality preferences, and locality > wait turned up really high, you still have to account for losing executors. > > On Wed, Oct 14, 2015 at 8:23 AM, Gerard Maas > wrote: > >> Thanks Saisai, Mishra, >> >> Indeed, that hint will only work on a case where the Spark executor is >> co-located with the Kafka broker. >> I think the answer to my question as stated is that there's no warranty >> of where the task will execute as it will depend on the scheduler and >> cluster resources available (Mesos in our case). >> Therefore, any assumptions made about data locality using the >> consumer-based approach need to be reconsidered when migrating to the >> direct stream. >> >> ((In our case, we were using local caches to decide when a given >> secondary index for a record should be produced and written.)) >> >> -kr, Gerard. >> >> >> >> >> On Wed, Oct 14, 2015 at 2:58 PM, Saisai Shao >> wrote: >> >>> This preferred locality is a hint to spark to schedule Kafka tasks on >>> the preferred nodes, if Kafka and Spark are two separate cluster, obviously >>> this locality hint takes no effect, and spark will schedule tasks following >>> node-local -> rack-local -> any pattern, like any other spark tasks. >>> >>> On Wed, Oct 14, 2015 at 8:10 PM, Rishitesh Mishra >> > wrote: >>> Hi Gerard, I am also trying to understand the same issue. Whatever code I have seen it looks like once Kafka RDD is constructed the execution of that RDD is upto the task scheduler and it can schedule the partitions based on the load on nodes. There is preferred node specified in Kafks RDD. But ASFIK it maps to the Kafka partitions host . So if Kafka and Spark are co hosted probably this will work. If not, I am not sure how to get data locality for a partition. Others, correct me if there is a way. On Wed, Oct 14, 2015 at 3:08 PM, Gerard Maas wrote: > In the receiver-based kafka streaming model, given that each receiver > starts as a long-running task, one can rely in a certain degree of data > locality based on the kafka partitioning: Data published on a given > topic/partition will land on the same spark streaming receiving node until > the receiver dies and needs to be restarted somewhere else. > > As I understand, the direct-kafka streaming model just computes > offsets and relays the work to a KafkaRDD. How is the execution locality > compared to the receiver-based approach? > > thanks, Gerard. > -- Regards, Rishitesh Mishra, SnappyData . (http://www.snappydata.io/) https://in.linkedin.com/in/rishiteshmishra >>> >>> >> >
Re: Node affinity for Kafka-Direct Stream
What I'm saying is that it's not a given with spark, even in receiver-based mode, because as soon as you lose an executor you'll have a rebalance. Spark's model in general isn't a good fit for pinning work to specific nodes. If you really want to try and fake this, you can override getPreferredLocations and set spark.locality.wait to a high value. On Wed, Oct 14, 2015 at 2:45 PM, Gerard Maaswrote: > Hi Cody, > > I think that I misused the term 'data locality'. I think I should better > call it "node affinity" instead, as this is what I would like to have: > For as long as an executor is available, I would like to have the same > kafka partition processed by the same node in order to take advantage of > local in-memory structures. > > In the receiver-based mode this was a given. Any ideas how to achieve that > with the direct stream approach? > > -greetz, Gerard. > > > On Wed, Oct 14, 2015 at 4:31 PM, Cody Koeninger > wrote: > >> Assumptions about locality in spark are not very reliable, regardless of >> what consumer you use. Even if you have locality preferences, and locality >> wait turned up really high, you still have to account for losing executors. >> >> On Wed, Oct 14, 2015 at 8:23 AM, Gerard Maas >> wrote: >> >>> Thanks Saisai, Mishra, >>> >>> Indeed, that hint will only work on a case where the Spark executor is >>> co-located with the Kafka broker. >>> I think the answer to my question as stated is that there's no warranty >>> of where the task will execute as it will depend on the scheduler and >>> cluster resources available (Mesos in our case). >>> Therefore, any assumptions made about data locality using the >>> consumer-based approach need to be reconsidered when migrating to the >>> direct stream. >>> >>> ((In our case, we were using local caches to decide when a given >>> secondary index for a record should be produced and written.)) >>> >>> -kr, Gerard. >>> >>> >>> >>> >>> On Wed, Oct 14, 2015 at 2:58 PM, Saisai Shao >>> wrote: >>> This preferred locality is a hint to spark to schedule Kafka tasks on the preferred nodes, if Kafka and Spark are two separate cluster, obviously this locality hint takes no effect, and spark will schedule tasks following node-local -> rack-local -> any pattern, like any other spark tasks. On Wed, Oct 14, 2015 at 8:10 PM, Rishitesh Mishra < rmis...@snappydata.io> wrote: > Hi Gerard, > I am also trying to understand the same issue. Whatever code I have > seen it looks like once Kafka RDD is constructed the execution of that RDD > is upto the task scheduler and it can schedule the partitions based on the > load on nodes. There is preferred node specified in Kafks RDD. But ASFIK > it > maps to the Kafka partitions host . So if Kafka and Spark are co hosted > probably this will work. If not, I am not sure how to get data locality > for > a partition. > Others, > correct me if there is a way. > > On Wed, Oct 14, 2015 at 3:08 PM, Gerard Maas > wrote: > >> In the receiver-based kafka streaming model, given that each receiver >> starts as a long-running task, one can rely in a certain degree of data >> locality based on the kafka partitioning: Data published on a given >> topic/partition will land on the same spark streaming receiving node >> until >> the receiver dies and needs to be restarted somewhere else. >> >> As I understand, the direct-kafka streaming model just computes >> offsets and relays the work to a KafkaRDD. How is the execution locality >> compared to the receiver-based approach? >> >> thanks, Gerard. 
>> > > > > -- > > Regards, > Rishitesh Mishra, > SnappyData . (http://www.snappydata.io/) > > https://in.linkedin.com/in/rishiteshmishra > >>> >> >
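One way to read that as code, sketched below under the assumption that you can compute a stable partition-to-host mapping (the hostFor function is hypothetical): wrap the KafkaRDD in a thin one-to-one RDD whose getPreferredLocations returns the host you want, and raise spark.locality.wait so the scheduler waits longer for that hint. As Cody notes, this stays a preference, not a guarantee, and it breaks down when executors are lost.

import scala.reflect.ClassTag

import org.apache.spark.{OneToOneDependency, Partition, SparkConf, TaskContext}
import org.apache.spark.rdd.RDD

// Wrapper that pins each partition of `parent` to hosts chosen by `hostFor`,
// e.g. a stable hash from the Kafka partition index onto the executor hosts.
class PinnedLocationRDD[T: ClassTag](parent: RDD[T], hostFor: Int => Seq[String])
  extends RDD[T](parent.sparkContext, Seq(new OneToOneDependency(parent))) {

  override def getPartitions: Array[Partition] = parent.partitions

  override def compute(split: Partition, context: TaskContext): Iterator[T] =
    parent.iterator(split, context)

  // Scheduling hint only; Spark may still run the task elsewhere
  // (for example, after losing an executor).
  override def getPreferredLocations(split: Partition): Seq[String] =
    hostFor(split.index)
}

// Make the scheduler hold out longer for the preferred location.
val conf = new SparkConf().set("spark.locality.wait", "10s")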
Re: Node affinity for Kafka-Direct Stream
You could check the code of KafkaRDD, the locality (host) is got from Kafka's partition and set in KafkaRDD, this will a hint for Spark to schedule task on the preferred location. override def getPreferredLocations(thePart: Partition): Seq[String] = { val part = thePart.asInstanceOf[KafkaRDDPartition] // TODO is additional hostname resolution necessary here Seq(part.host) } On Wed, Oct 14, 2015 at 5:38 PM, Gerard Maaswrote: > In the receiver-based kafka streaming model, given that each receiver > starts as a long-running task, one can rely in a certain degree of data > locality based on the kafka partitioning: Data published on a given > topic/partition will land on the same spark streaming receiving node until > the receiver dies and needs to be restarted somewhere else. > > As I understand, the direct-kafka streaming model just computes offsets > and relays the work to a KafkaRDD. How is the execution locality compared > to the receiver-based approach? > > thanks, Gerard. >
Re: Node affinity for Kafka-Direct Stream
Hi Gerard, I am also trying to understand the same issue. From whatever code I have seen, it looks like once the Kafka RDD is constructed, the execution of that RDD is up to the task scheduler, and it can schedule the partitions based on the load on nodes. There is a preferred node specified in the Kafka RDD, but AFAIK it maps to the Kafka partition's host. So if Kafka and Spark are co-hosted this will probably work. If not, I am not sure how to get data locality for a partition. Others, correct me if there is a way. On Wed, Oct 14, 2015 at 3:08 PM, Gerard Maas wrote: > In the receiver-based kafka streaming model, given that each receiver > starts as a long-running task, one can rely in a certain degree of data > locality based on the kafka partitioning: Data published on a given > topic/partition will land on the same spark streaming receiving node until > the receiver dies and needs to be restarted somewhere else. > > As I understand, the direct-kafka streaming model just computes offsets > and relays the work to a KafkaRDD. How is the execution locality compared > to the receiver-based approach? > > thanks, Gerard. > -- Regards, Rishitesh Mishra, SnappyData . (http://www.snappydata.io/) https://in.linkedin.com/in/rishiteshmishra
Re: Node affinity for Kafka-Direct Stream
This preferred locality is a hint to spark to schedule Kafka tasks on the preferred nodes, if Kafka and Spark are two separate cluster, obviously this locality hint takes no effect, and spark will schedule tasks following node-local -> rack-local -> any pattern, like any other spark tasks. On Wed, Oct 14, 2015 at 8:10 PM, Rishitesh Mishrawrote: > Hi Gerard, > I am also trying to understand the same issue. Whatever code I have seen > it looks like once Kafka RDD is constructed the execution of that RDD is > upto the task scheduler and it can schedule the partitions based on the > load on nodes. There is preferred node specified in Kafks RDD. But ASFIK it > maps to the Kafka partitions host . So if Kafka and Spark are co hosted > probably this will work. If not, I am not sure how to get data locality for > a partition. > Others, > correct me if there is a way. > > On Wed, Oct 14, 2015 at 3:08 PM, Gerard Maas > wrote: > >> In the receiver-based kafka streaming model, given that each receiver >> starts as a long-running task, one can rely in a certain degree of data >> locality based on the kafka partitioning: Data published on a given >> topic/partition will land on the same spark streaming receiving node until >> the receiver dies and needs to be restarted somewhere else. >> >> As I understand, the direct-kafka streaming model just computes offsets >> and relays the work to a KafkaRDD. How is the execution locality compared >> to the receiver-based approach? >> >> thanks, Gerard. >> > > > > -- > > Regards, > Rishitesh Mishra, > SnappyData . (http://www.snappydata.io/) > > https://in.linkedin.com/in/rishiteshmishra >
Re: Node affinity for Kafka-Direct Stream
Thanks Saisai, Mishra, Indeed, that hint will only work on a case where the Spark executor is co-located with the Kafka broker. I think the answer to my question as stated is that there's no warranty of where the task will execute as it will depend on the scheduler and cluster resources available (Mesos in our case). Therefore, any assumptions made about data locality using the consumer-based approach need to be reconsidered when migrating to the direct stream. ((In our case, we were using local caches to decide when a given secondary index for a record should be produced and written.)) -kr, Gerard. On Wed, Oct 14, 2015 at 2:58 PM, Saisai Shaowrote: > This preferred locality is a hint to spark to schedule Kafka tasks on the > preferred nodes, if Kafka and Spark are two separate cluster, obviously > this locality hint takes no effect, and spark will schedule tasks following > node-local -> rack-local -> any pattern, like any other spark tasks. > > On Wed, Oct 14, 2015 at 8:10 PM, Rishitesh Mishra > wrote: > >> Hi Gerard, >> I am also trying to understand the same issue. Whatever code I have seen >> it looks like once Kafka RDD is constructed the execution of that RDD is >> upto the task scheduler and it can schedule the partitions based on the >> load on nodes. There is preferred node specified in Kafks RDD. But ASFIK it >> maps to the Kafka partitions host . So if Kafka and Spark are co hosted >> probably this will work. If not, I am not sure how to get data locality for >> a partition. >> Others, >> correct me if there is a way. >> >> On Wed, Oct 14, 2015 at 3:08 PM, Gerard Maas >> wrote: >> >>> In the receiver-based kafka streaming model, given that each receiver >>> starts as a long-running task, one can rely in a certain degree of data >>> locality based on the kafka partitioning: Data published on a given >>> topic/partition will land on the same spark streaming receiving node until >>> the receiver dies and needs to be restarted somewhere else. >>> >>> As I understand, the direct-kafka streaming model just computes offsets >>> and relays the work to a KafkaRDD. How is the execution locality compared >>> to the receiver-based approach? >>> >>> thanks, Gerard. >>> >> >> >> >> -- >> >> Regards, >> Rishitesh Mishra, >> SnappyData . (http://www.snappydata.io/) >> >> https://in.linkedin.com/in/rishiteshmishra >> > >
Re: Kafka Direct Stream
I went through the story and as I understood it is for saving data to multiple keyspaces at once. How will it work for saving data to multiple tables in same keyspace. I think tableName: String should also be tableName: T=>String.. Let me know if I understood incorrectly.. On Sat, Oct 3, 2015 at 9:55 PM, Gerard Maas <gerard.m...@gmail.com> wrote: > Hi, > > collect(partialFunction) is equivalent to filter(x=> > partialFunction.isDefinedAt(x)).map(partialFunction) so it's functionally > equivalent to your expression. I favor collect for its more compact form > but that's a personal preference. Use what you feel reads best. > > Regarding performance, there will be some overhead of submitting many a > task for every filtered RDD that gets materialized to Cassandra. That's the > reason I proposed the ticket linked above. Have a look whether that would > improve your particular usecase and vote for it if so :-) > > -kr, Gerard. > > On Sat, Oct 3, 2015 at 3:53 PM, varun sharma <varunsharman...@gmail.com> > wrote: > >> Thanks Gerardthe code snippet you shared worked.. but can you please >> explain/point me the usage of *collect* here. How it is >> different(performance/readability) from *filter.* >> >>> *val filteredRdd = rdd.filter(x=> x._1 == topic).map(_._2))* >> >> >> I am doing something like this.Please tell if I can improve the *Processing >> time* of this particular code: >> >> kafkaStringStream.foreachRDD{rdd => >> val topics = rdd.map(_._1).distinct().collect() >> if (topics.length > 0) { >> val rdd_value = rdd.take(10).mkString("\n.\n") >> Log.slogger(Log.FILE.DEFAULT, INFO, BaseSLog(s"Printing all >> feeds\n$rdd_value")) >> >> topics.foreach { topic => >> //rdd.filter(x=> x._1 == topic).map(_._2) >> val filteredRdd = rdd.collect { case (t, data) if t == topic => data } >> CassandraHelper.saveDataToCassandra(topic, filteredRdd) >> } >> updateOffsetsinZk(rdd) >> } >> >> } >> >> On Fri, Oct 2, 2015 at 11:58 PM, Gerard Maas <gerard.m...@gmail.com> >> wrote: >> >>> Something like this? >>> >>> I'm making the assumption that your topic name equals your keyspace for >>> this filtering example. >>> >>> dstream.foreachRDD{rdd => >>> val topics = rdd.map(_._1).distinct.collect >>> topics.foreach{topic => >>> val filteredRdd = rdd.collect{case (t, data) if t == topic => data}. >>> filteredRdd.saveToCassandra(topic, "table") // do not confuse this >>> collect with rdd.collect() that brings data to the driver >>> } >>> } >>> >>> >>> I'm wondering: would something like this ( >>> https://datastax-oss.atlassian.net/browse/SPARKC-257) better fit your >>> purposes? >>> >>> -kr, Gerard. >>> >>> On Fri, Oct 2, 2015 at 8:12 PM, varun sharma <varunsharman...@gmail.com> >>> wrote: >>> >>>> Hi Adrian, >>>> >>>> Can you please give an example of how to achieve this: >>>> >>>>> *I would also look at filtering by topic and saving as different >>>>> Dstreams in your code* >>>> >>>> I have managed to get DStream[(String, String)] which is ( >>>> *topic,my_data)* tuple. Lets call it kafkaStringStream. >>>> Now if I do kafkaStringStream.groupByKey() then I would get a >>>> DStream[(String,Iterable[String])]. >>>> But I want a DStream instead of Iterable in order to apply >>>> saveToCassandra for storing it. >>>> >>>> Please help in how to transform iterable to DStream or any other >>>> workaround for achieving same. >>>> >>>> >>>> On Thu, Oct 1, 2015 at 8:17 PM, Adrian Tanase <atan...@adobe.com> >>>> wrote: >>>> >>>>> On top of that you could make the topic part of the key (e.g. 
keyBy in >>>>> .transform or manually emitting a tuple) and use one of the .xxxByKey >>>>> operators for the processing. >>>>> >>>>> If you have a stable, domain specific list of topics (e.g. 3-5 named >>>>> topics) and the processing is *really* different, I would also look >>>>> at filtering by topic and saving as different Dstreams in your code. >>>>> >>>>> Either way you need to start with Cody’s tip in order to extract the >>>>> topic name. &g
Re: Kafka Direct Stream
Thanks Gerardthe code snippet you shared worked.. but can you please explain/point me the usage of *collect* here. How it is different(performance/readability) from *filter.* > *val filteredRdd = rdd.filter(x=> x._1 == topic).map(_._2))* I am doing something like this.Please tell if I can improve the *Processing time* of this particular code: kafkaStringStream.foreachRDD{rdd => val topics = rdd.map(_._1).distinct().collect() if (topics.length > 0) { val rdd_value = rdd.take(10).mkString("\n.\n") Log.slogger(Log.FILE.DEFAULT, INFO, BaseSLog(s"Printing all feeds\n$rdd_value")) topics.foreach { topic => //rdd.filter(x=> x._1 == topic).map(_._2) val filteredRdd = rdd.collect { case (t, data) if t == topic => data } CassandraHelper.saveDataToCassandra(topic, filteredRdd) } updateOffsetsinZk(rdd) } } On Fri, Oct 2, 2015 at 11:58 PM, Gerard Maas <gerard.m...@gmail.com> wrote: > Something like this? > > I'm making the assumption that your topic name equals your keyspace for > this filtering example. > > dstream.foreachRDD{rdd => > val topics = rdd.map(_._1).distinct.collect > topics.foreach{topic => > val filteredRdd = rdd.collect{case (t, data) if t == topic => data}. > filteredRdd.saveToCassandra(topic, "table") // do not confuse this > collect with rdd.collect() that brings data to the driver > } > } > > > I'm wondering: would something like this ( > https://datastax-oss.atlassian.net/browse/SPARKC-257) better fit your > purposes? > > -kr, Gerard. > > On Fri, Oct 2, 2015 at 8:12 PM, varun sharma <varunsharman...@gmail.com> > wrote: > >> Hi Adrian, >> >> Can you please give an example of how to achieve this: >> >>> *I would also look at filtering by topic and saving as different >>> Dstreams in your code* >> >> I have managed to get DStream[(String, String)] which is ( >> *topic,my_data)* tuple. Lets call it kafkaStringStream. >> Now if I do kafkaStringStream.groupByKey() then I would get a >> DStream[(String,Iterable[String])]. >> But I want a DStream instead of Iterable in order to apply >> saveToCassandra for storing it. >> >> Please help in how to transform iterable to DStream or any other >> workaround for achieving same. >> >> >> On Thu, Oct 1, 2015 at 8:17 PM, Adrian Tanase <atan...@adobe.com> wrote: >> >>> On top of that you could make the topic part of the key (e.g. keyBy in >>> .transform or manually emitting a tuple) and use one of the .xxxByKey >>> operators for the processing. >>> >>> If you have a stable, domain specific list of topics (e.g. 3-5 named >>> topics) and the processing is *really* different, I would also look at >>> filtering by topic and saving as different Dstreams in your code. >>> >>> Either way you need to start with Cody’s tip in order to extract the >>> topic name. >>> >>> -adrian >>> >>> From: Cody Koeninger >>> Date: Thursday, October 1, 2015 at 5:06 PM >>> To: Udit Mehta >>> Cc: user >>> Subject: Re: Kafka Direct Stream >>> >>> You can get the topic for a given partition from the offset range. You >>> can either filter using that; or just have a single rdd and match on topic >>> when doing mapPartitions or foreachPartition (which I think is a better >>> idea) >>> >>> >>> http://spark.apache.org/docs/latest/streaming-kafka-integration.html#approach-2-direct-approach-no-receivers >>> >>> On Wed, Sep 30, 2015 at 5:02 PM, Udit Mehta <ume...@groupon.com> wrote: >>> >>>> Hi, >>>> >>>> I am using spark direct stream to consume from multiple topics in >>>> Kafka. 
I am able to consume fine but I am stuck at how to separate the data >>>> for each topic since I need to process data differently depending on the >>>> topic. >>>> I basically want to split the RDD consisting on N topics into N RDD's >>>> each having 1 topic. >>>> >>>> Any help would be appreciated. >>>> >>>> Thanks in advance, >>>> Udit >>>> >>> >>> >> >> >> -- >> *VARUN SHARMA* >> *Flipkart* >> *Bangalore* >> > > -- *VARUN SHARMA* *Flipkart* *Bangalore*
Re: Kafka Direct Stream
Hi, collect(partialFunction) is equivalent to filter(x=> partialFunction.isDefinedAt(x)).map(partialFunction) so it's functionally equivalent to your expression. I favor collect for its more compact form but that's a personal preference. Use what you feel reads best. Regarding performance, there will be some overhead of submitting many a task for every filtered RDD that gets materialized to Cassandra. That's the reason I proposed the ticket linked above. Have a look whether that would improve your particular usecase and vote for it if so :-) -kr, Gerard. On Sat, Oct 3, 2015 at 3:53 PM, varun sharma <varunsharman...@gmail.com> wrote: > Thanks Gerardthe code snippet you shared worked.. but can you please > explain/point me the usage of *collect* here. How it is > different(performance/readability) from *filter.* > >> *val filteredRdd = rdd.filter(x=> x._1 == topic).map(_._2))* > > > I am doing something like this.Please tell if I can improve the *Processing > time* of this particular code: > > kafkaStringStream.foreachRDD{rdd => > val topics = rdd.map(_._1).distinct().collect() > if (topics.length > 0) { > val rdd_value = rdd.take(10).mkString("\n.\n") > Log.slogger(Log.FILE.DEFAULT, INFO, BaseSLog(s"Printing all > feeds\n$rdd_value")) > > topics.foreach { topic => > //rdd.filter(x=> x._1 == topic).map(_._2) > val filteredRdd = rdd.collect { case (t, data) if t == topic => data } > CassandraHelper.saveDataToCassandra(topic, filteredRdd) > } > updateOffsetsinZk(rdd) > } > > } > > On Fri, Oct 2, 2015 at 11:58 PM, Gerard Maas <gerard.m...@gmail.com> > wrote: > >> Something like this? >> >> I'm making the assumption that your topic name equals your keyspace for >> this filtering example. >> >> dstream.foreachRDD{rdd => >> val topics = rdd.map(_._1).distinct.collect >> topics.foreach{topic => >> val filteredRdd = rdd.collect{case (t, data) if t == topic => data}. >> filteredRdd.saveToCassandra(topic, "table") // do not confuse this >> collect with rdd.collect() that brings data to the driver >> } >> } >> >> >> I'm wondering: would something like this ( >> https://datastax-oss.atlassian.net/browse/SPARKC-257) better fit your >> purposes? >> >> -kr, Gerard. >> >> On Fri, Oct 2, 2015 at 8:12 PM, varun sharma <varunsharman...@gmail.com> >> wrote: >> >>> Hi Adrian, >>> >>> Can you please give an example of how to achieve this: >>> >>>> *I would also look at filtering by topic and saving as different >>>> Dstreams in your code* >>> >>> I have managed to get DStream[(String, String)] which is ( >>> *topic,my_data)* tuple. Lets call it kafkaStringStream. >>> Now if I do kafkaStringStream.groupByKey() then I would get a >>> DStream[(String,Iterable[String])]. >>> But I want a DStream instead of Iterable in order to apply >>> saveToCassandra for storing it. >>> >>> Please help in how to transform iterable to DStream or any other >>> workaround for achieving same. >>> >>> >>> On Thu, Oct 1, 2015 at 8:17 PM, Adrian Tanase <atan...@adobe.com> wrote: >>> >>>> On top of that you could make the topic part of the key (e.g. keyBy in >>>> .transform or manually emitting a tuple) and use one of the .xxxByKey >>>> operators for the processing. >>>> >>>> If you have a stable, domain specific list of topics (e.g. 3-5 named >>>> topics) and the processing is *really* different, I would also look at >>>> filtering by topic and saving as different Dstreams in your code. >>>> >>>> Either way you need to start with Cody’s tip in order to extract the >>>> topic name. 
>>>> >>>> -adrian >>>> >>>> From: Cody Koeninger >>>> Date: Thursday, October 1, 2015 at 5:06 PM >>>> To: Udit Mehta >>>> Cc: user >>>> Subject: Re: Kafka Direct Stream >>>> >>>> You can get the topic for a given partition from the offset range. You >>>> can either filter using that; or just have a single rdd and match on topic >>>> when doing mapPartitions or foreachPartition (which I think is a better >>>> idea) >>>> >>>> >>>> http://spark.apache.org/docs/latest/streaming-kafka-integration.html#approach-2-direct-approach-no-receivers >>>> >>>> On Wed, Sep 30, 2015 at 5:02 PM, Udit Mehta <ume...@groupon.com> wrote: >>>> >>>>> Hi, >>>>> >>>>> I am using spark direct stream to consume from multiple topics in >>>>> Kafka. I am able to consume fine but I am stuck at how to separate the >>>>> data >>>>> for each topic since I need to process data differently depending on the >>>>> topic. >>>>> I basically want to split the RDD consisting on N topics into N RDD's >>>>> each having 1 topic. >>>>> >>>>> Any help would be appreciated. >>>>> >>>>> Thanks in advance, >>>>> Udit >>>>> >>>> >>>> >>> >>> >>> -- >>> *VARUN SHARMA* >>> *Flipkart* >>> *Bangalore* >>> >> >> > > > -- > *VARUN SHARMA* > *Flipkart* > *Bangalore* >
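A toy illustration of the equivalence Gerard describes, with made-up data and an assumed existing SparkContext named sc; both forms are transformations and neither pulls data to the driver.

// assumes an existing SparkContext `sc`
val pairs = sc.parallelize(Seq(("topicA", "a1"), ("topicB", "b1"), ("topicA", "a2")))

val pf: PartialFunction[(String, String), String] = {
  case (topic, data) if topic == "topicA" => data
}

val viaCollect   = pairs.collect(pf)                              // collect(partialFunction)
val viaFilterMap = pairs.filter(x => pf.isDefinedAt(x)).map(pf)   // filter + map equivalent

// Same contents either way
assert(viaCollect.collect().sorted.sameElements(viaFilterMap.collect().sorted))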
Re: Kafka Direct Stream
Something like this? I'm making the assumption that your topic name equals your keyspace for this filtering example. dstream.foreachRDD{rdd => val topics = rdd.map(_._1).distinct.collect topics.foreach{topic => val filteredRdd = rdd.collect{case (t, data) if t == topic => data}. filteredRdd.saveToCassandra(topic, "table") // do not confuse this collect with rdd.collect() that brings data to the driver } } I'm wondering: would something like this ( https://datastax-oss.atlassian.net/browse/SPARKC-257) better fit your purposes? -kr, Gerard. On Fri, Oct 2, 2015 at 8:12 PM, varun sharma <varunsharman...@gmail.com> wrote: > Hi Adrian, > > Can you please give an example of how to achieve this: > >> *I would also look at filtering by topic and saving as different Dstreams >> in your code* > > I have managed to get DStream[(String, String)] which is (*topic,my_data)* > tuple. Lets call it kafkaStringStream. > Now if I do kafkaStringStream.groupByKey() then I would get a > DStream[(String,Iterable[String])]. > But I want a DStream instead of Iterable in order to apply saveToCassandra > for storing it. > > Please help in how to transform iterable to DStream or any other > workaround for achieving same. > > > On Thu, Oct 1, 2015 at 8:17 PM, Adrian Tanase <atan...@adobe.com> wrote: > >> On top of that you could make the topic part of the key (e.g. keyBy in >> .transform or manually emitting a tuple) and use one of the .xxxByKey >> operators for the processing. >> >> If you have a stable, domain specific list of topics (e.g. 3-5 named >> topics) and the processing is *really* different, I would also look at >> filtering by topic and saving as different Dstreams in your code. >> >> Either way you need to start with Cody’s tip in order to extract the >> topic name. >> >> -adrian >> >> From: Cody Koeninger >> Date: Thursday, October 1, 2015 at 5:06 PM >> To: Udit Mehta >> Cc: user >> Subject: Re: Kafka Direct Stream >> >> You can get the topic for a given partition from the offset range. You >> can either filter using that; or just have a single rdd and match on topic >> when doing mapPartitions or foreachPartition (which I think is a better >> idea) >> >> >> http://spark.apache.org/docs/latest/streaming-kafka-integration.html#approach-2-direct-approach-no-receivers >> >> On Wed, Sep 30, 2015 at 5:02 PM, Udit Mehta <ume...@groupon.com> wrote: >> >>> Hi, >>> >>> I am using spark direct stream to consume from multiple topics in Kafka. >>> I am able to consume fine but I am stuck at how to separate the data for >>> each topic since I need to process data differently depending on the topic. >>> I basically want to split the RDD consisting on N topics into N RDD's >>> each having 1 topic. >>> >>> Any help would be appreciated. >>> >>> Thanks in advance, >>> Udit >>> >> >> > > > -- > *VARUN SHARMA* > *Flipkart* > *Bangalore* >
Re: Kafka Direct Stream
Hi Adrian, Can you please give an example of how to achieve this: > *I would also look at filtering by topic and saving as different Dstreams > in your code* I have managed to get DStream[(String, String)] which is (*topic,my_data)* tuple. Lets call it kafkaStringStream. Now if I do kafkaStringStream.groupByKey() then I would get a DStream[(String,Iterable[String])]. But I want a DStream instead of Iterable in order to apply saveToCassandra for storing it. Please help in how to transform iterable to DStream or any other workaround for achieving same. On Thu, Oct 1, 2015 at 8:17 PM, Adrian Tanase <atan...@adobe.com> wrote: > On top of that you could make the topic part of the key (e.g. keyBy in > .transform or manually emitting a tuple) and use one of the .xxxByKey > operators for the processing. > > If you have a stable, domain specific list of topics (e.g. 3-5 named > topics) and the processing is *really* different, I would also look at > filtering by topic and saving as different Dstreams in your code. > > Either way you need to start with Cody’s tip in order to extract the topic > name. > > -adrian > > From: Cody Koeninger > Date: Thursday, October 1, 2015 at 5:06 PM > To: Udit Mehta > Cc: user > Subject: Re: Kafka Direct Stream > > You can get the topic for a given partition from the offset range. You > can either filter using that; or just have a single rdd and match on topic > when doing mapPartitions or foreachPartition (which I think is a better > idea) > > > http://spark.apache.org/docs/latest/streaming-kafka-integration.html#approach-2-direct-approach-no-receivers > > On Wed, Sep 30, 2015 at 5:02 PM, Udit Mehta <ume...@groupon.com> wrote: > >> Hi, >> >> I am using spark direct stream to consume from multiple topics in Kafka. >> I am able to consume fine but I am stuck at how to separate the data for >> each topic since I need to process data differently depending on the topic. >> I basically want to split the RDD consisting on N topics into N RDD's >> each having 1 topic. >> >> Any help would be appreciated. >> >> Thanks in advance, >> Udit >> > > -- *VARUN SHARMA* *Flipkart* *Bangalore*
Re: Kafka Direct Stream
Hi Nicolae, Won't creating N KafkaDirectStreams be an overhead for my streaming job compared to Single DirectStream? On Fri, Oct 2, 2015 at 1:13 AM, Nicolae Marasoiu < nicolae.maras...@adswizz.com> wrote: > Hi, > > > If you just need processing per topic, why not generate N different kafka > direct streams ? when creating a kafka direct stream you have list of > topics - just give one. > > > Then the reusable part of your computations should be extractable as > transformations/functions and reused between the streams. > > > Nicu > > > > -- > *From:* Adrian Tanase <atan...@adobe.com> > *Sent:* Thursday, October 1, 2015 5:47 PM > *To:* Cody Koeninger; Udit Mehta > *Cc:* user > *Subject:* Re: Kafka Direct Stream > > On top of that you could make the topic part of the key (e.g. keyBy in > .transform or manually emitting a tuple) and use one of the .xxxByKey > operators for the processing. > > If you have a stable, domain specific list of topics (e.g. 3-5 named > topics) and the processing is *really* different, I would also look at > filtering by topic and saving as different Dstreams in your code. > > Either way you need to start with Cody’s tip in order to extract the topic > name. > > -adrian > > From: Cody Koeninger > Date: Thursday, October 1, 2015 at 5:06 PM > To: Udit Mehta > Cc: user > Subject: Re: Kafka Direct Stream > > You can get the topic for a given partition from the offset range. You > can either filter using that; or just have a single rdd and match on topic > when doing mapPartitions or foreachPartition (which I think is a better > idea) > > > http://spark.apache.org/docs/latest/streaming-kafka-integration.html#approach-2-direct-approach-no-receivers > > <http://spark.apache.org/docs/latest/streaming-kafka-integration.html#approach-2-direct-approach-no-receivers> > Spark Streaming + Kafka Integration Guide - Spark 1.5.0 ... > Spark Streaming + Kafka Integration Guide. Apache Kafka is > publish-subscribe messaging rethought as a distributed, partitioned, > replicated commit log service. > Read more... > <http://spark.apache.org/docs/latest/streaming-kafka-integration.html#approach-2-direct-approach-no-receivers> > > > > On Wed, Sep 30, 2015 at 5:02 PM, Udit Mehta <ume...@groupon.com> wrote: > >> Hi, >> >> I am using spark direct stream to consume from multiple topics in Kafka. >> I am able to consume fine but I am stuck at how to separate the data for >> each topic since I need to process data differently depending on the topic. >> I basically want to split the RDD consisting on N topics into N RDD's >> each having 1 topic. >> >> Any help would be appreciated. >> >> Thanks in advance, >> Udit >> > > -- *VARUN SHARMA* *Flipkart* *Bangalore*
Re: Kafka Direct Stream
On top of that you could make the topic part of the key (e.g. keyBy in .transform or manually emitting a tuple) and use one of the .xxxByKey operators for the processing. If you have a stable, domain specific list of topics (e.g. 3-5 named topics) and the processing is really different, I would also look at filtering by topic and saving as different Dstreams in your code. Either way you need to start with Cody’s tip in order to extract the topic name. -adrian From: Cody Koeninger Date: Thursday, October 1, 2015 at 5:06 PM To: Udit Mehta Cc: user Subject: Re: Kafka Direct Stream You can get the topic for a given partition from the offset range. You can either filter using that; or just have a single rdd and match on topic when doing mapPartitions or foreachPartition (which I think is a better idea) http://spark.apache.org/docs/latest/streaming-kafka-integration.html#approach-2-direct-approach-no-receivers On Wed, Sep 30, 2015 at 5:02 PM, Udit Mehta <ume...@groupon.com<mailto:ume...@groupon.com>> wrote: Hi, I am using spark direct stream to consume from multiple topics in Kafka. I am able to consume fine but I am stuck at how to separate the data for each topic since I need to process data differently depending on the topic. I basically want to split the RDD consisting on N topics into N RDD's each having 1 topic. Any help would be appreciated. Thanks in advance, Udit
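A minimal sketch of that idea, assuming kafkaStringStream is the DStream[(String, String)] of (topic, payload) pairs built earlier in the thread: with the topic as the key, the per-topic work becomes an ordinary *ByKey operation.

// e.g. messages per topic in each batch
val perTopicCounts = kafkaStringStream
  .mapValues(_ => 1L)
  .reduceByKey(_ + _)

perTopicCounts.print()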
Re: Kafka Direct Stream
You can get the topic for a given partition from the offset range. You can either filter using that; or just have a single rdd and match on topic when doing mapPartitions or foreachPartition (which I think is a better idea) http://spark.apache.org/docs/latest/streaming-kafka-integration.html#approach-2-direct-approach-no-receivers On Wed, Sep 30, 2015 at 5:02 PM, Udit Mehtawrote: > Hi, > > I am using spark direct stream to consume from multiple topics in Kafka. I > am able to consume fine but I am stuck at how to separate the data for each > topic since I need to process data differently depending on the topic. > I basically want to split the RDD consisting on N topics into N RDD's each > having 1 topic. > > Any help would be appreciated. > > Thanks in advance, > Udit >
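The pattern Cody describes, sketched with placeholder broker and topic names and an assumed existing StreamingContext named ssc: partition i of the KafkaRDD corresponds to offsetRanges(i), so the topic can be looked up inside foreachPartition.

import kafka.serializer.StringDecoder

import org.apache.spark.TaskContext
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils, OffsetRange}

// assumes an existing StreamingContext `ssc`
val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, Map("metadata.broker.list" -> "kafka-1:9092"), Set("topicA", "topicB"))

stream.foreachRDD { rdd =>
  // offsetRanges(i) describes the topic/partition behind RDD partition i
  val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

  rdd.foreachPartition { iter =>
    val range = offsetRanges(TaskContext.get.partitionId())
    range.topic match {
      case "topicA" => iter.foreach { case (_, value) => /* topicA-specific handling */ }
      case "topicB" => iter.foreach { case (_, value) => /* topicB-specific handling */ }
      case _        => // ignore anything else
    }
  }
}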
Re: Kafka Direct Stream
Hi, If you just need processing per topic, why not generate N different kafka direct streams ? when creating a kafka direct stream you have list of topics - just give one. Then the reusable part of your computations should be extractable as transformations/functions and reused between the streams. Nicu From: Adrian Tanase <atan...@adobe.com> Sent: Thursday, October 1, 2015 5:47 PM To: Cody Koeninger; Udit Mehta Cc: user Subject: Re: Kafka Direct Stream On top of that you could make the topic part of the key (e.g. keyBy in .transform or manually emitting a tuple) and use one of the .xxxByKey operators for the processing. If you have a stable, domain specific list of topics (e.g. 3-5 named topics) and the processing is really different, I would also look at filtering by topic and saving as different Dstreams in your code. Either way you need to start with Cody's tip in order to extract the topic name. -adrian From: Cody Koeninger Date: Thursday, October 1, 2015 at 5:06 PM To: Udit Mehta Cc: user Subject: Re: Kafka Direct Stream You can get the topic for a given partition from the offset range. You can either filter using that; or just have a single rdd and match on topic when doing mapPartitions or foreachPartition (which I think is a better idea) http://spark.apache.org/docs/latest/streaming-kafka-integration.html#approach-2-direct-approach-no-receivers [http://spark.apache.org/docs/latest/img/spark-logo-hd.png]<http://spark.apache.org/docs/latest/streaming-kafka-integration.html#approach-2-direct-approach-no-receivers> Spark Streaming + Kafka Integration Guide - Spark 1.5.0 ... Spark Streaming + Kafka Integration Guide. Apache Kafka is publish-subscribe messaging rethought as a distributed, partitioned, replicated commit log service. Read more...<http://spark.apache.org/docs/latest/streaming-kafka-integration.html#approach-2-direct-approach-no-receivers> On Wed, Sep 30, 2015 at 5:02 PM, Udit Mehta <ume...@groupon.com<mailto:ume...@groupon.com>> wrote: Hi, I am using spark direct stream to consume from multiple topics in Kafka. I am able to consume fine but I am stuck at how to separate the data for each topic since I need to process data differently depending on the topic. I basically want to split the RDD consisting on N topics into N RDD's each having 1 topic. Any help would be appreciated. Thanks in advance, Udit
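A sketch of that layout, with hypothetical topic and broker names and an assumed existing StreamingContext named ssc: one direct stream per topic, the reusable processing factored out as a function, and only the output step differing per topic.

import kafka.serializer.StringDecoder

import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka.KafkaUtils

// assumes an existing StreamingContext `ssc`
def streamFor(topic: String): DStream[String] =
  KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
    ssc, Map("metadata.broker.list" -> "kafka-1:9092"), Set(topic)).map(_._2)

val sharedEnrichment: DStream[String] => DStream[String] = _.map(_.trim) // reusable part

Seq("topicA", "topicB", "topicC").foreach { topic =>
  sharedEnrichment(streamFor(topic)).foreachRDD { rdd =>
    // topic-specific output, e.g. save to the table or keyspace named after `topic`
  }
}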
[streaming] reading Kafka direct stream throws kafka.common.OffsetOutOfRangeException
Hi, I have a simple spark-streaming job (8 executors with 1 core each, on an 8-node cluster) that reads from a Kafka topic (3 brokers, 8 partitions) and saves to Cassandra. The problem is that when I increase the number of incoming messages in the topic, the job starts to fail with kafka.common.OffsetOutOfRangeException. The job fails starting from 100 events per second. Thanks in advance
Kafka Direct Stream
Hi, I am using Spark direct stream to consume from multiple topics in Kafka. I am able to consume fine, but I am stuck at how to separate the data for each topic since I need to process data differently depending on the topic. I basically want to split the RDD consisting of N topics into N RDDs, each having 1 topic. Any help would be appreciated. Thanks in advance, Udit
Re: [streaming] reading Kafka direct stream throws kafka.common.OffsetOutOfRangeException
Offset out of range means the message in question is no longer available on Kafka. What's your Kafka log retention set to, and how does that compare to your processing time? On Wed, Sep 30, 2015 at 4:26 AM, Alexey Ponkin wrote: > Hi > > I have a simple spark-streaming job (8 executors with 1 core each, on an 8-node cluster) > that reads from a Kafka topic (3 brokers, 8 partitions) and saves to Cassandra. > The problem is that when I increase the number of incoming messages in the topic, > the job starts to fail with kafka.common.OffsetOutOfRangeException. > The job fails starting from 100 events per second. > > Thanks in advance >
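If the root cause is batches falling behind the broker's retention window, one common knob (an assumption, not something confirmed in this thread) is to throttle intake so each batch finishes before the underlying segments are deleted:

import org.apache.spark.SparkConf

// Values are illustrative only.
val conf = new SparkConf()
  .setAppName("kafka-direct-to-cassandra")
  .set("spark.streaming.backpressure.enabled", "true")
  .set("spark.streaming.kafka.maxRatePerPartition", "500")   // max records/sec/partition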
Kafka Direct Stream join without data shuffle
I have a stream obtained from Kafka with the direct approach, say inputStream, and I need to: 1. Create another DStream, derivedStream, with map or mapPartitions (with some data enrichment against a reference table) on inputStream. 2. Join derivedStream with inputStream. In my use case, I don't need to shuffle data. Each partition in derivedStream only needs to be joined with the corresponding partition in the original parent inputStream it is generated from. My questions are: 1. Is there a Partitioner defined in KafkaRDD at all? 2. How would I preserve the partitioning scheme and avoid a data shuffle? -- Chen Song
Re: Kafka Direct Stream join without data shuffle
No, there isn't a partitioner for KafkaRDD (KafkaRDD may not even be a pair RDD, for instance). It sounds to me like, since it's a self-join, you should be able to do it in a single mapPartitions operation. On Wed, Sep 2, 2015 at 3:06 PM, Chen Song wrote: > I have a stream obtained from Kafka with the direct approach, say inputStream, and I > need to: > > 1. Create another DStream, derivedStream, with map or mapPartitions (with > some data enrichment against a reference table) on inputStream. > 2. Join derivedStream with inputStream. > > In my use case, I don't need to shuffle data. Each partition in > derivedStream only needs to be joined with the corresponding partition in > the original parent inputStream it is generated from. > > My questions are: > > 1. Is there a Partitioner defined in KafkaRDD at all? > 2. How would I preserve the partitioning scheme and avoid a data shuffle? > > -- > Chen Song >
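A minimal Scala sketch of the single-pass self-join Cody describes, assuming inputStream is the direct stream (a DStream[(String, String)]) and enrich stands in for the reference-table lookup; each record is paired with its enriched form inside the same partition, so no shuffle happens:

def enrich(value: String): String = value    // placeholder reference-table enrichment

val joined = inputStream.mapPartitions { iter =>
  // What the derivedStream-then-join would have produced, computed partition-locally.
  iter.map { case (key, value) => (key, (value, enrich(value))) }
}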
Re: Spark off heap memory leak on Yarn with Kafka direct stream
Does the issue only happen when you have no traffic on the topic? Have you profiled to see what's using heap space? On Mon, Jul 13, 2015 at 1:05 PM, Apoorva Sareen apoorva.sar...@gmail.com wrote:
Spark off heap memory leak on Yarn with Kafka direct stream
Hi, I am running Spark Streaming 1.4.0 on YARN (Apache distribution 2.6.0) with Java 1.8.0_45 and a Kafka direct stream. I am also using Spark with Scala 2.11 support. The issue I am seeing is that both driver and executor containers gradually increase their physical memory usage, up to the point where YARN kills the container. I have configured up to 192M of heap and 384M of off-heap space in my driver, but it eventually runs out. The heap memory appears to be fine, with regular GC cycles, and no OutOfMemoryError is ever encountered in any of these runs. In fact, I am not generating any traffic on the Kafka queues and this still happens. Here is the code I am using:

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object SimpleSparkStreaming extends App {
  val conf = new SparkConf()
  val ssc = new StreamingContext(conf, Seconds(conf.getLong("spark.batch.window.size", 1L)))
  ssc.checkpoint("checkpoint")
  val topics = Set(conf.get("spark.kafka.topic.name"))
  val kafkaParams = Map[String, String]("metadata.broker.list" -> conf.get("spark.kafka.broker.list"))
  val kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)
  kafkaStream.foreachRDD(rdd => {
    rdd.foreach(x => {
      println(x._2)
    })
  })
  kafkaStream.print()
  ssc.start()
  ssc.awaitTermination()
}

I am running this on CentOS 7. The command used for spark-submit is the following:

./bin/spark-submit --class com.rasa.cloud.prototype.spark.SimpleSparkStreaming \
  --conf spark.yarn.executor.memoryOverhead=256 \
  --conf spark.yarn.driver.memoryOverhead=384 \
  --conf spark.kafka.topic.name=test \
  --conf spark.kafka.broker.list=172.31.45.218:9092 \
  --conf spark.batch.window.size=1 \
  --conf "spark.app.name=Simple Spark Kafka application" \
  --master yarn-cluster \
  --num-executors 1 \
  --driver-memory 192m \
  --executor-memory 128m \
  --executor-cores 1 \
  /home/centos/spark-poc/target/lib/spark-streaming-prototype-0.0.1-SNAPSHOT.jar

Any help is greatly appreciated. Regards, Apoorva
Re: Spark off heap memory leak on Yarn with Kafka direct stream
It happens irrespective of whether there is traffic on the Kafka topic or not. Also, there is no clue I could see in the heap space. The heap looks healthy and stable. It's something off-heap which is constantly growing. I also checked the JNI reference counts from the dumps, which appear stable (they are constantly getting GCed), and tried to limit the size of metaspace and direct memory using the following: --conf "spark.driver.extraJavaOptions=-XX:MaxMetaspaceSize=128M -XX:MaxDirectMemorySize=128M" but with no success. Thanks for offering help. Regards, Apoorva On 14-Jul-2015, at 12:43 am, Cody Koeninger c...@koeninger.org wrote: Does the issue only happen when you have no traffic on the topic? Have you profiled to see what's using heap space?
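The thread does not reach a confirmed fix. Purely as a hedged sketch, one further thing to try when YARN kills containers for physical-memory overage is to raise the overhead settings already used in the spark-submit above, since all off-heap usage (metaspace, direct buffers, netty, etc.) has to fit inside them; the values here are illustrative only:

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.yarn.driver.memoryOverhead", "512")     // MB of off-heap headroom for the driver
  .set("spark.yarn.executor.memoryOverhead", "512")   // MB of off-heap headroom per executor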
Kafka Direct Stream - Custom Serialization and Deserialization
Hi, If I have the below data format, how can I use a Kafka direct stream to de-serialize it? I am not able to understand all the parameters I need to pass. Can someone explain what the arguments will be, as I am not clear about this:

JavaPairInputDStream<K, V> org.apache.spark.streaming.kafka.KafkaUtils.createDirectStream(JavaStreamingContext arg0, Class<K> arg1, Class<V> arg2, Class<KD> arg3, Class<VD> arg4, Map<String, String> arg5, Set<String> arg6)

ID Name Unit Rate Duration
Re: Kafka Direct Stream - Custom Serialization and Deserialization
My question is: why are there two similar parameters, String.class and StringDecoder.class? What is the difference between them? Ashish On Fri, Jun 26, 2015 at 8:53 AM, Akhil Das ak...@sigmoidanalytics.com wrote: JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(jssc, String.class, String.class, StringDecoder.class, StringDecoder.class, kafkaParams, topicsSet); Here: jssc = JavaStreamingContext; String.class = key and value classes; StringDecoder.class = key and value decoder classes; kafkaParams = Map in which you specify all the Kafka details (like brokers, offsets etc.); topicsSet = Set of topics from which you want to consume data. Here's a sample program https://github.com/apache/spark/blob/master/examples/src/main/java/org/apache/spark/examples/streaming/JavaDirectKafkaWordCount.java for you to start. Thanks Best Regards On Fri, Jun 26, 2015 at 6:09 PM, Ashish Soni asoni.le...@gmail.com wrote: Hi, If I have the below data format, how can I use a Kafka direct stream to de-serialize it? I am not able to understand all the parameters I need to pass. Can someone explain what the arguments will be, as I am not clear about this: JavaPairInputDStream<K, V> org.apache.spark.streaming.kafka.KafkaUtils.createDirectStream(JavaStreamingContext arg0, Class<K> arg1, Class<V> arg2, Class<KD> arg3, Class<VD> arg4, Map<String, String> arg5, Set<String> arg6) ID Name Unit Rate Duration
Re: Kafka Direct Stream - Custom Serialization and Deserialization
There is one for the key of your Kafka message and one for its value. On 26 Jun 2015 4:21 pm, Ashish Soni asoni.le...@gmail.com wrote: My question is: why are there two similar parameters, String.class and StringDecoder.class? What is the difference between them? Ashish On Fri, Jun 26, 2015 at 8:53 AM, Akhil Das ak...@sigmoidanalytics.com wrote: JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(jssc, String.class, String.class, StringDecoder.class, StringDecoder.class, kafkaParams, topicsSet); Here: jssc = JavaStreamingContext; String.class = key and value classes; StringDecoder.class = key and value decoder classes; kafkaParams = Map in which you specify all the Kafka details (like brokers, offsets etc.); topicsSet = Set of topics from which you want to consume data. Here's a sample program https://github.com/apache/spark/blob/master/examples/src/main/java/org/apache/spark/examples/streaming/JavaDirectKafkaWordCount.java for you to start. Thanks Best Regards On Fri, Jun 26, 2015 at 6:09 PM, Ashish Soni asoni.le...@gmail.com wrote: Hi, If I have the below data format, how can I use a Kafka direct stream to de-serialize it? I am not able to understand all the parameters I need to pass. Can someone explain what the arguments will be, as I am not clear about this: JavaPairInputDStream<K, V> org.apache.spark.streaming.kafka.KafkaUtils.createDirectStream(JavaStreamingContext arg0, Class<K> arg1, Class<V> arg2, Class<KD> arg3, Class<VD> arg4, Map<String, String> arg5, Set<String> arg6) ID Name Unit Rate Duration
Re: Kafka Direct Stream - Custom Serialization and Deserialization
JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(jssc, String.class, String.class, StringDecoder.class, StringDecoder.class, kafkaParams, topicsSet); Here: jssc = JavaStreamingContext; String.class = key and value classes; StringDecoder.class = key and value decoder classes; kafkaParams = Map in which you specify all the Kafka details (like brokers, offsets etc.); topicsSet = Set of topics from which you want to consume data. Here's a sample program https://github.com/apache/spark/blob/master/examples/src/main/java/org/apache/spark/examples/streaming/JavaDirectKafkaWordCount.java for you to start. Thanks Best Regards On Fri, Jun 26, 2015 at 6:09 PM, Ashish Soni asoni.le...@gmail.com wrote: Hi, If I have the below data format, how can I use a Kafka direct stream to de-serialize it? I am not able to understand all the parameters I need to pass. Can someone explain what the arguments will be, as I am not clear about this: JavaPairInputDStream<K, V> org.apache.spark.streaming.kafka.KafkaUtils.createDirectStream(JavaStreamingContext arg0, Class<K> arg1, Class<V> arg2, Class<KD> arg3, Class<VD> arg4, Map<String, String> arg5, Set<String> arg6) ID Name Unit Rate Duration
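For a non-String payload such as the ID/Name/Unit/Rate/Duration records asked about above, the same call takes a custom decoder in place of StringDecoder. A hedged Scala sketch (the Java API is analogous); the Record class, the comma-separated parsing, and the ssc/kafkaParams/topics variables are assumptions made for illustration:

import kafka.serializer.{Decoder, StringDecoder}
import kafka.utils.VerifiableProperties
import org.apache.spark.streaming.kafka.KafkaUtils

case class Record(id: String, name: String, unit: String, rate: Double, duration: Double)

// Kafka 0.8 decoders are instantiated reflectively and need a
// constructor that accepts VerifiableProperties.
class RecordDecoder(props: VerifiableProperties = null) extends Decoder[Record] {
  override def fromBytes(bytes: Array[Byte]): Record = {
    // Assumes one comma-separated message: id,name,unit,rate,duration
    val Array(id, name, unit, rate, duration) = new String(bytes, "UTF-8").split(",")
    Record(id, name, unit, rate.toDouble, duration.toDouble)
  }
}

// The key stays a String; the value is decoded straight into Record.
val stream = KafkaUtils.createDirectStream[String, Record, StringDecoder, RecordDecoder](
  ssc, kafkaParams, topics)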