Re: Low Level Kafka Consumer for Spark
Hi Dib,

For our use case I want Spark job1 to read from HDFS/cache and write to Kafka queues. Similarly, Spark job2 should read from Kafka queues and write to Kafka queues. Is writing to Kafka queues from a Spark job supported in your code?

Thanks,
Deb

On Jan 15, 2015 11:21 PM, Akhil Das ak...@sigmoidanalytics.com wrote:

There was a simple example https://github.com/dibbhatt/kafka-spark-consumer/blob/master/examples/scala/LowLevelKafkaConsumer.scala#L45 which you can run after changing a few lines of configuration.

Thanks
Best Regards

- To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Low Level Kafka Consumer for Spark
My code handles only the Kafka consumer part. Writing to Kafka should not be a big challenge, though; you can do it easily in your driver code.

Dibyendu

On Sat, Jan 17, 2015 at 9:43 AM, Debasish Das debasish.da...@gmail.com wrote:

Hi Dib,

For our use case I want Spark job1 to read from HDFS/cache and write to Kafka queues. Similarly, Spark job2 should read from Kafka queues and write to Kafka queues. Is writing to Kafka queues from a Spark job supported in your code?

Thanks,
Deb
Re: Low Level Kafka Consumer for Spark
Hi Dibyendu,

I am using Kafka 0.8.1.1 and Spark 1.2.0. After changing these versions in your pom, I rebuilt your code, but I do not get any messages from ssc.receiverStream(new KafkaReceiver(_props, i)). As far as I can tell, all the messages are retrieved correctly inside your code, but _receiver.store(_dataBuffer.iterator()), which is a method of Spark Streaming's abstract receiver class, does not seem to work correctly.

Have you tried running your Spark Streaming Kafka consumer with Kafka 0.8.1.1 and Spark 1.2.0?

- Kidong.

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Low-Level-Kafka-Consumer-for-Spark-tp11258p21180.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Low Level Kafka Consumer for Spark
Hi Kidong,

No, I have not tried it with Spark 1.2 yet. I will try it out and let you know how it goes. By the way, did the Receiver store method change in Spark 1.2?

Regards,
Dibyendu

On Fri, Jan 16, 2015 at 11:25 AM, mykidong mykid...@gmail.com wrote:

Hi Dibyendu,

I am using Kafka 0.8.1.1 and Spark 1.2.0. After changing these versions in your pom, I rebuilt your code, but I do not get any messages from ssc.receiverStream(new KafkaReceiver(_props, i)). As far as I can tell, all the messages are retrieved correctly inside your code, but _receiver.store(_dataBuffer.iterator()), which is a method of Spark Streaming's abstract receiver class, does not seem to work correctly.

Have you tried running your Spark Streaming Kafka consumer with Kafka 0.8.1.1 and Spark 1.2.0?

- Kidong.
Re: Low Level Kafka Consumer for Spark
Hi Kidong,

I just tested the Low Level Consumer with Spark 1.2 and did not see any issue with the Receiver store method. It is able to fetch messages from Kafka. Can you cross-check the other configuration in your setup, such as the Kafka broker IP, topic name, ZK host details, consumer id, etc.?

Dib

On Fri, Jan 16, 2015 at 11:50 AM, Dibyendu Bhattacharya dibyendu.bhattach...@gmail.com wrote:

Hi Kidong,

No, I have not tried it with Spark 1.2 yet. I will try it out and let you know how it goes. By the way, did the Receiver store method change in Spark 1.2?

Regards,
Dibyendu
Re: Low Level Kafka Consumer for Spark
There was a simple example https://github.com/dibbhatt/kafka-spark-consumer/blob/master/examples/scala/LowLevelKafkaConsumer.scala#L45 which you can run after changing a few lines of configuration.

Thanks
Best Regards

On Fri, Jan 16, 2015 at 12:23 PM, Dibyendu Bhattacharya dibyendu.bhattach...@gmail.com wrote:

Hi Kidong,

I just tested the Low Level Consumer with Spark 1.2 and did not see any issue with the Receiver store method. It is able to fetch messages from Kafka. Can you cross-check the other configuration in your setup, such as the Kafka broker IP, topic name, ZK host details, consumer id, etc.?

Dib
Re: Low Level Kafka Consumer for Spark
Hi,

Yes, as Jerry mentioned, SPARK-3129 (https://issues.apache.org/jira/browse/SPARK-3129) enabled the WAL feature, which solves the driver failure problem. The way 3129 is designed, it solves the driver failure problem regardless of the source of the stream (Kafka, Flume, etc.). But 3129 alone does not give you a complete solution for data loss. You also need a reliable receiver, which solves the data loss issue on receiver failure.

The Low Level Consumer (https://github.com/dibbhatt/kafka-spark-consumer), for which this email thread was started, has solved that problem with the Kafka Low Level API. And SPARK-4062, as Jerry mentioned, has recently solved the same problem using the Kafka High Level API.

On the Kafka High Level Consumer API approach, I would like to mention that Kafka 0.8 has some issues, described in this wiki (https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Client+Re-Design), where the consumer re-balance sometimes fails; that is one of the key reasons Kafka is rewriting the consumer API in Kafka 0.9. I know a few folks have already faced these re-balancing issues while using the Kafka High Level API. If you ask my opinion, we at Pearson are still using the Low Level Consumer, as it seems to be more robust and performant, and we have been using it for a few months without any issue... and also I may be a little biased :)

Regards,
Dibyendu

On Wed, Dec 3, 2014 at 7:04 AM, Shao, Saisai saisai.s...@intel.com wrote:

Hi Rod,

The purpose of introducing the WAL mechanism in Spark Streaming as a general solution is to let all receivers benefit from it. As you said, though, external sources like Kafka have their own checkpoint mechanism, so instead of storing data in the WAL we could store only metadata in the WAL and recover from the last committed offsets. But this requires a sophisticated design of the Kafka receiver using the low-level API, and we would also need to take care of rebalancing and fault tolerance ourselves. So right now, instead of implementing a whole new receiver, we chose to implement a simple one; though its performance is not so good, it is much easier to understand and maintain.

The design purpose and implementation of the reliable Kafka receiver can be found in https://issues.apache.org/jira/browse/SPARK-4062. Improving the reliable Kafka receiver along the lines you mentioned is on our schedule for the future.

Thanks
Jerry
Re: Low Level Kafka Consumer for Spark
My main complaint about the WAL mechanism in the new reliable Kafka receiver is that you have to enable checkpointing and, for some reason, even if spark.cleaner.ttl is set to a reasonable value, only the metadata is cleaned periodically. In my tests, using a folder in my filesystem as the checkpoint folder, the receivedMetaData folder remains almost constant in size, but the receivedData folder keeps growing; spark.cleaner.ttl was configured to 300 seconds.

2014-12-03 10:13 GMT+00:00 Dibyendu Bhattacharya dibyendu.bhattach...@gmail.com:

Hi,

Yes, as Jerry mentioned, SPARK-3129 (https://issues.apache.org/jira/browse/SPARK-3129) enabled the WAL feature, which solves the driver failure problem. The way 3129 is designed, it solves the driver failure problem regardless of the source of the stream (Kafka, Flume, etc.). But 3129 alone does not give you a complete solution for data loss. You also need a reliable receiver, which solves the data loss issue on receiver failure.

The Low Level Consumer (https://github.com/dibbhatt/kafka-spark-consumer), for which this email thread was started, has solved that problem with the Kafka Low Level API. And SPARK-4062, as Jerry mentioned, has recently solved the same problem using the Kafka High Level API.

On the Kafka High Level Consumer API approach, I would like to mention that Kafka 0.8 has some issues, described in this wiki (https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Client+Re-Design), where the consumer re-balance sometimes fails; that is one of the key reasons Kafka is rewriting the consumer API in Kafka 0.9. I know a few folks have already faced these re-balancing issues while using the Kafka High Level API. If you ask my opinion, we at Pearson are still using the Low Level Consumer, as it seems to be more robust and performant, and we have been using it for a few months without any issue... and also I may be a little biased :)

Regards,
Dibyendu
Re: Low Level Kafka Consumer for Spark
Hi Dibyendu,

What are your thoughts on keeping this solution (or not), considering that Spark Streaming v1.2 will have built-in recoverability of the received data? https://issues.apache.org/jira/browse/SPARK-1647

I'm concerned about the complexity of this solution, in particular the added complexity and performance overhead of writing large amounts of data into HDFS on a small batch interval. https://docs.google.com/document/d/1vTCB5qVfyxQPlHuv8rit9-zjdttlgaSrMgfCDQlCJIM/edit?pli=1# http://apache-spark-user-list.1001560.n3.nabble.com/file/n20181/spark_streaming_v.png

I think the whole solution is well designed and thought out, but I am not sure it really fits all needs with checkpoint-based technologies like Flume or Kafka, since it hides the management of the offset from the user code. If, instead of saving received data into HDFS, the ReceiverHandler saved some metadata (such as the offset in the case of Kafka) specified by the custom receiver passed into the StreamingContext, then upon driver restart that metadata could be used by the custom receiver to recover the point from which it should start receiving data once more.

Anyone's comments will be greatly appreciated.

Tnks,
Rod

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Low-Level-Kafka-Consumer-for-Spark-tp11258p20181.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
RE: Low Level Kafka Consumer for Spark
Hi Rod,

The purpose of introducing the WAL mechanism in Spark Streaming as a general solution is to let all receivers benefit from it. As you said, though, external sources like Kafka have their own checkpoint mechanism, so instead of storing data in the WAL we could store only metadata in the WAL and recover from the last committed offsets. But this requires a sophisticated design of the Kafka receiver using the low-level API, and we would also need to take care of rebalancing and fault tolerance ourselves. So right now, instead of implementing a whole new receiver, we chose to implement a simple one; though its performance is not so good, it is much easier to understand and maintain.

The design purpose and implementation of the reliable Kafka receiver can be found in https://issues.apache.org/jira/browse/SPARK-4062. Improving the reliable Kafka receiver along the lines you mentioned is on our schedule for the future.

Thanks
Jerry

-----Original Message-----
From: RodrigoB [mailto:rodrigo.boav...@aspect.com]
Sent: Wednesday, December 3, 2014 5:44 AM
To: u...@spark.incubator.apache.org
Subject: Re: Low Level Kafka Consumer for Spark

Dibyendu,

Just to make sure I am not misunderstood: my concerns refer to the upcoming Spark solution and not yours. I would like to gather the perspective of someone who implemented recovery with Kafka in a different way.

Tnks,
Rod

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Low-Level-Kafka-Consumer-for-Spark-tp11258p20196.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Low Level Kafka Consumer for Spark
Hi Dibyendu,

Thanks for your great work! I'm new to Spark Streaming, so I just want to make sure I understand the driver failure issue correctly. In my use case, I want to make sure that messages coming in from Kafka are always broken into the same set of RDDs. That is, if a set of messages is assigned to one RDD and the driver dies before this RDD is processed, then once the driver recovers, the same set of messages is assigned to a single RDD, instead of the messages being arbitrarily repartitioned across different RDDs. Does your receiver guarantee this behavior until the problem is fixed in Spark 1.2?

Regards,
Alon

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Low-Level-Kafka-Consumer-for-Spark-tp11258p14233.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Low Level Kafka Consumer for Spark
Hi Alon,

No, this does not guarantee that the same set of messages will come in the same RDD. This fix just replays the messages from the last processed offset, in the same order. Again, this is just an interim fix we needed to solve our use case. If you do not need this message replay feature, just do not perform the ack (acknowledgement) call in the driver code. Then the processed message offsets will not be written to ZK and hence replay will not happen.

Regards,
Dibyendu

On Mon, Sep 15, 2014 at 4:48 PM, Alon Pe'er alo...@supersonicads.com wrote:

Hi Dibyendu,

Thanks for your great work! I'm new to Spark Streaming, so I just want to make sure I understand the driver failure issue correctly. In my use case, I want to make sure that messages coming in from Kafka are always broken into the same set of RDDs. That is, if a set of messages is assigned to one RDD and the driver dies before this RDD is processed, then once the driver recovers, the same set of messages is assigned to a single RDD, instead of the messages being arbitrarily repartitioned across different RDDs. Does your receiver guarantee this behavior until the problem is fixed in Spark 1.2?

Regards,
Alon
Re: Low Level Kafka Consumer for Spark
Hi Dibyendu,

I am a little confused about the need for rate limiting input from Kafka. If the stream coming in from Kafka has a higher message/second rate than what a Spark job can process, then it should simply build a backlog in Spark if the RDDs are cached on disk using persist(). Right?

Thanks,
Tim

On Mon, Sep 15, 2014 at 4:33 AM, Dibyendu Bhattacharya dibyendu.bhattach...@gmail.com wrote:

Hi Alon,

No, this does not guarantee that the same set of messages will come in the same RDD. This fix just replays the messages from the last processed offset, in the same order. Again, this is just an interim fix we needed to solve our use case. If you do not need this message replay feature, just do not perform the ack (acknowledgement) call in the driver code. Then the processed message offsets will not be written to ZK and hence replay will not happen.

Regards,
Dibyendu
Re: Low Level Kafka Consumer for Spark
Hi Tim,

I have not tried persisting the RDD. There is some discussion of rate limiting in Spark Streaming in this thread: http://apache-spark-user-list.1001560.n3.nabble.com/spark-streaming-rate-limiting-from-kafka-td8590.html

There is a pull request (https://github.com/apache/spark/pull/945/files) to fix this rate limiting issue at the BlockGenerator level. But while testing with heavy load, this fix did not solve my problem, so I had to build rate limiting into the Kafka consumer. I will make it configurable soon. Without it, I can see blocks getting dropped, which leads to job failure.

I have raised this in another thread: https://mail.google.com/mail/u/1/?tab=wm#search/Serious/148650fd829cd239. But I have not got an answer yet on whether this (blocks getting dropped and the job failing) is a bug.

Dib

On Mon, Sep 15, 2014 at 10:33 PM, Tim Smith secs...@gmail.com wrote:

Hi Dibyendu,

I am a little confused about the need for rate limiting input from Kafka. If the stream coming in from Kafka has a higher message/second rate than what a Spark job can process, then it should simply build a backlog in Spark if the RDDs are cached on disk using persist(). Right?

Thanks,
Tim
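As a rough illustration of the consumer-side rate limiting mentioned in this message, here is a minimal Java sketch of a per-second message budget. It is not the code from kafka-spark-consumer; the class name FetchThrottle, the method pauseNanosBefore and the one-second window are all assumptions made for the example. The caller passes in the current time so the logic can be exercised without actually sleeping.

```java
/**
 * Hypothetical throttle (not part of kafka-spark-consumer or Spark):
 * allows at most maxMessagesPerSecond messages per one-second window.
 */
final class FetchThrottle {
    private static final long ONE_SECOND_NANOS = 1_000_000_000L;

    private final long maxMessagesPerSecond;
    private long windowStartNanos;
    private long messagesInWindow;

    FetchThrottle(long maxMessagesPerSecond, long nowNanos) {
        this.maxMessagesPerSecond = maxMessagesPerSecond;
        this.windowStartNanos = nowNanos;
        this.messagesInWindow = 0;
    }

    /**
     * Returns how long (in nanoseconds) the caller should pause before
     * handing the next batch to the receiver. A batch larger than the whole
     * budget is still let through, but only at the start of a fresh window.
     */
    long pauseNanosBefore(long batchSize, long nowNanos) {
        long elapsed = nowNanos - windowStartNanos;
        if (elapsed >= ONE_SECOND_NANOS) {
            // The previous window is over: start a new one with a full budget.
            windowStartNanos = nowNanos;
            messagesInWindow = 0;
            elapsed = 0;
        }
        if (messagesInWindow + batchSize <= maxMessagesPerSecond) {
            messagesInWindow += batchSize;
            return 0L;
        }
        // Budget exhausted: wait for the window to end, then count this
        // batch against the next window.
        long pause = ONE_SECOND_NANOS - elapsed;
        windowStartNanos = nowNanos + pause;
        messagesInWindow = batchSize;
        return pause;
    }

    public static void main(String[] args) {
        FetchThrottle throttle = new FetchThrottle(100, 0L);
        System.out.println(throttle.pauseNanosBefore(60, 0L));
        System.out.println(throttle.pauseNanosBefore(60, 500_000_000L));
    }
}
```

In a real fetch loop the returned value would be passed to something like Thread.sleep before the next fetch. Throttling before data is stored keeps the backlog in Kafka rather than in Spark's block storage, which is the trade-off this message describes.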
Re: Low Level Kafka Consumer for Spark
Hi,

The latest changes, with Kafka message replay by manipulating the ZK offset, seem to be working fine for us. This gives us some relief until the actual issue is fixed in Spark 1.2.

I have some questions on how Spark processes the received data. The logic I used is basically to pull messages from individual partitions using dedicated receivers and do a union of these streams. After that I process the union stream.

Today I wanted to test this consumer with our internal Kafka cluster, which has around 50 million records. With this huge backlog I found Spark only running the receiver task and not running the processing task (or rather doing it very slowly). Is this an issue with the consumer or an issue on the Spark side? Ideally, when the receivers durably write data to the store, processing should start in parallel. Why does the processing task need to wait until the receiver consumes all 50 million messages? ...Or maybe I am doing something wrong?

I can share the driver log if you want. In the driver I can see only "storage.BlockManagerInfo: Added input..." type messages, but I hardly see any "scheduler.TaskSetManager: Starting task..." messages. I see data getting written to the target system at a very slow pace.

Regards,
Dibyendu

On Mon, Sep 8, 2014 at 12:08 AM, Dibyendu Bhattacharya dibyendu.bhattach...@gmail.com wrote:

Hi Tathagata,

I have managed to implement the logic in the Kafka-Spark consumer to recover from driver failure. This is just an interim fix until the actual fix is done on the Spark side. The logic is something like this.

1. When the individual receivers start for every topic partition, they write the Kafka messages along with certain metadata to the block store. This metadata contains the message offset, partition id, topic name and consumer id. You can see this logic in the next() method of PartitionManager.java.

2. In the driver code (Consumer.java), I create the union of all these individual DStreams and process the data using a foreachRDD call. In the driver code, I receive the RDD, which contains the Kafka messages along with the metadata details, and I periodically commit the processed offset of the Kafka messages into ZK.

3. When the driver stops and restarts, the receivers start again, and this time in PartitionManager.java I check what the actual committed offset for the partition is and what the actual processed offset of the same partition is. This logic is in the PartitionManager constructor. If this is a receiver restart and the processed offset is less than the committed offset, I start fetching again from the processed offset. This may lead to duplicate records, but our system can handle duplicates.

I have tested with multiple driver kills/stops and found no data loss in the Kafka consumer. In the driver code, I have not done any checkpointing yet; I will test that tomorrow.

One interesting thing I found: if I repartition the original stream, I can still see data loss with this logic. I believe that during repartitioning Spark might be changing the order of the RDDs from the way they were generated from the Kafka stream. So in the repartition case, even though I am committing the processed offset, it is not in order, so I still see the issue. I am not sure this understanding is correct, but I am not able to find any other explanation. If I do not use repartition, this solution works fine.

I can make this configurable, so that when the actual fix is available this feature in the consumer can be turned off, as it is an overhead for the consumer.

Let me know what you think.

Regards,
Dibyendu
Re: Low Level Kafka Consumer for Spark
Thanks TD. Someone already pointed out to me that calling repartition(...) on its own isn't the right way. You have to keep the result, as in val partedStream = repartition(...). Would be nice to have it fixed in the docs.

On Fri, Sep 5, 2014 at 10:44 AM, Tathagata Das tathagata.das1...@gmail.com wrote:

Some thoughts on this thread to clarify the doubts.

1. Driver recovery: The current release (1.1, about to be released) does not recover raw data that has been received but not processed. This is because when the driver dies, the executors die, and so does the raw data that was stored in them. Only for HDFS is the data not lost on driver recovery, as the data is already present reliably in HDFS. This is something we want to fix by Spark 1.2 (3 months from now). Regarding recovery by replaying the data from Kafka, it is possible but tricky. Our goal is to provide a strong guarantee, exactly-once semantics, in all transformations. To guarantee this for all kinds of streaming computations, stateful and non-stateful, it is required that the data be replayed through Kafka in exactly the same order, and that the underlying blocks of data in Spark be regenerated exactly as they would have been had there been no driver failure. This is quite tricky to implement; it requires manipulation of ZooKeeper offsets, etc., which is hard to do with the high-level consumer that KafkaUtils uses. Dibyendu's low-level Kafka receiver may enable such approaches in the future. For now we definitely plan to solve the first problem very soon.

3. Repartitioning: I am trying to understand the repartition issue. One common mistake I have seen is that developers repartition a stream but do not use the repartitioned stream.

WRONG:
    inputDStream.repartition(100)
    inputDStream.map(...).count().print()

RIGHT:
    val repartitionedDStream = inputDStream.repartition(100)
    repartitionedDStream.map(...).count().print()

Not sure if this helps solve the problem that you all are facing. I am going to add this to the streaming programming guide to make sure this common mistake is avoided.

TD
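The repartition mistake TD describes comes down to ignoring a return value. Here is a minimal sketch of that pitfall in plain Java. ToyStream and RepartitionPitfall are made-up stand-ins for illustration only, not Spark classes; the only thing they borrow from the thread is that repartition hands back a new stream and leaves the original untouched.

```java
// Toy stand-in for an immutable stream handle. This is NOT Spark's DStream API;
// the class and its methods are hypothetical and exist only for this sketch.
final class ToyStream {
    private final int partitions;

    ToyStream(int partitions) {
        this.partitions = partitions;
    }

    // Returns a NEW stream with n partitions and leaves this one unchanged.
    ToyStream repartition(int n) {
        return new ToyStream(n);
    }

    int partitions() {
        return partitions;
    }
}

final class RepartitionPitfall {
    // WRONG: the repartitioned stream is discarded, so the caller still sees the original.
    static int wrong(ToyStream input) {
        input.repartition(100);
        return input.partitions();
    }

    // RIGHT: keep the returned stream and build the rest of the pipeline on it.
    static int right(ToyStream input) {
        ToyStream repartitioned = input.repartition(100);
        return repartitioned.partitions();
    }
}
```

With a 3-partition input, wrong(...) still reports 3 partitions while right(...) reports 100, which mirrors the WRONG/RIGHT pair in TD's message.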
Re: Low Level Kafka Consumer for Spark
Hi Tathagata,

I have managed to implement logic in the Kafka-Spark consumer to recover from driver failure. This is just an interim fix until the actual fix is done on the Spark side. The logic is as follows.

1. When the individual Receivers start for every topic partition, they write the Kafka messages, along with certain metadata, to the block store. This metadata contains the message offset, partition id, topic name and consumer id. You can see this logic in the PartitionManager.java next() method.

2. In the driver code (Consumer.java), I create the union of all these individual DStreams and process the data using a foreachRDD call. The driver receives RDDs that contain the Kafka messages along with the metadata. Periodically, the driver commits the processed offset of the Kafka messages to ZK.

3. When the driver stops and restarts, the Receivers start again, and this time PartitionManager.java checks the actual committed offset for the partition and the actual processed offset for the same partition. This logic is in the PartitionManager constructor. If this is a Receiver restart and the processed offset is less than the committed offset, I start fetching again from the processed offset. This may lead to duplicate records, but our system can handle duplicates.

I have tested with multiple driver kills/stops and found no data loss in the Kafka consumer.

In the driver code I have not done any checkpointing yet; I will test that tomorrow.

One interesting thing I found: if I repartition the original stream, I can still see data loss with this logic. I believe that during repartitioning Spark might be changing the order of the RDDs relative to the order in which they were generated from the Kafka stream. So in the repartition case, even though I am committing the processed offset, it is not committed in order, and I still see the issue. I am not sure this understanding is correct, but I am not able to find any other explanation. If I do not use repartition, this solution works fine.

I can make this configurable, so that when the actual fix is available this feature can be turned off, as it is an overhead for the consumer.

Let me know what you think.

Regards,
Dibyendu
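The restart rule in step 3 of Dibyendu's message can be written down as a small decision function. This is only a sketch of the rule as described in the thread, not the code in PartitionManager.java; the class name OffsetRecovery and the method startOffset are hypothetical, and offsets are modeled as plain longs.

```java
// Sketch of the restart rule described in the thread. The names here are
// hypothetical; the real logic lives in the PartitionManager constructor.
final class OffsetRecovery {
    // Picks the offset a receiver should start fetching from.
    //   committedOffset: how far the receiver had fetched and committed
    //   processedOffset: how far the driver had finished processing
    //   isRestart:       true when the receiver is coming back after a stop
    static long startOffset(long committedOffset, long processedOffset, boolean isRestart) {
        if (isRestart && processedOffset < committedOffset) {
            // Replay the fetched-but-unprocessed gap; this can produce duplicates.
            return processedOffset;
        }
        return committedOffset;
    }
}
```

For example, with a committed offset of 500 and a processed offset of 420, a restarted receiver would resume at 420 and re-deliver 80 messages, which is the duplicate-but-no-loss behaviour the message describes.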
Re: Low Level Kafka Consumer for Spark
Hi,

Sorry for the slight delay.

As discussed in this thread, I have modified the Kafka-Spark-Consumer (https://github.com/dibbhatt/kafka-spark-consumer) code to have a dedicated Receiver for every topic partition. You can see an example of how to create a union of these receivers in consumer.kafka.client.Consumer.java.

Thanks to Chris for suggesting this change.

Regards,
Dibyendu
Re: Low Level Kafka Consumer for Spark
Just a comment on the recovery part.

Is it correct to say that the current Spark Streaming recovery design does not consider re-computations (upon metadata lineage recovery) that depend on blocks of data from the received stream? https://issues.apache.org/jira/browse/SPARK-1647

Just to illustrate a real use case (mine):
- We have object states with a Duration field per state, which is incremented on every batch interval. The state's Duration is also reset to 0 upon incoming state-changing events. Let's suppose there is at least one event since the last data checkpoint. This will lead to inconsistency upon driver recovery: the Duration field will get incremented from the data checkpoint version until the recovery moment, but the state change event will never be re-processed... so in the end we have the old state with the wrong Duration value. To make things worse, let's imagine we're dumping the Duration increases somewhere... which means we're spreading the problem across our system.

Re-computation awareness is something I've commented on in another thread, and I would rather treat it separately. http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-checkpoint-recovery-causes-IO-re-execution-td12568.html#a13205

Re-computations do occur, but the only RDDs that are recovered are the ones from the data checkpoint. This is what we've seen. That is not enough by itself to ensure recovery of computed data, and this partial recovery leads to inconsistency in some cases.

Roger - I share the same question with you - I'm just not sure if the replicated data really gets persisted on every batch. The execution lineage is checkpointed, but if we have big chunks of data being consumed by the Receiver node on, let's say, a per-second basis, then having it persisted to HDFS every second could be a big challenge for JVM performance - maybe that is the reason why it's not really implemented... assuming it isn't.

Dibyendu made a great effort with the offset-controlling code, but general state-consistent recovery feels to me like another big issue to address.

I plan to dive into the Streaming code and try to at least contribute some ideas. Some more insight from anyone on the dev team would be much appreciated.

tnks,
Rod

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Low-Level-Kafka-Consumer-for-Spark-tp11258p13208.html
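Rod's Duration example can be simulated in a few lines to see the size of the drift. The sketch below is a toy model under stated assumptions, not Spark code: one state, batches numbered from 1, the Duration starts at 0, and a single state-change event arrives after the last checkpoint and is never replayed on recovery. The class DurationDrift and its methods are hypothetical names.

```java
// Toy model of the Duration field from the use case above. All names are
// hypothetical; nothing here touches Spark.
final class DurationDrift {
    // Applies one batch: a state-change event resets the duration, otherwise it grows by one.
    static int step(int duration, boolean eventInBatch) {
        return eventInBatch ? 0 : duration + 1;
    }

    // Runs batches (from, to] starting from `duration`; a non-positive eventBatch means no event is seen.
    static int run(int duration, int from, int to, int eventBatch) {
        for (int batch = from + 1; batch <= to; batch++) {
            duration = step(duration, batch == eventBatch);
        }
        return duration;
    }

    // Duration at failureBatch when every batch, including the event, was processed.
    static int expected(int failureBatch, int eventBatch) {
        return run(0, 0, failureBatch, eventBatch);
    }

    // Duration at failureBatch after restoring the checkpoint and recomputing
    // the later batches without the lost event.
    static int recovered(int checkpointBatch, int failureBatch) {
        int atCheckpoint = run(0, 0, checkpointBatch, -1);
        return run(atCheckpoint, checkpointBatch, failureBatch, -1);
    }
}
```

With a checkpoint at batch 5, an event at batch 7 and a failure at batch 10, the correct Duration is 3 but the recovered state reports 10, because the reset at batch 7 is never re-applied.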
Re: Low Level Kafka Consumer for Spark
I'm no expert. But as I understand it, yes, you create multiple streams to consume multiple partitions in parallel. If they're all in the same Kafka consumer group, you'll get exactly one copy of each message, so yes, if you have 10 consumers and 3 Kafka partitions, I believe only 3 will be getting messages.

The parallelism of Spark's processing of the RDDs of those messages is different. There could be 4 partitions in your RDDs doing the work. This is the kind of thing you can potentially influence with repartition. That is, I believe you can get more tasks processing the messages even if you are only able to consume messages from the queue with 3-way parallelism, since the queue has 3 partitions.

On Aug 30, 2014 12:56 AM, Tim Smith secs...@gmail.com wrote:

Ok, so I did this:

    val kInStreams = (1 to 10).map { _ => KafkaUtils.createStream(ssc, "zkhost1:2181/zk_kafka", "testApp", Map("rawunstruct" -> 1)) }
    val kInMsg = ssc.union(kInStreams)
    val outdata = kInMsg.map(x => normalizeLog(x._2, configMap))

This has improved parallelism. Earlier I would only get a "Stream 0". Now I have "Streams [0-9]". Of course, since the Kafka topic has only three partitions, only three of those streams are active, but I am seeing more blocks being pulled across the three streams in total than what one was doing earlier. Also, four nodes are now actively processing tasks (vs. only two earlier), which actually has me confused. If streams are active on only 3 nodes, then how/why did a 4th node get work? If a 4th got work, why aren't more nodes getting work?
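The "10 consumers, 3 partitions, only 3 active" point above can be checked with a simplified model. The sketch assumes only that each partition is owned by exactly one consumer in the group; the round-robin assignment is an illustrative choice and is not claimed to be Kafka's actual rebalancing algorithm. ConsumerGroupModel is a hypothetical name, and the model expects at least one consumer.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of a consumer group: every partition has exactly one owner.
// The assignment order is an assumption made for this sketch.
final class ConsumerGroupModel {
    // Returns, for each consumer, the list of partition ids it owns.
    static List<List<Integer>> assign(int consumers, int partitions) {
        List<List<Integer>> owned = new ArrayList<>();
        for (int c = 0; c < consumers; c++) {
            owned.add(new ArrayList<>());
        }
        for (int p = 0; p < partitions; p++) {
            owned.get(p % consumers).add(p);
        }
        return owned;
    }

    // Counts the consumers that own at least one partition.
    static long activeConsumers(int consumers, int partitions) {
        return assign(consumers, partitions).stream()
                .filter(list -> !list.isEmpty())
                .count();
    }
}
```

In this model activeConsumers(10, 3) is 3, matching Tim's observation that only three of his ten streams pull data. Anything beyond 3-way parallelism has to come from the processing side, for example by repartitioning the received stream.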
Re: Low Level Kafka Consumer for Spark
I have this same question. Isn't there somewhere that the Kafka range metadata can be saved? From my naive perspective, it seems like it should be very similar to HDFS lineage. The original HDFS blocks are kept somewhere (in the driver?) so that if an RDD partition is lost, it can be recomputed. In this case, all we need is the Kafka topic, partition, and offset range.

Can someone enlighten us on why two copies of the RDD are needed (or some other mechanism like a WAL) for fault tolerance when using Kafka but not when reading from, say, HDFS?

On Fri, Aug 29, 2014 at 8:58 AM, Jonathan Hodges hodg...@gmail.com wrote:

'this 2-node replication is mainly for failover in case the receiver dies while data is in flight. there's still chance for data loss as there's no write ahead log on the hot path, but this is being addressed.'

Can you comment a little on how this will be addressed? Will there be a durable WAL? Is there a JIRA for tracking this effort?

I am curious whether, without a WAL, you can avoid this data loss with explicit management of Kafka offsets, e.g. don't commit the offset unless the data is replicated to multiple nodes, or maybe not until it is processed. The incoming data will always be durably stored to disk in Kafka, so it can be replayed in failure scenarios to avoid data loss if the offsets are managed properly.
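Roger's idea that "all we need is the Kafka topic, partition, and offset range" can be pictured as a small lineage record plus a replay step. This is a sketch under stated assumptions: the partition log is modeled as an in-memory list indexed by offset, the data is assumed to still be retained in Kafka, and KafkaRange and RangeReplay are hypothetical names rather than any Spark or Kafka API.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical lineage record: enough metadata to re-read a block from Kafka.
final class KafkaRange {
    final String topic;
    final int partition;
    final long fromOffset;   // inclusive
    final long untilOffset;  // exclusive

    KafkaRange(String topic, int partition, long fromOffset, long untilOffset) {
        this.topic = topic;
        this.partition = partition;
        this.fromOffset = fromOffset;
        this.untilOffset = untilOffset;
    }
}

final class RangeReplay {
    // Rebuilds a lost block by re-reading [fromOffset, untilOffset) from the retained log.
    // The list index stands in for the Kafka offset within one partition.
    static List<String> recompute(List<String> partitionLog, KafkaRange range) {
        return new ArrayList<>(
                partitionLog.subList((int) range.fromOffset, (int) range.untilOffset));
    }
}
```

Because the same range always yields the same messages, a lost block could in principle be regenerated from this metadata alone, which is the analogy with HDFS lineage that the question draws.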
Re: Low Level Kafka Consumer for Spark
I'd be interested to understand this mechanism as well. But this is the error recovery part of the equation. Consuming from Kafka has two aspects - parallelism and error recovery - and I am not sure how either works.

For error recovery, I would like to understand how:
- A failed receiver gets re-spawned. In 1.0.0, despite setting the failed-tasks threshold to 64, my job aborts after 4 receiver task failures.
- Data loss is recovered after a failed receiver task/executor.

For parallelism, I would expect a single createStream() to intelligently map a receiver thread somewhere, one for each Kafka partition, but in different JVMs. Also, repartition() does not seem to work as advertised. A repartition(512) should get nodes other than the receiver nodes to get some RDDs to process. No?
Re: Low Level Kafka Consumer for Spark
Chris,

I did the DStream.repartition mentioned in the document on parallelism in receiving, and also set spark.default.parallelism, and it still uses only 2 nodes in my cluster. I notice there is another email thread on the same topic: http://apache-spark-user-list.1001560.n3.nabble.com/DStream-repartitioning-performance-tuning-processing-td13069.html

My code is in Java and here is what I have:

    JavaPairReceiverInputDStream<String, String> messages = KafkaUtils.createStream(ssc, zkQuorum, "cse-job-play-consumer", kafkaTopicMap);
    JavaPairDStream<String, String> newMessages = messages.repartition(partitionSize); // partitionSize=30
    JavaDStream<String> lines = newMessages.map(new Function<Tuple2<String, String>, String>() {
        ...
        public String call(Tuple2<String, String> tuple2) { return tuple2._2(); }
    });
    JavaDStream<String> words = lines.flatMap(new MetricsComputeFunction());
    JavaPairDStream<String, Integer> wordCounts = words.mapToPair(new PairFunction<String, String, Integer>() { ... });
    wordCounts.foreachRDD(new Function<JavaPairRDD<String, Integer>, Void>() {...});

Thanks,
Bharat

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Low-Level-Kafka-Consumer-for-Spark-tp11258p13131.html
Re: Low Level Kafka Consumer for Spark
On Thu, Aug 28, 2014 at 12:02 PM, Chris Fregly ch...@fregly.com wrote:

'this 2-node replication is mainly for failover in case the receiver dies while data is in flight. there's still chance for data loss as there's no write ahead log on the hot path, but this is being addressed.'

Can you comment a little on how this will be addressed? Will there be a durable WAL? Is there a JIRA for tracking this effort?

I am curious whether, without a WAL, you can avoid this data loss with explicit management of Kafka offsets, e.g. don't commit an offset unless the data is replicated to multiple nodes, or maybe not until it is processed. The incoming data will always be durably stored to disk in Kafka, so it can be replayed in failure scenarios to avoid data loss if the offsets are managed properly.
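The offset-management idea raised in the question above (commit an offset only once the data has been replicated or processed) can be sketched in plain Java. This is only an illustration under stated assumptions: ReplayableLog and CommitAfterProcessing are hypothetical names, the "log" is an in-memory list standing in for one Kafka partition, and no Kafka or Spark API is used.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical in-memory stand-in for one Kafka partition plus its committed offset.
// None of these names come from Kafka or Spark; they only illustrate the ordering.
final class ReplayableLog {
    private final List<String> messages = new ArrayList<>();
    private long committedOffset = 0; // next offset to read after a restart

    void append(String message) { messages.add(message); }

    long committedOffset() { return committedOffset; }

    String read(long offset) { return messages.get((int) offset); }

    long size() { return messages.size(); }

    void commit(long nextOffset) { committedOffset = nextOffset; }
}

final class CommitAfterProcessing {
    // Consumes from the last committed offset. Each message is handed to `sink`
    // first and its offset is committed second. When `failAt` is reached the
    // method returns without processing or committing it, simulating a crash.
    static void consume(ReplayableLog log, List<String> sink, long failAt) {
        for (long offset = log.committedOffset(); offset < log.size(); offset++) {
            if (offset == failAt) {
                return; // simulated receiver failure
            }
            sink.add(log.read(offset)); // "replicate/process" first
            log.commit(offset + 1);     // commit second
        }
    }

    public static void main(String[] args) {
        ReplayableLog log = new ReplayableLog();
        for (String m : new String[] {"a", "b", "c", "d"}) {
            log.append(m);
        }
        List<String> sink = new ArrayList<>();
        consume(log, sink, 2);  // fails before offset 2; committed offset stays at 2
        consume(log, sink, -1); // restart resumes at offset 2, nothing is lost
        System.out.println(sink); // prints [a, b, c, d]
    }
}
```

In this sketch the crash happens before a message is processed, so the restart sees each message exactly once. A crash between processing and committing would replay that message, which is why this ordering is usually described as at-least-once rather than exactly-once.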
Re: Low Level Kafka Consumer for Spark
Good to see I am not the only one who cannot get incoming DStreams to repartition. I tried repartition(512) but still no luck - the app stubbornly runs only on two nodes. Now this is 1.0.0, but looking at the release notes for 1.0.1 and 1.0.2, I don't see anything that says this was an issue and has been fixed. How do I debug the repartition() statement to see what the flow is after the job hits that statement?

On Fri, Aug 29, 2014 at 8:31 AM, bharatvenkat bvenkat.sp...@gmail.com wrote:

Chris, I did the DStream.repartition mentioned in the document on parallelism in receiving, as well as set spark.default.parallelism, and it still uses only 2 nodes in my cluster. I notice there is another email thread on the same topic: http://apache-spark-user-list.1001560.n3.nabble.com/DStream-repartitioning-performance-tuning-processing-td13069.html

My code is in Java and here is what I have:

    JavaPairReceiverInputDStream<String, String> messages =
        KafkaUtils.createStream(ssc, zkQuorum, "cse-job-play-consumer", kafkaTopicMap);
    JavaPairDStream<String, String> newMessages =
        messages.repartition(partitionSize); // partitionSize=30
    JavaDStream<String> lines = newMessages.map(
        new Function<Tuple2<String, String>, String>() {
            ...
            public String call(Tuple2<String, String> tuple2) { return tuple2._2(); }
        });
    JavaDStream<String> words = lines.flatMap(new MetricsComputeFunction());
    JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
        new PairFunction<String, String, Integer>() { ... });
    wordCounts.foreachRDD(new Function<JavaPairRDD<String, Integer>, Void>() {...});

Thanks, Bharat

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Low-Level-Kafka-Consumer-for-Spark-tp11258p13131.html
Re: Low Level Kafka Consumer for Spark
I create my DStream very simply as:

    val kInMsg = KafkaUtils.createStream(ssc, "zkhost1:2181/zk_kafka", "testApp", Map("rawunstruct" -> 8))
    . .

Eventually, before I operate on the DStream, I repartition it:

    kInMsg.repartition(512)

Are you saying that ^^ repartition doesn't split my DStream into multiple smaller streams? Should I manually create multiple DStreams like this?

    val kInputs = (1 to 10).map { _ => KafkaUtils.createStream() }

Then I apply some custom logic to it as:

    val outdata = kInMsg.map(x => normalizeLog(x._2, configMap)) // where normalizeLog takes a String and a Map of regexes and returns a String

In my case, I think I have traced the issue to the receiver executor being killed by YARN:

    14/08/29 22:46:30 ERROR YarnClientClusterScheduler: Lost executor 1 on node-dn1-4-acme.com: remote Akka client disassociated

Could this be the root cause?
http://apache-spark-developers-list.1001551.n3.nabble.com/Lost-executor-on-YARN-ALS-iterations-td7916.html
https://issues.apache.org/jira/browse/SPARK-2121

On Fri, Aug 29, 2014 at 3:28 PM, Sean Owen so...@cloudera.com wrote:

Are you using multiple DStreams? Repartitioning does not affect how many receivers you have. It's on 2 nodes for each receiver. You need multiple partitions in the queue, each consumed by a DStream, if you mean to parallelize consuming the queue.

On Fri, Aug 29, 2014 at 11:08 PM, Tim Smith secs...@gmail.com wrote:

Good to see I am not the only one who cannot get incoming DStreams to repartition. I tried repartition(512) but still no luck - the app stubbornly runs only on two nodes. Now this is 1.0.0, but looking at the release notes for 1.0.1 and 1.0.2, I don't see anything that says this was an issue and has been fixed. How do I debug the repartition() statement to see what the flow is after the job hits that statement?
Re: Low Level Kafka Consumer for Spark
Ok, so I did this:

    val kInStreams = (1 to 10).map { _ => KafkaUtils.createStream(ssc, "zkhost1:2181/zk_kafka", "testApp", Map("rawunstruct" -> 1)) }
    val kInMsg = ssc.union(kInStreams)
    val outdata = kInMsg.map(x => normalizeLog(x._2, configMap))

This has improved parallelism. Earlier I would only get a Stream 0. Now I have Streams [0-9]. Of course, since the Kafka topic has only three partitions, only three of those streams are active, but I am seeing more blocks being pulled across the three streams in total than the single stream was pulling earlier. Also, four nodes are now actively processing tasks (vs only two earlier), which actually has me confused. If streams are active only on 3 nodes, then how/why did a 4th node get work? If a 4th got work, why aren't more nodes getting work?

On Fri, Aug 29, 2014 at 4:11 PM, Tim Smith secs...@gmail.com wrote:

I create my DStream very simply as:

    val kInMsg = KafkaUtils.createStream(ssc, "zkhost1:2181/zk_kafka", "testApp", Map("rawunstruct" -> 8))
    . .

Eventually, before I operate on the DStream, I repartition it:

    kInMsg.repartition(512)

Are you saying that ^^ repartition doesn't split my DStream into multiple smaller streams? Should I manually create multiple DStreams like this?

    val kInputs = (1 to 10).map { _ => KafkaUtils.createStream() }

Then I apply some custom logic to it as:

    val outdata = kInMsg.map(x => normalizeLog(x._2, configMap)) // where normalizeLog takes a String and a Map of regexes and returns a String

In my case, I think I have traced the issue to the receiver executor being killed by YARN:

    14/08/29 22:46:30 ERROR YarnClientClusterScheduler: Lost executor 1 on node-dn1-4-acme.com: remote Akka client disassociated

Could this be the root cause?
http://apache-spark-developers-list.1001551.n3.nabble.com/Lost-executor-on-YARN-ALS-iterations-td7916.html
https://issues.apache.org/jira/browse/SPARK-2121
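The observation in this message, that ten streams were created but only three became active on a three-partition topic, follows from each partition being read by at most one consumer in a group. A minimal stdlib-only Java sketch of that counting argument is below. PartitionAssignment is a hypothetical name, and the round-robin rule is an assumption made for illustration, not the assignment strategy Kafka actually uses.

```java
import java.util.ArrayList;
import java.util.List;

final class PartitionAssignment {
    // Gives every partition to exactly one receiver, round-robin (illustrative only).
    // Returns, for each receiver index, the list of partitions it owns.
    // `receivers` must be greater than zero.
    static List<List<Integer>> assign(int partitions, int receivers) {
        List<List<Integer>> owned = new ArrayList<>();
        for (int r = 0; r < receivers; r++) {
            owned.add(new ArrayList<>());
        }
        for (int p = 0; p < partitions; p++) {
            owned.get(p % receivers).add(p);
        }
        return owned;
    }

    // A receiver with no partitions has nothing to read and stays idle.
    static long activeReceivers(List<List<Integer>> owned) {
        return owned.stream().filter(ps -> !ps.isEmpty()).count();
    }

    public static void main(String[] args) {
        System.out.println(activeReceivers(assign(3, 10))); // prints 3
        System.out.println(activeReceivers(assign(8, 4)));  // prints 4
    }
}
```

Under this model, adding receivers beyond the partition count adds no receiving capacity; the number of nodes that then process the received blocks is a separate question, as the thread goes on to discuss.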
Re: Low Level Kafka Consumer for Spark
@bharat- overall, i've noticed a lot of confusion about how Spark Streaming scales - as well as how it handles failover and checkpointing, but we can discuss that separately.

there's actually 2 dimensions to scaling here: receiving and processing.

*Receiving*
receiving can be scaled out by submitting new DStreams/Receivers to the cluster as i've done in the Kinesis example. in fact, i purposely chose to submit multiple receivers in my Kinesis example because i feel it should be the norm and not the exception - particularly for partitioned and checkpoint-capable streaming systems like Kafka and Kinesis. it's the only way to scale.

a side note here is that each receiver running in the cluster immediately replicates to 1 other node for fault-tolerance of that specific receiver. this is where the confusion lies. this 2-node replication is mainly for failover in case the receiver dies while data is in flight. there's still a chance of data loss as there's no write ahead log on the hot path, but this is being addressed.

this is mentioned in the docs here: https://spark.apache.org/docs/latest/streaming-programming-guide.html#level-of-parallelism-in-data-receiving

*Processing*
once data is received, tasks are scheduled across the Spark cluster just like any other non-streaming task where you can specify the number of partitions for reduces, etc. this is the part of scaling that is sometimes overlooked - probably because it works just like regular Spark, but it is worth highlighting.

Here's a blurb in the docs: https://spark.apache.org/docs/latest/streaming-programming-guide.html#level-of-parallelism-in-data-processing

the other thing that's confusing with Spark Streaming is that in Scala, you need to explicitly import org.apache.spark.streaming.StreamingContext.toPairDStreamFunctions in order to pick up the implicits that allow DStream.reduceByKey and such (versus DStream.transform(rddBatch => rddBatch.reduceByKey())). in other words, DStreams appear to be relatively featureless until you discover this implicit. otherwise, you need to operate on the underlying RDDs explicitly, which is not ideal.

the Kinesis example referenced earlier in the thread uses the DStream implicits.

side note to all of this - i've recently convinced my publisher for my upcoming book, Spark In Action, to let me jump ahead and write the Spark Streaming chapter ahead of other more well-understood libraries. early release is in a month or so. sign up @ http://sparkinaction.com if you wanna get notified.

shameless plug that i wouldn't otherwise do, but i really think it will help clear a lot of confusion in this area as i hear these questions asked a lot in my talks and such. and i think a clear, crisp story on scaling and fault-tolerance will help Spark Streaming's adoption.

hope that helps!

-chris

On Wed, Aug 27, 2014 at 6:32 PM, Dibyendu Bhattacharya dibyendu.bhattach...@gmail.com wrote:

I agree. This issue should be fixed in Spark rather than relying on replay of Kafka messages.

Dib
Re: Low Level Kafka Consumer for Spark
Hi Dibyendu,

That would be great. One of the biggest drawbacks of KafkaUtils, as well as of your implementation, is that I am unable to scale out processing. I am relatively new to Spark and Spark Streaming - from what I read and what I observe with my deployment, an RDD created on one receiver is processed by at most 2 nodes in my cluster (most likely because default replication is 2 and Spark schedules processing close to where the data is). I tried rdd.replicate() to no avail.

Would Chris's and your proposal to have a union of DStreams for all these Receivers still allow scaling out subsequent processing?

Thanks, Bharat

On Tue, Aug 26, 2014 at 10:59 PM, Dibyendu Bhattacharya dibyendu.bhattach...@gmail.com wrote:

Thanks Chris and Bharat for your inputs. I agree, running multiple receivers/dstreams is desirable for scalability and fault tolerance, and this is easily doable. In the present KafkaReceiver I am creating as many threads as there are partitions in the Kafka topic, but I can definitely create multiple KafkaReceivers, one for every partition. As Chris mentioned, in this case I then need to have a union of DStreams for all these Receivers. I will try this out and let you know.

Dib
Re: Low Level Kafka Consumer for Spark
I agree. This issue should be fixed in Spark rather than relying on replay of Kafka messages.

Dib

On Aug 28, 2014 6:45 AM, RodrigoB rodrigo.boav...@aspect.com wrote:

Dibyendu,

Tnks for getting back. I believe you are absolutely right. We were under the assumption that the raw data was being computed again, and after further tests that is not happening. This applies to Kafka as well. The issue is of major priority fortunately.

Regarding your suggestion, I would maybe prefer to have the problem resolved within Spark's internals, since once the data is replicated we should be able to access it once more and not have to pull it back again from Kafka or any other stream that is being affected by this issue. If, for example, there is a big amount of batches to be recomputed, I would rather have them done distributed than overload the batch interval with a huge amount of Kafka messages.

I do not yet have enough know-how on where the issue is, or about the internal Spark code, so I can't really say how difficult the implementation will be.

tnks, Rod

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Low-Level-Kafka-Consumer-for-Spark-tp11258p12966.html
Re: Low Level Kafka Consumer for Spark
Hi,

As I understand it, your problem is similar to this JIRA: https://issues.apache.org/jira/browse/SPARK-1647

The issue in this case is that Kafka cannot replay the messages, as the offsets are already committed. I think the existing KafkaUtils (the default high-level Kafka consumer) also has this issue. There is a similar discussion in this thread: http://apache-spark-user-list.1001560.n3.nabble.com/Data-loss-Spark-streaming-and-network-receiver-td12337.html

As I see it, it is possible to tackle this in the consumer code I have written. If we store the topic partition_id and consumed offset in ZK after every checkpoint, then after Spark recovers from the failover, the present PartitionManager code can start reading from the last checkpointed offset (instead of the last committed offset, as it is doing now). In that case it can replay the data since the last checkpoint. I will think it over.

Regards, Dibyendu

On Mon, Aug 25, 2014 at 11:23 PM, RodrigoB rodrigo.boav...@aspect.com wrote:

Hi Dibyendu,

My colleague has taken a look at the Spark Kafka consumer GitHub project you have provided and started experimenting. We found that somehow, when Spark has a failure after a data checkpoint, the expected re-computations corresponding to the metadata checkpoints are not recovered, so we lose Kafka messages and RDD computations in Spark. The impression is that this code is replacing quite a bit of Spark Kafka Streaming code where maybe (not sure) metadata checkpoints are done every batch interval.

Was it on purpose to depend solely on the Kafka commit to recover data and recomputations between data checkpoints? If so, how do we make this work?

tnks Rod
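The recovery idea proposed in this message (remember the consumed offset per partition at each checkpoint, and on recovery resume from that checkpointed offset rather than from the last committed one) can be sketched as follows. This is a hedged illustration, not the consumer's actual PartitionManager code: OffsetStore and its methods are hypothetical names, and two in-memory maps stand in for the ZooKeeper nodes.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for offsets kept in ZooKeeper, keyed by partition id.
final class OffsetStore {
    private final Map<Integer, Long> committed = new HashMap<>();
    private final Map<Integer, Long> checkpointed = new HashMap<>();

    // Called as messages are consumed; this is what the consumer tracks today.
    void commit(int partition, long offset) {
        committed.put(partition, offset);
    }

    // Called when a checkpoint completes: snapshot the consumed offsets.
    void onCheckpoint() {
        checkpointed.clear();
        checkpointed.putAll(committed);
    }

    long lastCommitted(int partition) {
        return committed.getOrDefault(partition, 0L);
    }

    // After a failure, resume from the snapshot so that everything consumed
    // since the last checkpoint is read from Kafka again.
    long resumeOffset(int partition) {
        return checkpointed.getOrDefault(partition, 0L);
    }

    public static void main(String[] args) {
        OffsetStore store = new OffsetStore();
        store.commit(0, 100L);  // consumed up to offset 100
        store.onCheckpoint();   // checkpoint taken here
        store.commit(0, 150L);  // consumed further, then the job fails
        System.out.println(store.lastCommitted(0)); // prints 150
        System.out.println(store.resumeOffset(0));  // prints 100
    }
}
```

In this example, resuming at 100 instead of 150 replays offsets 100 to 149, which is the data that was received after the last checkpoint and would otherwise be lost.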
Re: Low Level Kafka Consumer for Spark
Hi Bharat,

Thanks for your email. If the Kafka Reader worker process dies, it will be replaced on a different machine, and it will start consuming from the offset where it left off (for each partition). The same can happen even if I have an individual Receiver for every partition.

Regards, Dibyendu

On Tue, Aug 26, 2014 at 5:43 AM, bharatvenkat bvenkat.sp...@gmail.com wrote:

I like this consumer for what it promises - better control over offset and recovery from failures. If I understand this right, it still uses a single worker process to read from Kafka (one thread per partition) - is there a way to specify multiple worker processes (on different machines) to read from Kafka? Maybe one worker process for each partition?

If there is no such option, what happens when the single machine hosting the Kafka Reader worker process dies and is replaced by a different machine (like in cloud)?

Thanks, Bharat
Re: Low Level Kafka Consumer for Spark
great work, Dibyendu. looks like this would be a popular contribution.

expanding on bharat's question a bit: what happens if you submit multiple receivers to the cluster by creating and unioning multiple DStreams as in the kinesis example here: https://github.com/apache/spark/blob/ae58aea2d1435b5bb011e68127e1bcddc2edf5b2/extras/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala#L123

for more context, the kinesis implementation above uses the Kinesis Client Library (KCL) to automatically assign - and load balance - stream shards among all KCL threads from all receivers (potentially coming and going as nodes die) on all executors/nodes using DynamoDB as the association data store.

ZooKeeper would be used for your Kafka consumers, of course. and ZooKeeper watches to handle the ephemeral nodes. and I see you're using Curator, which makes things easier.

as bharat suggested, running multiple receivers/dstreams may be desirable from a scalability and fault tolerance standpoint. is this type of load balancing possible among your different Kafka consumers running in different ephemeral JVMs?

and isn't it fun proposing a popular piece of code? the question floodgates have opened! haha. :)

-chris

On Tue, Aug 26, 2014 at 7:29 AM, Dibyendu Bhattacharya dibyendu.bhattach...@gmail.com wrote:

Hi Bharat,

Thanks for your email. If the Kafka Reader worker process dies, it will be replaced on a different machine, and it will start consuming from the offset where it left off (for each partition). The same can happen even if I have an individual Receiver for every partition.

Regards, Dibyendu
Re: Low Level Kafka Consumer for Spark
Hi Dibyendu,

My colleague has taken a look at the Spark Kafka consumer GitHub project you have provided and started experimenting. We found that somehow, when Spark has a failure after a data checkpoint, the expected re-computations corresponding to the metadata checkpoints are not recovered, so we lose Kafka messages and RDD computations in Spark. The impression is that this code is replacing quite a bit of Spark Kafka Streaming code where maybe (not sure) metadata checkpoints are done every batch interval.

Was it on purpose to depend solely on the Kafka commit to recover data and recomputations between data checkpoints? If so, how do we make this work?

tnks Rod

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Low-Level-Kafka-Consumer-for-Spark-tp11258p12757.html
Re: Low Level Kafka Consumer for Spark
I like this consumer for what it promises - better control over offset and recovery from failures. If I understand this right, it still uses a single worker process to read from Kafka (one thread per partition) - is there a way to specify multiple worker processes (on different machines) to read from Kafka? Maybe one worker process for each partition?

If there is no such option, what happens when the single machine hosting the Kafka Reader worker process dies and is replaced by a different machine (like in cloud)?

Thanks, Bharat

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Low-Level-Kafka-Consumer-for-Spark-tp11258p12788.html
Re: Low Level Kafka Consumer for Spark
Thanks Jonathan,

Yes, until non-ZK-based offset management is available in Kafka, I need to maintain the offsets in ZK. And yes, in both cases an explicit commit is necessary.

I modified the Low Level Kafka Spark Consumer a little so that the Receiver spawns a thread for every partition of the topic and performs the 'store' operation from multiple threads. It would be good if the receiver.store methods were made thread safe, which they presently are not.

Waiting for TD's comment on this Kafka Spark Low Level Consumer.

Regards, Dibyendu

On Tue, Aug 5, 2014 at 5:32 AM, Jonathan Hodges hodg...@gmail.com wrote:

Hi Yan,

That is a good suggestion. I believe non-Zookeeper offset management will be a feature in the upcoming Kafka 0.8.2 release, tentatively scheduled for September. https://cwiki.apache.org/confluence/display/KAFKA/Inbuilt+Consumer+Offset+Management

That should make this fairly easy to implement, but it will still require explicit offset commits to avoid data loss, which is different from the current KafkaUtils implementation.

Jonathan
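The threading concern in this message (one fetch thread per partition, all handing data to a 'store' call that is reported as not thread safe) is the classic shared-sink problem. Below is a minimal sketch of one way to guard such a sink, assuming nothing about Spark's Receiver internals: SynchronizedStore and PerPartitionFetchers are hypothetical names, and an in-memory list stands in for the receiver's block store.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sink shared by all partition threads. ArrayList is not
// thread-safe by itself, so every access goes through a synchronized method.
final class SynchronizedStore {
    private final List<String> blocks = new ArrayList<>();

    synchronized void store(List<String> batch) {
        blocks.addAll(batch);
    }

    synchronized int size() {
        return blocks.size();
    }
}

final class PerPartitionFetchers {
    // Starts one thread per partition; each stores its messages one batch at a
    // time. Returns the number of messages that reached the shared store.
    static int run(int partitions, int messagesPerPartition) {
        SynchronizedStore store = new SynchronizedStore();
        List<Thread> threads = new ArrayList<>();
        for (int p = 0; p < partitions; p++) {
            final int partition = p;
            Thread t = new Thread(() -> {
                for (int i = 0; i < messagesPerPartition; i++) {
                    List<String> batch = new ArrayList<>();
                    batch.add("p" + partition + "-m" + i);
                    store.store(batch); // serialized by the store's lock
                }
            });
            threads.add(t);
            t.start();
        }
        try {
            for (Thread t : threads) {
                t.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for fetchers", e);
        }
        return store.size();
    }

    public static void main(String[] args) {
        System.out.println(run(3, 1000)); // prints 3000
    }
}
```

A single lock keeps the sketch simple at the cost of serializing all partitions on the store call; a per-partition buffer drained by one storing thread would be an alternative design with the same safety property.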
Re: Low Level Kafka Consumer for Spark
Hi Yan, That is a good suggestion. I believe non-Zookeeper offset management will be a feature in the upcoming Kafka 0.8.2 release, tentatively scheduled for September. https://cwiki.apache.org/confluence/display/KAFKA/Inbuilt+Consumer+Offset+Management That should make this fairly easy to implement, but it will still require explicit offset commits to avoid data loss, which differs from the current KafkaUtils implementation. Jonathan

On Mon, Aug 4, 2014 at 4:51 PM, Yan Fang yanfang...@gmail.com wrote:
Another suggestion that may help: you could consider using Kafka, instead of Zookeeper, to store the latest offset. There are at least two benefits: 1) it lowers the workload on ZK; 2) it supports replay from a given offset. This is how Samza http://samza.incubator.apache.org/ deals with the Kafka offset; the doc is here http://samza.incubator.apache.org/learn/documentation/0.7.0/container/checkpointing.html . Thank you. Cheers, Fang, Yan yanfang...@gmail.com +1 (206) 849-4108

On Sun, Aug 3, 2014 at 8:59 PM, Patrick Wendell pwend...@gmail.com wrote:
I'll let TD chime in on this one, but I'm guessing this would be a welcome addition. It's great to see community effort on adding new streams/receivers; adding a Java API for receivers was something we did specifically to allow this :) - Patrick

On Sat, Aug 2, 2014 at 10:09 AM, Dibyendu Bhattacharya dibyendu.bhattach...@gmail.com wrote:
Hi, I have implemented a Low Level Kafka Consumer for Spark Streaming using the Kafka Simple Consumer API. This API gives better control over Kafka offset management and recovery from failures. As the present Spark KafkaUtils uses the High Level Kafka Consumer API, I wanted better control over offset management, which is not possible with the Kafka High Level consumer. The project is available in the repo below: https://github.com/dibbhatt/kafka-spark-consumer I have implemented a custom Receiver, consumer.kafka.client.KafkaReceiver.
The KafkaReceiver uses the low level Kafka Consumer API (implemented in the consumer.kafka packages) to fetch messages from Kafka and 'store' them in Spark. The logic detects the number of partitions for a topic and spawns that many threads (individual Consumer instances). The Kafka Consumer uses Zookeeper to store the latest offset for each partition, which helps it recover in case of failure. The Kafka Consumer logic is tolerant of ZK failures, Kafka partition leader changes, and Kafka broker failures, and it handles recovery from offset errors and other fail-over aspects. The consumer.kafka.client.Consumer is a sample consumer which uses these Kafka Receivers to generate DStreams from Kafka and applies an output operation to every message of each RDD. We are planning to use this Kafka Spark Consumer to perform near-real-time indexing of Kafka messages into a target search cluster, and also near-real-time aggregation using a target NoSQL store. Kindly let me know your views. Also, if this looks good, can I contribute it to the Spark Streaming project? Regards, Dibyendu
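The point in this thread about explicit offset commits and recovery from the last stored offset can be sketched roughly as follows. This is an in-memory illustration under stated assumptions, not the consumer's actual code: OffsetStore is a hypothetical stand-in for the per-partition offset kept in Zookeeper, the "log" is a plain list, and the crash is simulated by returning early. The sketch commits an offset only after the batch has been stored, so a restart replays the uncommitted batch rather than skipping it.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Illustration only: store first, commit afterwards, resume from the last commit. */
public class OffsetCommitSketch {

    /** Hypothetical stand-in for the per-partition offset kept in Zookeeper. */
    static final class OffsetStore {
        private final Map<Integer, Long> committed = new HashMap<>();

        long read(int partition) {
            return committed.getOrDefault(partition, 0L);
        }

        void commit(int partition, long nextOffset) {
            committed.put(partition, nextOffset);
        }
    }

    /**
     * Consumes one partition starting at its last committed offset.
     * crashOnBatch is the 0-based batch (within this run) after whose store
     * the run stops without committing; pass -1 for no simulated crash.
     * Returns true if the whole log was consumed.
     */
    static boolean consume(List<String> log, int partition, OffsetStore offsets,
                           List<String> sink, int batchSize, int crashOnBatch) {
        long next = offsets.read(partition);
        int batchIndex = 0;
        while (next < log.size()) {
            int end = (int) Math.min(log.size(), next + batchSize);
            sink.addAll(log.subList((int) next, end));  // 'store' the batch first
            if (batchIndex == crashOnBatch) {
                return false;                           // crash before the commit
            }
            offsets.commit(partition, end);             // explicit commit after store
            next = end;
            batchIndex++;
        }
        return true;
    }

    /** Runs a crash followed by a restart and returns what reached the sink. */
    public static List<String> demo() {
        List<String> log = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            log.add("m" + i);
        }
        OffsetStore offsets = new OffsetStore();
        List<String> sink = new ArrayList<>();
        consume(log, 0, offsets, sink, 4, 1);   // m0..m3 committed, m4..m7 stored only
        consume(log, 0, offsets, sink, 4, -1);  // restart at offset 4, replay m4..m7
        return sink;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

In demo, the sink ends up with 14 entries covering all 10 messages: m4 through m7 appear twice because their commit never happened. That is the at-least-once trade-off of committing after the store; committing before the store would instead risk losing that batch.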