[jira] [Commented] (SPARK-11045) Contributing Receiver based Low Level Kafka Consumer from Spark-Packages to Apache Spark Project
[ https://issues.apache.org/jira/browse/SPARK-11045?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15849781#comment-15849781 ] Dibyendu Bhattacharya commented on SPARK-11045: ---
Released the latest version, 1.0.10, of the Receiver based Kafka consumer for Spark Streaming on Spark-Packages: https://spark-packages.org/package/dibbhatt/kafka-spark-consumer
This release includes:
- Tuning for better performance
- Support for consumer lag checking
- WAL-less recovery
- A better tuned PID controller with automatic rate adjustment to incoming traffic
- Support for custom message interceptors
- Enhanced recovery from failure scenarios
Please refer to https://github.com/dibbhatt/kafka-spark-consumer/blob/master/README.md for more details.

> Contributing Receiver based Low Level Kafka Consumer from Spark-Packages to Apache Spark Project
>
> Key: SPARK-11045
> URL: https://issues.apache.org/jira/browse/SPARK-11045
> Project: Spark
> Issue Type: New Feature
> Components: DStreams
> Reporter: Dibyendu Bhattacharya
>
> This JIRA tracks the progress of contributing the Receiver based Low Level Kafka Consumer from spark-packages (http://spark-packages.org/package/dibbhatt/kafka-spark-consumer) back to the Apache Spark project.
> This Kafka consumer has been around for more than a year and has matured over that time. I see many adoptions of this package, and I receive positive feedback that this consumer gives better performance and fault tolerance.
> The primary intent of this JIRA is to give the community a better alternative if they want to use the Receiver based model.
> If this consumer makes it into Spark Core, it will definitely see more adoption and support from the community, and it will help the many who still prefer the Receiver based model of Kafka consumer.
> I understand that the Direct Stream is the consumer that can give Exactly Once semantics, and that it uses the Kafka low level API, which is good. But the Direct Stream has concerns around recovering checkpoints on driver code change. Application developers need to manage their own offsets, which is complex. And even when someone does manage their own offsets, it limits the parallelism Spark Streaming can achieve: if someone wants more parallelism and sets spark.streaming.concurrentJobs to more than 1, they can no longer rely on storing offsets externally, as they have no control over which batch will run in which sequence.
> Furthermore, the Direct Stream has higher latency, as it fetches messages from Kafka during the RDD action. Also, the number of RDD partitions is limited to the number of topic partitions, so unless your Kafka topic has enough partitions, you have limited parallelism during RDD processing.
> Due to the above concerns, many people who do not need Exactly Once semantics still prefer the Receiver based model. Unfortunately, when customers fall back to the KafkaUtils.createStream approach, which uses the Kafka High Level Consumer, there are other issues around the reliability of the Kafka High Level API: it is buggy and has a serious issue around consumer re-balance. Hence I do not think it is correct to advise people to use KafkaUtils.createStream in production.
> The better option at present is to use the consumer from spark-packages. It uses the Kafka Low Level Consumer API, stores offsets in ZooKeeper, and can recover from any failure. Below are a few highlights of this consumer:
> 1. It has an inbuilt PID controller for dynamic rate limiting.
> 2. In this consumer, rate limiting is done by controlling the size of the blocks, i.e., the volume of data pulled from Kafka, whereas in Spark rate limiting is done by controlling the number of messages. The issue with throttling by number of messages is that if message sizes vary, block sizes will also vary. Say your Kafka topic has messages of different sizes, from 10 KB to 500 KB: throttling by number of messages can never give a deterministic block size, so there is no guarantee that memory back-pressure can really take effect.
> 3. This consumer uses the Kafka low level API, which gives better performance than the KafkaUtils.createStream based High Level API.
> 4. This consumer can give an end to end no data loss channel if enabled with the WAL.
> By accepting this low level Kafka consumer from spark-packages into the Apache Spark project, we will give the community better options for Kafka connectivity, for both the receiver-less and the receiver based models.
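An aside on highlight 1 above: to make the PID idea concrete, here is a minimal, hypothetical sketch of a PID loop steering a per-fetch byte budget toward a target block size. The class name, gains, and interface are all assumptions for illustration, not the package's actual implementation.

{code:scala}
// Hypothetical PID controller: nudges the next fetch's byte budget so that
// produced block sizes converge on a target, regardless of message count.
class PidRateController(targetBytesPerBlock: Long,
                        kp: Double = 1.0, ki: Double = 0.2, kd: Double = 0.0) {
  private var integral = 0.0
  private var lastError = 0.0

  def nextFetchBytes(lastBlockBytes: Long): Long = {
    val error = (targetBytesPerBlock - lastBlockBytes).toDouble
    integral += error                  // accumulated error (I term)
    val derivative = error - lastError // error slope (D term)
    lastError = error
    val adjustment = kp * error + ki * integral + kd * derivative
    math.max(0L, targetBytesPerBlock + adjustment.toLong)
  }
}

// Usage: after each block, feed back its size and get the next budget.
val pid = new PidRateController(targetBytesPerBlock = 4L * 1024 * 1024)
val nextBudget = pid.nextFetchBytes(lastBlockBytes = 6L * 1024 * 1024) // oversized block -> smaller budget
{code}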
[jira] [Commented] (SPARK-11045) Contributing Receiver based Low Level Kafka Consumer from Spark-Packages to Apache Spark Project
[ https://issues.apache.org/jira/browse/SPARK-11045?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15436628#comment-15436628 ] Dibyendu Bhattacharya commented on SPARK-11045: ---
Released the latest version, 1.0.8, of the Receiver based Kafka consumer for Spark Streaming. The receiver is compatible with Kafka 0.8.x, 0.9.x, and 0.10.x, and with all Spark versions. Available at Spark-Packages: https://spark-packages.org/package/dibbhatt/kafka-spark-consumer
- No dependency on the WAL or checkpoints for recovery on driver failure
- ZK based offset management for both consumed and processed offsets
- In-built PID controller for rate limiting and back-pressure management
- Custom message interceptor
Please refer to https://github.com/dibbhatt/kafka-spark-consumer/blob/master/README.md for more details.
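For readers unfamiliar with ZK based offset management, a minimal sketch of committing and reading an offset in ZooKeeper follows, assuming the conventional Kafka 0.8 path layout /consumers/<group>/offsets/<topic>/<partition>. This is illustrative only: the package's actual bookkeeping (e.g., tracking consumed vs. processed offsets separately) is richer, and parent-node creation and error handling are omitted.

{code:scala}
import org.apache.zookeeper.{CreateMode, ZooDefs, ZooKeeper}

object ZkOffsets {
  private def path(group: String, topic: String, partition: Int) =
    s"/consumers/$group/offsets/$topic/$partition"

  // Write the offset as a string znode, creating it on first commit.
  def commit(zk: ZooKeeper, group: String, topic: String,
             partition: Int, offset: Long): Unit = {
    val p = path(group, topic, partition)
    val data = offset.toString.getBytes("UTF-8")
    if (zk.exists(p, false) == null)
      zk.create(p, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT)
    else
      zk.setData(p, data, -1) // -1 matches any znode version
  }

  // Read the last committed offset, if any.
  def fetch(zk: ZooKeeper, group: String, topic: String, partition: Int): Option[Long] = {
    val p = path(group, topic, partition)
    Option(zk.exists(p, false)).map { _ =>
      new String(zk.getData(p, false, null), "UTF-8").toLong
    }
  }
}
{code}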
[jira] [Commented] (SPARK-11045) Contributing Receiver based Low Level Kafka Consumer from Spark-Packages to Apache Spark Project
[ https://issues.apache.org/jira/browse/SPARK-11045?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15111206#comment-15111206 ] Dibyendu Bhattacharya commented on SPARK-11045: ---
Thanks, Dan, for your comments. Many have told me the same, and as you can see, a large number of people have voted for this consumer to be included in Spark Core. I wish everyone who voted for it and is using it would also comment with their opinion. Unfortunately, the Spark committers think otherwise. Spark still documents the faulty Receiver based model on its website, which has known issues, and there are many who need an alternative to the Direct Stream but are reluctant to use a spark-packages library, so they go ahead and use whatever is mentioned on the Spark website. This seems to me to be misguiding people and forcing them to use a buggy consumer when a better alternative exists.
[jira] [Commented] (SPARK-11045) Contributing Receiver based Low Level Kafka Consumer from Spark-Packages to Apache Spark Project
[ https://issues.apache.org/jira/browse/SPARK-11045?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15069474#comment-15069474 ] Dibyendu Bhattacharya commented on SPARK-11045: ---
Hi Sean, I think you misunderstood my earlier comment. My intention in contributing this to Spark is not that I get support from others, but that the bigger community gets access to a better consumer. The out of the box Receiver based consumer that Spark presently ships has serious issues around it, and not accepting a better receiver is, in effect, denying the community a better choice. I do not see why we should do that when a better alternative exists and many are already using it successfully; a lot more people would start using it if it became part of the Spark project. What I meant earlier is that the community is a little reluctant to take something as important as a Kafka consumer from spark-packages, and instead uses the faulty Receiver based consumer from the Spark project without knowing the consequences. That is the issue I see here.
[jira] [Commented] (SPARK-11045) Contributing Receiver based Low Level Kafka Consumer from Spark-Packages to Apache Spark Project
[ https://issues.apache.org/jira/browse/SPARK-11045?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15069506#comment-15069506 ] Dibyendu Bhattacharya commented on SPARK-11045: ---
Hi Sean, unfortunately the existing implementation is not fixable, precisely because it uses the Kafka High Level Consumer; otherwise it would have been fixed by now. The only way to fix the existing Receiver based mode is to use the Kafka low level consumer, which is what my consumer does. And I am very sure this implementation is more stable and performant than what Spark has. I do not go by anyone's opinion either; things need to be tested and measured on performance and stability, and the people using this consumer have found that it performs much better than what the Spark project presently has. I think TD knows very well that it is because of the High Level Consumer's issues that Kafka rewrote its consumer API in 0.9. The problem is that we cannot simply move the existing receiver based consumer to 0.9, as Kafka 0.9 was only recently released and is not yet stable. At the same time, the many people on Kafka 0.8.x have been denied a stable receiver consumer for Spark for quite a long time. The call is yours; I will obviously post to the forums and Spark groups as and when I release new features. But I am not convinced by the reasons for not accepting this into the Spark project.
[jira] [Commented] (SPARK-10694) Prevent Data Loss in Spark Streaming when used with OFF_HEAP ExternalBlockStore (Tachyon)
[ https://issues.apache.org/jira/browse/SPARK-10694?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15030576#comment-15030576 ] Dibyendu Bhattacharya commented on SPARK-10694: ---
Hi [~andrewor14], can you please let me know if this PR makes sense?

> Prevent Data Loss in Spark Streaming when used with OFF_HEAP ExternalBlockStore (Tachyon)
>
> Key: SPARK-10694
> URL: https://issues.apache.org/jira/browse/SPARK-10694
> Project: Spark
> Issue Type: Bug
> Components: Block Manager, Streaming
> Affects Versions: 1.5.0
> Reporter: Dibyendu Bhattacharya
>
> If a streaming application stores its blocks OFF_HEAP, it may not need any WAL-like feature to recover from driver failure. As long as the writing of blocks to Tachyon from the streaming receiver is durable, the data should be recoverable from Tachyon directly on driver failure.
> This can avoid the expensive WAL writes and the duplication of blocks in both memory and the WAL, while still guaranteeing an end to end no data loss channel using the OFF_HEAP store.
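As a rough illustration of the wiring this JIRA talks about: a receiver stream can be asked to store its blocks OFF_HEAP, which in Spark 1.5 routes them to the ExternalBlockStore (Tachyon). A sketch follows; the Tachyon URL, hosts, and the spark.externalBlockStore.url key are assumptions for illustration, not taken from the PR.

{code:scala}
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

val conf = new SparkConf()
  .setAppName("OffHeapReceiverSketch")
  .set("spark.externalBlockStore.url", "tachyon://tachyon-master:19998") // assumed key/URL
val ssc = new StreamingContext(conf, Seconds(10))

// Store received blocks OFF_HEAP instead of the default
// MEMORY_AND_DISK_SER_2 plus a WAL.
val stream = KafkaUtils.createStream(
  ssc, "zk-host:2181", "my-group", Map("my-topic" -> 1), StorageLevel.OFF_HEAP)
stream.count().print()

ssc.start()
ssc.awaitTermination()
{code}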
[jira] [Commented] (SPARK-11045) Contributing Receiver based Low Level Kafka Consumer from Spark-Packages to Apache Spark Project
[ https://issues.apache.org/jira/browse/SPARK-11045?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14977874#comment-14977874 ] Dibyendu Bhattacharya commented on SPARK-11045: ---
Hi [~tdas], let me know your comment on this JIRA. Do you think this should continue to be part of Spark Packages, or is it good to include it in the Spark project, so that the community gets a reliable and better alternative to the Direct API (if someone does not want the Direct API)?
[jira] [Commented] (SPARK-11045) Contributing Receiver based Low Level Kafka Consumer from Spark-Packages to Apache Spark Project
[ https://issues.apache.org/jira/browse/SPARK-11045?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14952300#comment-14952300 ] Dibyendu Bhattacharya commented on SPARK-11045: ---
Well, ordering of messages is guaranteed per partition of a Kafka topic; across partitions there is no ordering guarantee. So when you process an RDD from a DirectStream over a topic with 10 partitions, no global ordering of messages is maintained while the RDD is processed either. If someone really needs global ordering, they should use a single partition for the Kafka topic.
Regarding contributing anything to Spark: I believe that in an open source community this should be driven by how much the community demands a feature, not by someone's opinion. What I am trying to emphasize here is that a lot of people have shown interest in using this, and many have already started using it. And if you judge it purely on performance and features, it does much better than both the default Receiver based approach and the Direct Stream approach. Unfortunately there is no publicly available benchmark; I think I should plan to do one to make this case stronger.
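To make the per-partition ordering point concrete: with the Spark 1.x direct stream, RDD partition i corresponds to exactly one Kafka topic-partition, so iterating within a partition sees messages in offset order, while no order holds across partitions. A sketch, with broker, topic, and batch settings as placeholders:

{code:scala}
import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils}

val ssc = new StreamingContext(new SparkConf().setAppName("OrderingSketch"), Seconds(10))
val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, Map("metadata.broker.list" -> "broker:9092"), Set("my-topic"))

stream.foreachRDD { rdd =>
  // Each RDD partition covers one Kafka partition's contiguous offset range.
  val ranges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  ranges.foreach(r => println(s"${r.topic}-${r.partition}: ${r.fromOffset}..${r.untilOffset}"))
  rdd.foreachPartition { iter =>
    // In-order within this single Kafka partition; no cross-partition order.
    iter.foreach { case (_, value) => println(value) }
  }
}
{code}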
[jira] [Commented] (SPARK-11045) Contributing Receiver based Low Level Kafka Consumer from Spark-Packages to Apache Spark Project
[ https://issues.apache.org/jira/browse/SPARK-11045?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14952391#comment-14952391 ] Dibyendu Bhattacharya commented on SPARK-11045: ---
I think I already mentioned that this JIRA is NOT a replacement for the DirectStream; it is a replacement for the existing Receiver based model. There are use cases, like Exactly Once, that can only be solved by the Direct Stream. But there are use cases where the user does not need to care about Exactly Once semantics, global ordering, or even ordering within a partition; in some cases the ordering can be taken care of by your downstream system (say HBase). There are even cases where more than one batch needs to be processed concurrently (Spark Streaming concurrent jobs > 1). This receiver is best suited for those cases, instead of the DirectStream. I think we need to understand this differentiation. This receiver is proposed as a replacement for the existing receiver model, which has issues because it uses the High Level API, which is faulty and much slower than the Simple Consumer API, and it can be used where Exactly Once and ordering semantics are not strict requirements. By the way, does the DirectStream solve the issue of strict ordering for a given partition across batches? What happens in case of failures? I think TD should chime in here.
[jira] [Commented] (SPARK-11045) Contributing Receiver based Low Level Kafka Consumer from Spark-Packages to Apache Spark Project
[ https://issues.apache.org/jira/browse/SPARK-11045?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14952602#comment-14952602 ] Dibyendu Bhattacharya commented on SPARK-11045: ---
I agree with you about the complexity of the Kafka Spark consumer. This consumer takes its Kafka connectivity from the Storm Kafka spout, which has been stabilized over time, and I do agree the code is complex. But that does not explain why we should deny the community a better consumer just because the code is complex. The code is also not so complex that it would have maintainability issues; the Storm Kafka spout has been running for a while now without any issue. Kafka's new consumer API may bring more features and may perform better than the SimpleConsumer API, but that will take some time, I guess, until the API is released and stabilizes, and Kafka releases are notoriously slow. So I do not see the community getting a better Kafka connector for the receiver approach in the near future if this is the stand from the Spark side. There are other benefits the consumer gets from the low level API. For example, the rate limit here is controlled by block size, whereas the default rate limiting in KafkaUtils is by number of messages; when Kafka carries messages of different sizes, there is no deterministic way to bound actual block sizes and memory utilization if rate control is done by number of messages. I think these things can be discussed once we are convinced that the low level API is the way to go.
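To illustrate the count-based vs. size-based distinction in that last point: Spark's built-in receiver throttle and back-pressure flag are expressed in messages per second, while a size-based scheme fills each block until a byte budget is exhausted. The fillBlock helper below is a hypothetical sketch of the latter, not the package's code.

{code:scala}
import org.apache.spark.SparkConf
import scala.collection.mutable.ArrayBuffer

// Spark's built-in, count-based knobs (real configuration keys):
val conf = new SparkConf()
  .set("spark.streaming.receiver.maxRate", "10000")    // messages per second
  .set("spark.streaming.backpressure.enabled", "true") // Spark 1.5+

// Size-based alternative (hypothetical helper): pull until the block's byte
// budget is used up, so block size stays bounded whatever the individual
// message sizes are.
def fillBlock(poll: () => Array[Byte], budgetBytes: Long): Seq[Array[Byte]] = {
  val buf = ArrayBuffer.empty[Array[Byte]]
  var used = 0L
  while (used < budgetBytes) {
    val msg = poll()
    buf += msg
    used += msg.length
  }
  buf
}
{code}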
[jira] [Commented] (SPARK-11045) Contributing Receiver based Low Level Kafka Consumer from Spark-Packages to Apache Spark Project
[ https://issues.apache.org/jira/browse/SPARK-11045?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14952572#comment-14952572 ] Dibyendu Bhattacharya commented on SPARK-11045: ---
Hi Saisai, the consumer I proposed here uses the Kafka low level SimpleConsumer API, which gives better performance and does not have the issues of the High Level API. Kafka is coming up with a new consumer API in the 0.9 timeframe, I believe, and the only way to solve this problem on the Kafka side is to use the new consumer API once 0.9 arrives; but those who use Kafka 0.8.x will still have the same issue. The better solution, I think, is to use the Kafka low level API, which will not change across Kafka versions and gives every customer a stable Kafka consumer. That also gives consistency in terms of the Kafka API between the receiver based and the receiver-less (Direct Stream) approaches, since the latter uses the same low level API.
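For context on what "low level" means here, this is roughly what a fetch through Kafka 0.8's SimpleConsumer API looks like. Leader discovery, retries, and error-code handling, the hard parts a consumer library packages up, are omitted, and host, topic, and offsets are placeholders.

{code:scala}
import kafka.api.FetchRequestBuilder
import kafka.consumer.SimpleConsumer

// host, port, socket timeout (ms), receive buffer (bytes), client id
val consumer = new SimpleConsumer("broker-host", 9092, 100000, 64 * 1024, "sketch-client")

val request = new FetchRequestBuilder()
  .clientId("sketch-client")
  .addFetch("my-topic", 0, 12345L, 1024 * 1024) // topic, partition, start offset, max bytes
  .build()

val response = consumer.fetch(request)
for (msgAndOffset <- response.messageSet("my-topic", 0)) {
  val payload = msgAndOffset.message.payload // java.nio.ByteBuffer
  println(s"offset=${msgAndOffset.offset} bytes=${payload.remaining()}")
}
consumer.close()
{code}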
[jira] [Commented] (SPARK-11045) Contributing Receiver based Low Level Kafka Consumer from Spark-Packages to Apache Spark Project
[ https://issues.apache.org/jira/browse/SPARK-11045?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14951926#comment-14951926 ] Dibyendu Bhattacharya commented on SPARK-11045: ---
I agree, Sean; this space is already getting complicated, and my intention was not at all to make it more confusing. What I see is that many customers are a little reluctant to use this consumer from spark-packages, thinking it will get less support. Being on spark-packages, many do not even consider it for their use cases, and instead use whatever Receiver based model is documented with Spark. For those who want to fall back to the Receiver based model, Spark's out of the box receivers do not give them a good choice, and not many customers know that a better choice exists on spark-packages.
[jira] [Commented] (SPARK-11045) Contributing Receiver based Low Level Kafka Consumer from Spark-Packages to Apache Spark Project
[ https://issues.apache.org/jira/browse/SPARK-11045?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14951915#comment-14951915 ] Dibyendu Bhattacharya commented on SPARK-11045: --- Hi Cody, thanks for your comments. My opinion on parallelism is not around receiving parallelism from Kafka which is same for both receiver and direct stream mode. My thought was on the parallelism while processing the RDD. In DirectStream the partitions of the RDD is your number of topic partitions . So if you Kafka topic has 10 partition , your RDD will have 10 partition. And that's the max parallelism while processing the RDD (unless you do re-partition which some at a cost) Whereas , in Receiver based model, the number of partitions is dictated by Block Intervals and Batch Interval. If your block interval is 200 Ms, and Batch interval is 10 seconds , your RDD will have 50 partitions ! I believe that seems to give much better parallelism while processing the RDD. Regarding the state of spark-packages code, your comment is not at good taste. There are many companies who think otherwise and use the spark packages consumer in production. As I said earlier, DirectStream is definitely the choice if one need "Exactly Once", but there are many who does not want "Exactly Once" and does not want the overhead of using DirectStream. unfortunately , for them the other alternatives also good enough which uses Kafka high level API. I am here trying to give a better alternatives in terms of a much better Receiver based approach. > Contributing Receiver based Low Level Kafka Consumer from Spark-Packages to > Apache Spark Project > > > Key: SPARK-11045 > URL: https://issues.apache.org/jira/browse/SPARK-11045 > Project: Spark > Issue Type: New Feature > Components: Streaming >Reporter: Dibyendu Bhattacharya > > This JIRA is to track the progress of making the Receiver based Low Level > Kafka Consumer from spark-packages > (http://spark-packages.org/package/dibbhatt/kafka-spark-consumer) to be > contributed back to Apache Spark Project. > This Kafka consumer has been around for more than year and has matured over > the time . I see there are many adoptions of this package . I receive > positive feedbacks that this consumer gives better performance and fault > tolerant capabilities. > This is the primary intent of this JIRA to give community a better > alternative if they want to use Receiver Base model. > If this consumer make it to Spark Core, it will definitely see more adoption > and support from community and help many who still prefer the Receiver Based > model of Kafka Consumer. > I understand the Direct Stream is the consumer which can give Exact Once > semantics and uses Kafka Low Level API , which is good . But Direct Stream > has concerns around recovering checkpoint on driver code change . Application > developer need to manage their own offset which complex . Even if some one > does manages their own offset , it limits the parallelism Spark Streaming can > achieve. If someone wants more parallelism and want > spark.streaming.concurrentJobs more than 1 , you can no longer rely on > storing offset externally as you have no control which batch will run in > which sequence. > Furthermore , the Direct Stream has higher latency , as it fetch messages > form Kafka during RDD action . Also number of RDD partitions are limited to > topic partition . So unless your Kafka topic does not have enough partitions, > you have limited parallelism while RDD processing. 
[jira] [Comment Edited] (SPARK-11045) Contributing Receiver based Low Level Kafka Consumer from Spark-Packages to Apache Spark Project
[ https://issues.apache.org/jira/browse/SPARK-11045?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14951926#comment-14951926 ] Dibyendu Bhattacharya edited comment on SPARK-11045 at 10/10/15 4:58 PM: - I agree, Sean, this space is already getting complicated, and my intention was not at all to make it more confusing. What I see is that many customers are a little reluctant to use this consumer from spark-packages, thinking that it will get less support. Because it lives in spark-packages, many do not even consider using it for their use cases and instead use whatever Receiver-based model is documented with Spark. For those who want to fall back to the Receiver-based model, Spark's out-of-the-box receivers do not give them a good choice, and many customers do not know that a better choice exists in spark-packages.
[jira] [Updated] (SPARK-11045) Contributing Receiver based Low Level Kafka Consumer from Spark-Packages to Apache Spark Project
[ https://issues.apache.org/jira/browse/SPARK-11045?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dibyendu Bhattacharya updated SPARK-11045: -- Affects Version/s: 1.5.1
[jira] [Updated] (SPARK-11045) Contributing Receiver based Low Level Kafka Consumer from Spark-Packages to Apache Spark Project
[ https://issues.apache.org/jira/browse/SPARK-11045?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dibyendu Bhattacharya updated SPARK-11045: -- Description: This JIRA is to track the progress of contributing the Receiver based Low Level Kafka Consumer from spark-packages (http://spark-packages.org/package/dibbhatt/kafka-spark-consumer) back to the Apache Spark Project. This Kafka consumer has been around for more than a year and has matured over time. I see many adoptions of this package, and I receive positive feedback that this consumer gives better performance and fault-tolerance capabilities. The primary intent of this JIRA is to give the community a better alternative if they want to use the Receiver-based model. If this consumer makes it into Spark Core, it will definitely see more adoption and support from the community and help many who still prefer the Receiver-based model of Kafka consumer. I understand the Direct Stream is the consumer that can give Exactly Once semantics and uses the Kafka Low Level API, which is good. But the Direct Stream has concerns around recovering checkpoints on driver code change. Application developers need to manage their own offsets, which is complex. Even if someone does manage their own offsets, it limits the parallelism Spark Streaming can achieve: if someone wants more parallelism and sets spark.streaming.concurrentJobs to more than 1, they can no longer rely on storing offsets externally, as they have no control over which batch will run in which sequence. Furthermore, the Direct Stream has higher latency, as it fetches messages from Kafka during the RDD action. Also, the number of RDD partitions is limited to the number of topic partitions, so unless your Kafka topic has enough partitions, you have limited parallelism while processing the RDD. Due to the above-mentioned concerns, many people who do not want Exactly Once semantics still prefer the Receiver-based model. Unfortunately, when customers fall back to the KafkaUtils.createStream approach, which uses the Kafka High Level Consumer, there are other issues around the reliability of the Kafka High Level API: it is buggy and has a serious issue around consumer re-balance. Hence I do not think it is correct to advise people to use KafkaUtils.createStream in production. The better option at present is to use the consumer from spark-packages. It uses the Kafka Low Level Consumer API, stores offsets in Zookeeper, and can recover from any failure. Below are a few highlights of this consumer:
1. It has an inbuilt PID Controller for dynamic rate limiting.
2. In this consumer, rate limiting is done by controlling the size of the blocks, i.e. the volume of bytes pulled from Kafka, whereas in Spark rate limiting is done by controlling the number of messages. The issue with throttling by number of messages is that if message sizes vary, block sizes will also vary. Say your Kafka topic has messages of different sizes, from 10 KB to 500 KB: throttling by number of messages can never give a deterministic block size, so there is no guarantee that memory back-pressure can really take effect.
3. This consumer uses the Kafka low level API, which gives better performance than the KafkaUtils.createStream based High Level API.
4. This consumer can give an end-to-end no-data-loss channel if enabled with WAL.
By accepting this low level Kafka consumer from spark-packages into the Apache Spark project, we will give the community better options for Kafka connectivity, both for the receiver-less and the Receiver-based model.
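For points 1 and 2 above, here is a minimal Scala sketch of how the two ideas can fit together. It is illustrative only, not the kafka-spark-consumer implementation; the class names, controller gains, and byte budgets are all assumptions made for the example.

{code:scala}
// Illustrative sketch only -- not the kafka-spark-consumer source.
final class PidRateController(kp: Double = 1.0, ki: Double = 0.2, kd: Double = 0.0) {
  private var lastError = 0.0
  private var accumError = 0.0

  /** Next target ingestion rate (bytes/sec) given the latest batch feedback. */
  def nextRate(currentRate: Double, processingRate: Double, dtSeconds: Double): Double = {
    val error = currentRate - processingRate // positive => ingesting faster than we process
    accumError += error * dtSeconds
    val dError = (error - lastError) / dtSeconds
    lastError = error
    math.max(0.0, currentRate - kp * error - ki * accumError - kd * dError)
  }
}

object SizeBasedThrottle {
  // Fill one block up to a byte budget, so block sizes stay deterministic
  // even when individual messages range from 10 KB to 500 KB.
  def fillBlock(fetchNext: () => Array[Byte], maxBlockBytes: Long): Vector[Array[Byte]] = {
    var block = Vector.empty[Array[Byte]]
    var bytes = 0L
    while (bytes < maxBlockBytes) {
      val msg = fetchNext()
      block :+= msg
      bytes += msg.length
    }
    block
  }
}
{code}

Throttling on bytes is what makes the back-pressure deterministic here: the PID output maps directly to a per-block byte budget, whereas a per-message cap yields blocks whose memory footprint swings with message size.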
[jira] [Created] (SPARK-11045) Contributing Receiver based Low Level Kafka Consumer from Spark-Packages to Apache Spark Project
Dibyendu Bhattacharya created SPARK-11045: - Summary: Contributing Receiver based Low Level Kafka Consumer from Spark-Packages to Apache Spark Project Key: SPARK-11045 URL: https://issues.apache.org/jira/browse/SPARK-11045 Project: Spark Issue Type: New Feature Components: Streaming Reporter: Dibyendu Bhattacharya
[jira] [Updated] (SPARK-11045) Contributing Receiver based Low Level Kafka Consumer from Spark-Packages to Apache Spark Project
[ https://issues.apache.org/jira/browse/SPARK-11045?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dibyendu Bhattacharya updated SPARK-11045: -- Affects Version/s: (was: 1.5.1)
[jira] [Updated] (SPARK-10694) Prevent Data Loss in Spark Streaming when used with OFF_HEAP ExternalBlockStore (Tachyon)
[ https://issues.apache.org/jira/browse/SPARK-10694?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dibyendu Bhattacharya updated SPARK-10694: -- Component/s: Block Manager > Prevent Data Loss in Spark Streaming when used with OFF_HEAP > ExternalBlockStore (Tachyon) > - > > Key: SPARK-10694 > URL: https://issues.apache.org/jira/browse/SPARK-10694 > Project: Spark > Issue Type: Bug > Components: Block Manager, Streaming > Affects Versions: 1.5.0 > Reporter: Dibyendu Bhattacharya > > If a Streaming application stores its blocks OFF_HEAP, it may not need any WAL-like feature to recover from driver failure. As long as the writing of blocks from the Streaming receiver to Tachyon is durable, the blocks should be recoverable directly from Tachyon on driver failure. This can solve the issue of the expensive WAL write and of duplicating the blocks both in memory and in the WAL, while still guaranteeing an end-to-end no-data-loss channel using the OFF_HEAP store.
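As a concrete illustration of the idea, here is a minimal sketch against Spark 1.5-era APIs. The Tachyon URL, ZooKeeper quorum, topic, and consumer group are placeholder assumptions; the point is only where OFF_HEAP replaces the usual storage-level-plus-WAL combination.

{code:scala}
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object OffHeapReceiverSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("offheap-receiver")
      // Point the external block store at a Tachyon master (placeholder URL).
      .set("spark.externalBlockStore.url", "tachyon://tachyon-master:19998")

    val ssc = new StreamingContext(conf, Seconds(10))

    // Persist received blocks OFF_HEAP instead of the usual
    // MEMORY_AND_DISK_SER_2 plus write-ahead log.
    val stream = KafkaUtils.createStream(
      ssc,
      "zk-host:2181",           // ZooKeeper quorum (placeholder)
      "offheap-consumer-group", // consumer group (placeholder)
      Map("events" -> 1),       // topic -> receiver thread count (placeholder)
      StorageLevel.OFF_HEAP)

    stream.count().print()
    ssc.start()
    ssc.awaitTermination()
  }
}
{code}

The design trade-off, as the ticket argues, is that durability moves from a per-block WAL write into the external block store itself, so the block exists exactly once outside the executor heap.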
[jira] [Created] (SPARK-10694) Prevent Data Loss in Spark Streaming when used with OFF_HEAP ExternalBlockStore (Tachyon)
Dibyendu Bhattacharya created SPARK-10694: - Summary: Prevent Data Loss in Spark Streaming when used with OFF_HEAP ExternalBlockStore (Tachyon) Key: SPARK-10694 URL: https://issues.apache.org/jira/browse/SPARK-10694 Project: Spark Issue Type: Bug Components: Streaming Affects Versions: 1.5.0 Reporter: Dibyendu Bhattacharya
[jira] [Commented] (SPARK-8591) Block failed to unroll to memory should not be replicated for MEMORY_ONLY_2 StorageLevel
[ https://issues.apache.org/jira/browse/SPARK-8591?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14627517#comment-14627517 ] Dibyendu Bhattacharya commented on SPARK-8591: -- This will be Won't Fix, as suggested by [~tdas] in the PR. Block failed to unroll to memory should not be replicated for MEMORY_ONLY_2 StorageLevel Key: SPARK-8591 URL: https://issues.apache.org/jira/browse/SPARK-8591 Project: Spark Issue Type: Bug Components: Streaming Affects Versions: 1.4.0 Reporter: Dibyendu Bhattacharya A block that fails to unroll to memory (returning an iterator and size 0) should not be replicated to a peer node, as its putBlockStatus comes back as StorageLevel.NONE and the BlockStatus is not reported to the Master. The primary issue is that, for StorageLevel MEMORY_ONLY_2, if the BlockManager fails to unroll the block to memory and the local store fails, the BlockManager still replicates the same block to a remote peer. In the Spark Streaming case, the Receiver gets the PutResult from the local BlockManager, and if the block failed to store locally, ReceivedBlockHandler throws a SparkException back to the Receiver even though the BlockManager successfully replicated the block to the remote peer. This wastes memory at the remote peer, as that block can never be used in Streaming jobs. When the Receiver fails to store a block it can retry, and every failed retry (to store locally) may add another unused block to the remote peer; this can lead to many unwanted blocks when high-volume receivers retry multiple times. The proposed fix is to stop replicating the block if the store to local has failed. This fix prevents the scenario mentioned above and does not impact RDD partition replication (during cache or persist), as the RDD CacheManager performs unrolling to memory before attempting to store in local memory, so it can never happen that the block unroll succeeds but the store to local memory fails.
[jira] [Commented] (SPARK-8591) Block failed to unroll to memory should not be replicated for MEMORY_ONLY_2 StorageLevel
[ https://issues.apache.org/jira/browse/SPARK-8591?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14627513#comment-14627513 ] Dibyendu Bhattacharya commented on SPARK-8591: -- This is the summary of the issue as mentioned in the PR. Problem summary: If a block fails to unroll, the ReceiverTracker will never know about the block and will not include it in a future computation. In the meantime, however, the block may be replicated and take up space on other executors even though it will never be used. Implications for Spark core: For Spark core, however, it is reasonable to replicate a block even if it fails to unroll. Just because there is not enough memory to cache this block on this executor doesn't mean the same is true on a different executor. This is all best effort, but a future computation of the RDD will still benefit from having the cached block somewhere. (Note: the existing code doesn't actually do this for normal RDD caching yet because CacheManager has its own unrolling logic. We will address this separately in the future.) Alternative fix: The right fix for SPARK-8591 would be to have the ReceiverTracker just read its blocks from the BlockManagerMaster. This simplifies the two divergent block reporting code paths. Since the BlockManagerMaster is notified of replicated blocks, the replication here will also help mitigate data loss in the case of MEMORY_ONLY_*. TL;DR: This patch removes a small feature from the block manager that, though not currently used, is desirable in the future for both Spark core and Spark streaming. However, the underlying issue is not caused by a bug in the block manager, but by an incorrect assumption in the ReceiverTracker that does not take replication into account. The correct way forward would be to fix this in Spark streaming by refactoring the ReceiverTracker to depend on BlockManagerMaster.
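For readers who want to see where MEMORY_ONLY_2 enters the picture at the user level, here is a toy sketch; the receiver and class names are made up, and the unroll/replication behaviour discussed in this ticket happens inside the BlockManager when the blocks produced by store() are put locally.

{code:scala}
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.receiver.Receiver

// Toy receiver persisted MEMORY_ONLY_2: two in-memory replicas, no disk
// fallback. If an executor cannot unroll a block, the local put fails; the
// bug discussed above is that the peer replica is still created even though
// the ReceiverTracker never learns of the block.
class ToyReceiver extends Receiver[String](StorageLevel.MEMORY_ONLY_2) {
  def onStart(): Unit = {
    new Thread("toy-receiver") {
      override def run(): Unit =
        while (!isStopped()) store("event-" + System.nanoTime())
    }.start()
  }
  def onStop(): Unit = ()
}

object Memory2Demo {
  def main(args: Array[String]): Unit = {
    val ssc = new StreamingContext(
      new SparkConf().setAppName("memory2-demo"), Seconds(5))
    ssc.receiverStream(new ToyReceiver).count().print()
    ssc.start()
    ssc.awaitTermination()
  }
}
{code}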
[jira] [Updated] (SPARK-8591) Block failed to unroll to memory should not be replicated for MEMORY_ONLY_2 StorageLevel
[ https://issues.apache.org/jira/browse/SPARK-8591?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dibyendu Bhattacharya updated SPARK-8591: - Component/s: (was: Block Manager)
[jira] [Updated] (SPARK-8591) Block failed to unroll to memory should not be replicated for MEMORY_ONLY_2 StorageLevel
[ https://issues.apache.org/jira/browse/SPARK-8591?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dibyendu Bhattacharya updated SPARK-8591: - Component/s: Streaming
[jira] [Updated] (SPARK-8591) Block failed to unroll to memory should not be replicated for MEMORY_ONLY_2 StorageLevel
[ https://issues.apache.org/jira/browse/SPARK-8591?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dibyendu Bhattacharya updated SPARK-8591: - Description: (minor wording revisions to the description quoted above)
[jira] [Updated] (SPARK-8591) Block failed to unroll to memory should not be replicated for MEMORY_ONLY_2 StorageLevel
[ https://issues.apache.org/jira/browse/SPARK-8591?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dibyendu Bhattacharya updated SPARK-8591: - Description: (expanded the original one-line summary into the full description quoted above)
[jira] [Updated] (SPARK-8591) Block failed to unroll to memory should not be replicated for MEMORY_ONLY_2 StorageLevel
[ https://issues.apache.org/jira/browse/SPARK-8591?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dibyendu Bhattacharya updated SPARK-8591: --
Description: A block that fails to unroll to memory (returning an iterator and a size of 0) should not be replicated to a peer node, because putBlockStatus comes back as StorageLevel.NONE and the BlockStatus is never reported to the Master.
The primary issue: for StorageLevel MEMORY_ONLY_2, when the BlockManager fails to unroll a block to memory and the local store fails, it still replicates the same block to a remote peer. In the Spark Streaming case, the Receiver gets the PutResult from the local BlockManager, and if the block failed to store locally, ReceivedBlockHandler throws a SparkException back to the Receiver even though the block was successfully replicated to the remote peer by the BlockManager. This wastes memory on the remote peer, since that block can never be used by streaming jobs. When a Receiver fails to store a block, it retries, and every failed local store may add another unused block to the remote peer, so high-volume receivers doing multiple retries can accumulate many unwanted blocks.
The proposed fix is to stop replicating a block once the local store has failed. This prevents the scenario above and does not affect RDD partition replication (during cache or persist), because the RDD CacheManager unrolls to memory before attempting the local store, so a successful unroll followed by a failed local store cannot happen on that path.
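For clarity, here is a minimal, self-contained sketch of the proposed guard (this is not the actual BlockManager code; storeLocally and replicate are hypothetical stand-ins for the local put and the peer replication steps):

object ReplicationGuardSketch {
  // Replicate only when the local store (e.g. the unroll to memory for
  // MEMORY_ONLY_2) succeeded; a failed local store skips replication and
  // leaves the retry decision to the Receiver.
  def storeBlock(storeLocally: () => Boolean, replicate: () => Unit): Boolean = {
    val storedLocally = storeLocally()
    if (storedLocally) replicate()
    storedLocally
  }

  def main(args: Array[String]): Unit = {
    // Simulate a failed unroll: replication must be skipped.
    var replicated = false
    val stored = storeBlock(() => false, () => replicated = true)
    println(s"stored=$stored, replicated=$replicated") // stored=false, replicated=false
  }
}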
[jira] [Created] (SPARK-8591) Block failed to unroll to memory should not be replicated for MEMORY_ONLY_2 StorageLevel
Dibyendu Bhattacharya created SPARK-8591: Summary: Block failed to unroll to memory should not be replicated for MEMORY_ONLY_2 StorageLevel Key: SPARK-8591 URL: https://issues.apache.org/jira/browse/SPARK-8591 Project: Spark Issue Type: Bug Components: Block Manager Affects Versions: 1.4.0 Reporter: Dibyendu Bhattacharya
A block that fails to unroll to memory (returning an iterator and a size of 0) should not be replicated to a peer node, because putBlockStatus comes back as StorageLevel.NONE and the BlockStatus is never reported to the Master.
[jira] [Commented] (SPARK-8474) [STREAMING] Kafka DirectStream API stops receiving messages if collective size of the messages specified in spark.streaming.kafka.maxRatePerPartition exceeds the default fetch size (fetch.message.max.bytes) of SimpleConsumer
[ https://issues.apache.org/jira/browse/SPARK-8474?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14593171#comment-14593171 ] Dibyendu Bhattacharya commented on SPARK-8474: --
Per https://kafka.apache.org/08/configuration.html, fetch.message.max.bytes is "The number of bytes of messages to attempt to fetch for each topic-partition in each fetch request. These bytes will be read into memory for each partition, so this helps control the memory used by the consumer. The fetch request size must be at least as large as the maximum message size the server allows or else it is possible for the producer to send messages larger than the consumer can fetch."
So this is not a per-message limit; it is the total size of messages you fetch in each FetchRequest built with FetchRequestBuilder.
[STREAMING] Kafka DirectStream API stops receiving messages if collective size of the messages specified in spark.streaming.kafka.maxRatePerPartition exceeds the default fetch size (fetch.message.max.bytes) of SimpleConsumer
Key: SPARK-8474 URL: https://issues.apache.org/jira/browse/SPARK-8474 Project: Spark Issue Type: Bug Components: Streaming Affects Versions: 1.4.0 Reporter: Dibyendu Bhattacharya Priority: Critical
The issue: if Kafka holds variable-size messages ranging from a few KB to a few hundred KB, rate limiting by message count can cause a problem. Say message sizes are such that only 1000 messages can be pulled within the default fetch.message.max.bytes limit (which is 1 MB), while spark.streaming.kafka.maxRatePerPartition is set to 2000. With these settings, when the Kafka RDD pulls messages for its offset range, it only pulls 1000 messages (limited by the pull size in the SimpleConsumer API), can never reach the desired untilOffset, and fails at this assert in KafkaRDD: assert(requestOffset == part.untilOffset, errRanOutBeforeEnd(part))
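To make the arithmetic in the report concrete, here is a small sketch using the report's own figures (the ~1 KB average message size is an assumed number, implied by "1000 messages per 1 MB"):

object FetchSizeArithmetic {
  def main(args: Array[String]): Unit = {
    val fetchMessageMaxBytes = 1024L * 1024 // Kafka 0.8 default fetch size: 1 MB
    val avgMessageBytes      = 1024L        // assumed ~1 KB average message
    val maxRatePerPartition  = 2000L        // spark.streaming.kafka.maxRatePerPartition

    // Upper bound on messages a single FetchRequest can return for one partition:
    val perFetch = fetchMessageMaxBytes / avgMessageBytes
    println(s"~$perFetch messages per fetch vs $maxRatePerPartition requested per batch")
    // If only one fetch were issued per partition per batch, it would stop
    // ~1000 messages short of untilOffset, the scenario the assert trips on.
  }
}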
[jira] [Commented] (SPARK-8474) [STREAMING] Kafka DirectStream API stops receiving messages if collective size of the messages specified in spark.streaming.kafka.maxRatePerPartition exceeds the default fetch size (fetch.message.max.bytes) of SimpleConsumer
[ https://issues.apache.org/jira/browse/SPARK-8474?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14593273#comment-14593273 ] Dibyendu Bhattacharya commented on SPARK-8474: --
I hit this problem just once and have not been able to reproduce it since. Here is the executor stack trace from that occurrence. I am not sure whether this is related to some Kafka issue where the leader offset comes back wrong.
15/06/18 09:01:21 INFO CoarseGrainedExecutorBackend: Registered signal handlers for [TERM, HUP, INT]
15/06/18 09:01:21 INFO SecurityManager: Changing view acls to: hadoop
15/06/18 09:01:21 INFO SecurityManager: Changing modify acls to: hadoop
15/06/18 09:01:21 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(hadoop); users with modify permissions: Set(hadoop)
15/06/18 09:01:22 INFO Slf4jLogger: Slf4jLogger started
15/06/18 09:01:22 INFO Remoting: Starting remoting
15/06/18 09:01:22 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://driverPropsFetcher@10.252.5.54:45553]
15/06/18 09:01:22 INFO Utils: Successfully started service 'driverPropsFetcher' on port 45553.
15/06/18 09:01:23 INFO SecurityManager: Changing view acls to: hadoop
15/06/18 09:01:23 INFO SecurityManager: Changing modify acls to: hadoop
15/06/18 09:01:23 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(hadoop); users with modify permissions: Set(hadoop)
15/06/18 09:01:23 INFO RemoteActorRefProvider$RemotingTerminator: Shutting down remote daemon.
15/06/18 09:01:23 INFO RemoteActorRefProvider$RemotingTerminator: Remote daemon shut down; proceeding with flushing remote transports.
15/06/18 09:01:23 INFO Slf4jLogger: Slf4jLogger started
15/06/18 09:01:23 INFO Remoting: Starting remoting
15/06/18 09:01:23 INFO RemoteActorRefProvider$RemotingTerminator: Remoting shut down.
15/06/18 09:01:23 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkExecutor@10.252.5.54:56579]
15/06/18 09:01:23 INFO Utils: Successfully started service 'sparkExecutor' on port 56579.
15/06/18 09:01:23 INFO DiskBlockManager: Created local directory at /mnt1/spark/local/spark-37f66f87-875d-4203-820a-cc9c90cb40a6/executor-8bbf229e-8b02-4d52-8555-75e1f6c5b2f3/blockmgr-2badd56a-7877-44d7-bb67-c309935ce1ba
15/06/18 09:01:23 INFO MemoryStore: MemoryStore started with capacity 883.8 MB
15/06/18 09:01:23 INFO CoarseGrainedExecutorBackend: Connecting to driver: akka.tcp://sparkDriver@10.252.5.113:52972/user/CoarseGrainedScheduler
15/06/18 09:01:23 INFO WorkerWatcher: Connecting to worker akka.tcp://sparkWorker@10.252.5.54:49197/user/Worker
15/06/18 09:01:23 INFO WorkerWatcher: Successfully connected to akka.tcp://sparkWorker@10.252.5.54:49197/user/Worker
15/06/18 09:01:23 INFO CoarseGrainedExecutorBackend: Successfully registered with driver
15/06/18 09:01:23 INFO Executor: Starting executor ID 1 on host 10.252.5.54
15/06/18 09:01:23 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 34554.
15/06/18 09:01:23 INFO NettyBlockTransferService: Server created on 34554
15/06/18 09:01:23 INFO BlockManagerMaster: Trying to register BlockManager
15/06/18 09:01:24 INFO BlockManagerMaster: Registered BlockManager
15/06/18 09:01:24 INFO CoarseGrainedExecutorBackend: Got assigned task 0
15/06/18 09:01:24 INFO CoarseGrainedExecutorBackend: Got assigned task 1
15/06/18 09:01:24 INFO CoarseGrainedExecutorBackend: Got assigned task 2
15/06/18 09:01:24 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
15/06/18 09:01:24 INFO Executor: Running task 1.0 in stage 0.0 (TID 1)
15/06/18 09:01:24 INFO Executor: Running task 2.0 in stage 0.0 (TID 2)
15/06/18 09:01:24 INFO Executor: Fetching http://10.252.5.113:54193/jars/LowlevelKafkaConsumer-assembly-1.0.jar with timestamp 1434618080212
15/06/18 09:01:24 INFO Utils: Fetching http://10.252.5.113:54193/jars/LowlevelKafkaConsumer-assembly-1.0.jar to /mnt1/spark/local/spark-37f66f87-875d-4203-820a-cc9c90cb40a6/executor-8bbf229e-8b02-4d52-8555-75e1f6c5b2f3/fetchFileTemp4240791741464959275.tmp
15/06/18 09:01:25 INFO Utils: Copying /mnt1/spark/local/spark-37f66f87-875d-4203-820a-cc9c90cb40a6/executor-8bbf229e-8b02-4d52-8555-75e1f6c5b2f3/19875585461434618080212_cache to /mnt1/spark/work/app-20150618090120-0029/1/./LowlevelKafkaConsumer-assembly-1.0.jar
15/06/18 09:01:25 INFO Executor: Adding file:/mnt1/spark/work/app-20150618090120-0029/1/./LowlevelKafkaConsumer-assembly-1.0.jar to class loader
15/06/18 09:01:25 INFO TorrentBroadcast: Started reading broadcast variable 0
15/06/18 09:01:25 INFO MemoryStore: ensureFreeSpace(1399) called with curMem=0, maxMem=926731468
15/06/18 09:01:25 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 1399.0 B, free 883.8 MB)
15/06/18 09:01:25 INFO TorrentBroadcast:
[jira] [Commented] (SPARK-8474) [STREAMING] Kafka DirectStream API stops receiving messages if collective size of the messages specified in spark.streaming.kafka.maxRatePerPartition exceeds the default fetch size (fetch.message.max.bytes) of SimpleConsumer
[ https://issues.apache.org/jira/browse/SPARK-8474?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14593285#comment-14593285 ] Dibyendu Bhattacharya commented on SPARK-8474: --
Yes, right. This may be a false alarm; I did not see any issue with the logic. As I understand it, KafkaRDD keeps pulling messages in chunks of fetch.message.max.bytes (1 MB) in every fetchBatch, and it keeps doing so until it reaches untilOffset, so I may be wrong here. I got the issue once and have not been able to reproduce it since. I shared the executor trace from that run, where I can see an OffsetOutOfRange issue. Not sure how that happened, since I launched the receiver for the very first time, starting from the earliest offset. Just to mention, in all subsequent runs I never saw output like this from the shared log:
15/06/18 09:01:26 INFO KafkaRDD: Computing topic valid_subpub, partition 0 offsets 0 - 2500
15/06/18 09:01:26 INFO KafkaRDD: Computing topic valid_subpub, partition 1 offsets 0 - 2500
15/06/18 09:01:26 INFO KafkaRDD: Computing topic valid_subpub, partition 2 offsets 0 - 2338
Something must have gone wrong in computing the offset ranges, which look wrong to me: this is a very old topic, so its offsets cannot start from zero (0).
[STREAMING] Kafka DirectStream API stops receiving messages if collective size of the messages specified in spark.streaming.kafka.maxRatePerPartition exceeds the default fetch size (fetch.message.max.bytes) of SimpleConsumer
Key: SPARK-8474 URL: https://issues.apache.org/jira/browse/SPARK-8474 Project: Spark Issue Type: Bug Components: Streaming Affects Versions: 1.4.0 Reporter: Dibyendu Bhattacharya Priority: Critical
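A simplified sketch of the fetch loop this comment describes (hypothetical code, not KafkaRDD itself; fetchBatch stands in for one SimpleConsumer FetchRequest): repeated fetches should cover the whole offset range even when a single fetch returns fewer messages than the batch needs:

object FetchLoopSketch {
  // fetchBatch(offset) returns the message offsets obtained by one ~1 MB
  // FetchRequest starting at `offset`; empty means nothing more was returned.
  def consumeRange(fromOffset: Long, untilOffset: Long)
                  (fetchBatch: Long => Seq[Long]): Long = {
    var requestOffset = fromOffset
    while (requestOffset < untilOffset) {
      val offsets = fetchBatch(requestOffset)
      if (offsets.isEmpty) return requestOffset // ran out before the end
      requestOffset = offsets.last + 1
    }
    requestOffset // KafkaRDD asserts the equivalent of requestOffset == untilOffset
  }

  def main(args: Array[String]): Unit = {
    // Each simulated fetch returns at most 1000 offsets (one "1 MB" chunk),
    // yet repeated fetches still cover a 2000-message range.
    val end = consumeRange(0L, 2000L)(from => from until math.min(from + 1000L, 2000L))
    println(end) // 2000
  }
}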
[jira] [Updated] (SPARK-8474) [STREAMING] Kafka DirectStream API stops receiving messages if collective size of the messages specified in spark.streaming.kafka.maxRatePerPartition exceeds the default fetch size (fetch.message.max.bytes) of SimpleConsumer
[ https://issues.apache.org/jira/browse/SPARK-8474?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dibyendu Bhattacharya updated SPARK-8474: --
Description: revised to the wording quoted in the first comment above, adding that the default fetch.message.max.bytes is 1 MB and that the pull-size limit comes from the SimpleConsumer API. The failing assert is unchanged: assert(requestOffset == part.untilOffset, errRanOutBeforeEnd(part))
[STREAMING] Kafka DirectStream API stops receiving messages if collective size of the messages specified in spark.streaming.kafka.maxRatePerPartition exceeds the default fetch size (fetch.message.max.bytes) of SimpleConsumer
Key: SPARK-8474 URL: https://issues.apache.org/jira/browse/SPARK-8474 Project: Spark Issue Type: Bug Components: Streaming Affects Versions: 1.4.0 Reporter: Dibyendu Bhattacharya Priority: Critical
[jira] [Created] (SPARK-8474) [STREAMING] Kafka DirectStream API stops receiving messages if collective size of the messages specified in spark.streaming.kafka.maxRatePerPartition exceeds the default fetch size (fetch.message.max.bytes) of SimpleConsumer
Dibyendu Bhattacharya created SPARK-8474: Summary: [STREAMING] Kafka DirectStream API stops receiving messages if collective size of the messages specified in spark.streaming.kafka.maxRatePerPartition exceeds the default fetch size (fetch.message.max.bytes) of SimpleConsumer Key: SPARK-8474 URL: https://issues.apache.org/jira/browse/SPARK-8474 Project: Spark Issue Type: Bug Components: Streaming Affects Versions: 1.4.0 Reporter: Dibyendu Bhattacharya Priority: Critical
The original description is the same scenario quoted above, before the update added the 1 MB default and the SimpleConsumer pull-size detail: message sizes are such that only 1000 messages fit within the default fetch.message.max.bytes limit while spark.streaming.kafka.maxRatePerPartition is set to 2000, so the Kafka RDD can never reach the desired untilOffset and fails at assert(requestOffset == part.untilOffset, errRanOutBeforeEnd(part)).
[jira] [Updated] (SPARK-8080) Custom Receiver.store with Iterator type do not give correct count at Spark UI
[ https://issues.apache.org/jira/browse/SPARK-8080?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dibyendu Bhattacharya updated SPARK-8080: --
Attachment: screenshot.png
Custom Receiver.store with Iterator type do not give correct count at Spark UI
Key: SPARK-8080 URL: https://issues.apache.org/jira/browse/SPARK-8080 Project: Spark Issue Type: Bug Components: Streaming Affects Versions: 1.2.0 Reporter: Dibyendu Bhattacharya Fix For: 1.4.0 Attachments: screenshot.png
In a custom receiver, if I call store with the Iterator variant (store(dataIterator: Iterator[T]): Unit), the Spark UI does not show the correct count of records in the block, which leads to wrong values for Input Rate, Scheduling Delay, and Input Size.
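For reference, a minimal custom receiver exercising the iterator variant of store() that this issue is about (a sketch; the record contents and thread name are made up):

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

// Pushes one block of 100 records via store(dataIterator: Iterator[T]),
// the call path this issue says is miscounted in the Spark UI.
class IteratorDemoReceiver extends Receiver[String](StorageLevel.MEMORY_ONLY_2) {
  override def onStart(): Unit = {
    new Thread("iterator-demo-receiver") {
      override def run(): Unit = {
        val records = (1 to 100).map(i => s"record-$i")
        store(records.iterator) // the record count of this block is what the UI gets wrong
      }
    }.start()
  }
  override def onStop(): Unit = ()
}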
[jira] [Commented] (SPARK-7525) Could not read data from write ahead log record when Receiver failed and WAL is stored in Tachyon
[ https://issues.apache.org/jira/browse/SPARK-7525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14540325#comment-14540325 ] Dibyendu Bhattacharya commented on SPARK-7525: --
I guess this has to do with Tachyon's lack of append support:
java.lang.IllegalStateException: File exists and there is no append support!
at org.apache.spark.streaming.util.HdfsUtils$.getOutputStream(HdfsUtils.scala:33)
at org.apache.spark.streaming.util.FileBasedWriteAheadLogWriter.org$apache$spark$streaming$util$FileBasedWriteAheadLogWriter$$stream$lzycompute(FileBasedWriteAheadLogWriter.scala:33)
at org.apache.spark.streaming.util.FileBasedWriteAheadLogWriter.org$apache$spark$streaming$util$FileBasedWriteAheadLogWriter$$stream(FileBasedWriteAheadLogWriter.scala:33)
at org.apache.spark.streaming.util.FileBasedWriteAheadLogWriter.init(FileBasedWriteAheadLogWriter.scala:41)
at org.apache.spark.streaming.util.FileBasedWriteAheadLog.getLogWriter(FileBasedWriteAheadLog.scala:194)
at org.apache.spark.streaming.util.FileBasedWriteAheadLog.write(FileBasedWriteAheadLog.scala:81)
at org.apache.spark.streaming.util.FileBasedWriteAheadLog.write(FileBasedWriteAheadLog.scala:44)
at org.apache.spark.streaming.receiver.WriteAheadLogBasedBlockHandler$$anonfun$5.apply(ReceivedBlockHandler.scala:178)
at org.apache.spark.streaming.receiver.WriteAheadLogBasedBlockHandler$$anonfun$5.apply(ReceivedBlockHandler.scala:178)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:744)
Could not read data from write ahead log record when Receiver failed and WAL is stored in Tachyon
Key: SPARK-7525 URL: https://issues.apache.org/jira/browse/SPARK-7525 Project: Spark Issue Type: Bug Components: Streaming Affects Versions: 1.4.0 Environment: AWS, Spark Streaming 1.4 with Tachyon 0.6.4 Reporter: Dibyendu Bhattacharya
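Based on the top frame of this trace, the failing logic is roughly the following (a hedged reconstruction, not the exact HdfsUtils source): reopening an existing WAL segment file requires filesystem append support, which the Tachyon mount does not advertise:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FSDataOutputStream, Path}

object WalOutputStreamSketch {
  def getOutputStream(path: String, conf: Configuration): FSDataOutputStream = {
    val dfsPath = new Path(path)
    val dfs = dfsPath.getFileSystem(conf)
    if (dfs.isFile(dfsPath)) {
      // Reopening an existing segment requires filesystem append support;
      // the Tachyon mount here reports none, hence the IllegalStateException.
      if (conf.getBoolean("hdfs.append.support", false)) dfs.append(dfsPath)
      else throw new IllegalStateException("File exists and there is no append support!")
    } else {
      dfs.create(dfsPath)
    }
  }
}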
[jira] [Created] (SPARK-7525) Could not read data from write ahead log record when Receiver failed and WAL is stored in Tachyon
Dibyendu Bhattacharya created SPARK-7525: Summary: Could not read data from write ahead log record when Receiver failed and WAL is stored in Tachyon Key: SPARK-7525 URL: https://issues.apache.org/jira/browse/SPARK-7525 Project: Spark Issue Type: Bug Components: Streaming Affects Versions: 1.4.0 Environment: AWS, Spark Streaming 1.4 with Tachyon 0.6.4 Reporter: Dibyendu Bhattacharya
I was testing the fault-tolerance aspects of Spark Streaming with the checkpoint directory stored in Tachyon. Spark Streaming is able to recover from driver failure, but when a Receiver failed, Spark Streaming was not able to read the WAL files written by the failed Receiver. Below is the exception after the Receiver failed.
INFO : org.apache.spark.scheduler.DAGScheduler - Executor lost: 2 (epoch 1)
INFO : org.apache.spark.storage.BlockManagerMasterEndpoint - Trying to remove executor 2 from BlockManagerMaster.
INFO : org.apache.spark.storage.BlockManagerMasterEndpoint - Removing block manager BlockManagerId(2, 10.252.5.54, 45789)
INFO : org.apache.spark.storage.BlockManagerMaster - Removed 2 successfully in removeExecutor
INFO : org.apache.spark.streaming.scheduler.ReceiverTracker - Registered receiver for stream 2 from 10.252.5.62:47255
WARN : org.apache.spark.scheduler.TaskSetManager - Lost task 2.1 in stage 103.0 (TID 421, 10.252.5.62): org.apache.spark.SparkException: Could not read data from write ahead log record FileBasedWriteAheadLogSegment(tachyon-ft://10.252.5.113:19998/tachyon/checkpoint/receivedData/2/log-1431341091711-1431341151711,645603894,10891919)
at org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD.org$apache$spark$streaming$rdd$WriteAheadLogBackedBlockRDD$$getBlockFromWriteAheadLog$1(WriteAheadLogBackedBlockRDD.scala:144)
at org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD$$anonfun$compute$1.apply(WriteAheadLogBackedBlockRDD.scala:168)
at org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD$$anonfun$compute$1.apply(WriteAheadLogBackedBlockRDD.scala:168)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD.compute(WriteAheadLogBackedBlockRDD.scala:168)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:87)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
at org.apache.spark.scheduler.Task.run(Task.scala:70)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:744)
Caused by: java.lang.IllegalArgumentException: Seek position is past EOF: 645603894, fileSize = 0
at tachyon.hadoop.HdfsFileInputStream.seek(HdfsFileInputStream.java:239)
at org.apache.hadoop.fs.FSDataInputStream.seek(FSDataInputStream.java:37)
at org.apache.spark.streaming.util.FileBasedWriteAheadLogRandomReader.read(FileBasedWriteAheadLogRandomReader.scala:37)
at org.apache.spark.streaming.util.FileBasedWriteAheadLog.read(FileBasedWriteAheadLog.scala:104)
at org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD.org$apache$spark$streaming$rdd$WriteAheadLogBackedBlockRDD$$getBlockFromWriteAheadLog$1(WriteAheadLogBackedBlockRDD.scala:141)
... 15 more
INFO : org.apache.spark.scheduler.TaskSetManager - Starting task 2.2 in stage 103.0 (TID 422, 10.252.5.61, ANY, 1909 bytes)
INFO : org.apache.spark.scheduler.TaskSetManager - Lost task 2.2 in stage 103.0 (TID 422) on executor 10.252.5.61: org.apache.spark.SparkException (Could not read data from write ahead log record FileBasedWriteAheadLogSegment(tachyon-ft://10.252.5.113:19998/tachyon/checkpoint/receivedData/2/log-1431341091711-1431341151711,645603894,10891919)) [duplicate 1]
INFO : org.apache.spark.scheduler.TaskSetManager - Starting task 2.3 in stage 103.0 (TID 423, 10.252.5.62, ANY, 1909 bytes)
INFO : org.apache.spark.deploy.client.AppClient$ClientActor - Executor updated: app-20150511104442-0048/2 is now LOST (worker lost)
INFO : org.apache.spark.scheduler.cluster.SparkDeploySchedulerBackend - Executor app-20150511104442-0048/2 removed: worker lost
ERROR: org.apache.spark.scheduler.cluster.SparkDeploySchedulerBackend - Asked to remove non-existent executor 2
INFO : org.apache.spark.scheduler.TaskSetManager - Lost task 2.3 in stage 103.0 (TID 423) on executor 10.252.5.62: org.apache.spark.SparkException
[jira] [Commented] (SPARK-7525) Could not read data from write ahead log record when Receiver failed and WAL is stored in Tachyon
[ https://issues.apache.org/jira/browse/SPARK-7525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14537914#comment-14537914 ] Dibyendu Bhattacharya commented on SPARK-7525: --
This issue does not happen when checkpointing to HDFS. It only appears when the checkpoint directory is in Tachyon and a Receiver fails. For the driver-failure case, Spark Streaming can recover even when the checkpoint directory is in Tachyon.
Could not read data from write ahead log record when Receiver failed and WAL is stored in Tachyon
Key: SPARK-7525 URL: https://issues.apache.org/jira/browse/SPARK-7525 Project: Spark Issue Type: Bug Components: Streaming Affects Versions: 1.4.0 Environment: AWS, Spark Streaming 1.4 with Tachyon 0.6.4 Reporter: Dibyendu Bhattacharya
[jira] [Commented] (SPARK-7477) TachyonBlockManager Store Block in TRY_CACHE mode which gives BlockNotFoundException when blocks are evicted from cache
[ https://issues.apache.org/jira/browse/SPARK-7477?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14534805#comment-14534805 ] Dibyendu Bhattacharya commented on SPARK-7477: --
I tried Hierarchical Storage on Tachyon (http://tachyon-project.org/Hierarchy-Storage-on-Tachyon.html), and that seems to have worked: I did not see any Spark job fail due to BlockNotFoundException. Below are my hierarchical storage settings:
-Dtachyon.worker.hierarchystore.level.max=2
-Dtachyon.worker.hierarchystore.level0.alias=MEM
-Dtachyon.worker.hierarchystore.level0.dirs.path=$TACHYON_RAM_FOLDER
-Dtachyon.worker.hierarchystore.level0.dirs.quota=$TACHYON_WORKER_MEMORY_SIZE
-Dtachyon.worker.hierarchystore.level1.alias=HDD
-Dtachyon.worker.hierarchystore.level1.dirs.path=/mnt/tachyon
-Dtachyon.worker.hierarchystore.level1.dirs.quota=50GB
-Dtachyon.worker.allocate.strategy=MAX_FREE
-Dtachyon.worker.evict.strategy=LRU
TachyonBlockManager Store Block in TRY_CACHE mode which gives BlockNotFoundException when blocks are evicted from cache
Key: SPARK-7477 URL: https://issues.apache.org/jira/browse/SPARK-7477 Project: Spark Issue Type: Bug Components: Block Manager Affects Versions: 1.4.0 Reporter: Dibyendu Bhattacharya
With Spark Streaming on Tachyon as the OFF_HEAP block store, I used the low-level Receiver-based Kafka consumer (http://spark-packages.org/package/dibbhatt/kafka-spark-consumer) to pull from Kafka and write blocks to Tachyon. What I see is that TachyonBlockManager.scala puts the blocks with the WriteType.TRY_CACHE configuration. Because of this, blocks are evicted from the Tachyon cache, and when Spark tries to find such a block it throws BlockNotFoundException. When I changed the WriteType to CACHE_THROUGH, the BlockDropException went away, but it hurt throughput.
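To illustrate the trade-off described above, a small conceptual sketch (the types and methods here are invented for illustration and are not the Tachyon or TachyonBlockManager API): TRY_CACHE blocks live only in memory and vanish on eviction, while CACHE_THROUGH blocks survive in the under-filesystem at the cost of throughput:

sealed trait WriteType
case object TryCache extends WriteType      // memory only; may be evicted
case object CacheThrough extends WriteType  // memory + under-FS; survives eviction

final case class Stores(memory: Set[String], underFs: Set[String]) {
  def put(blockId: String, writeType: WriteType): Stores = writeType match {
    case TryCache     => copy(memory = memory + blockId)
    case CacheThrough => copy(memory = memory + blockId, underFs = underFs + blockId)
  }
  // After eviction from memory, only CACHE_THROUGH blocks remain readable.
  def evictAll: Stores = copy(memory = Set.empty)
  def isReadable(blockId: String): Boolean = memory(blockId) || underFs(blockId)
}

object WriteTypeSketch extends App {
  val s = Stores(Set.empty, Set.empty).put("b1", TryCache).put("b2", CacheThrough).evictAll
  println(s.isReadable("b1")) // false: the evicted TRY_CACHE block is lost (BlockNotFoundException case)
  println(s.isReadable("b2")) // true: CACHE_THROUGH survives in the under-FS
}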