[jira] [Commented] (SPARK-11045) Contributing Receiver based Low Level Kafka Consumer from Spark-Packages to Apache Spark Project

2017-02-02 Thread Dibyendu Bhattacharya (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-11045?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15849781#comment-15849781
 ] 

Dibyendu Bhattacharya commented on SPARK-11045:
---

Released the latest version, 1.0.10, of the Receiver-based Kafka Consumer for
Spark Streaming on Spark-Packages:

https://spark-packages.org/package/dibbhatt/kafka-spark-consumer

This release includes:

- Tuning for better performance
- Support for consumer lag checks
- WAL-less recovery
- A better-tuned PID controller that auto-adjusts the rate to incoming traffic
- Support for custom message interceptors
- Improved recovery from failure scenarios

Please refer to
https://github.com/dibbhatt/kafka-spark-consumer/blob/master/README.md for more
details.
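For anyone evaluating the package, the sketch below shows roughly how a
receiver-based stream from this package is wired up in Scala. It is a minimal
sketch only: the ReceiverLauncher entry point and the property names are
recalled from the project README and should be verified there; host names, the
topic, and the receiver count are placeholders.

{code:scala}
import java.util.Properties

import consumer.kafka.ReceiverLauncher // entry point per the project README (verify there)
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}

object LowLevelKafkaReceiverDemo {
  def main(args: Array[String]): Unit = {
    val ssc = new StreamingContext(
      new SparkConf().setAppName("kafka-spark-consumer-demo"), Seconds(10))

    // Property names follow the README's examples; treat them as assumptions.
    val props = new Properties()
    props.put("zookeeper.hosts", "zk-host")
    props.put("zookeeper.port", "2181")
    props.put("kafka.topic", "my-topic")
    props.put("kafka.consumer.id", "my-consumer-group")

    // Number of receivers to launch across the cluster (illustrative value).
    val numberOfReceivers = 3
    val stream = ReceiverLauncher.launch(ssc, props, numberOfReceivers, StorageLevel.MEMORY_ONLY)

    stream.foreachRDD(rdd => println(s"Received ${rdd.count()} messages in this batch"))

    ssc.start()
    ssc.awaitTermination()
  }
}
{code}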


> Contributing Receiver based Low Level Kafka Consumer from Spark-Packages to 
> Apache Spark Project
> 
>
> Key: SPARK-11045
> URL: https://issues.apache.org/jira/browse/SPARK-11045
> Project: Spark
>  Issue Type: New Feature
>  Components: DStreams
>Reporter: Dibyendu Bhattacharya
>
> This JIRA tracks the progress of contributing the Receiver-based Low Level
> Kafka Consumer from spark-packages
> (http://spark-packages.org/package/dibbhatt/kafka-spark-consumer) back to the
> Apache Spark project.
> This Kafka consumer has been around for more than a year and has matured over
> time. I see many adoptions of this package, and I receive positive feedback
> that this consumer gives better performance and fault-tolerance capabilities.
> The primary intent of this JIRA is to give the community a better alternative
> for those who want to use the Receiver-based model.
> If this consumer makes it into Spark core, it will definitely see more
> adoption and support from the community, and it will help the many who still
> prefer the Receiver-based model of Kafka consumer.
> I understand that the Direct Stream is the consumer that can give exactly-once
> semantics and uses the Kafka low-level API, which is good. But the Direct
> Stream has concerns around recovering the checkpoint after a driver code
> change. Application developers need to manage their own offsets, which is
> complex, and even if they do manage their own offsets, it limits the
> parallelism Spark Streaming can achieve: if someone wants more parallelism and
> sets spark.streaming.concurrentJobs to more than 1, they can no longer rely on
> storing offsets externally, since there is no control over which batch will
> run in which sequence.
> Furthermore, the Direct Stream has higher latency, as it fetches messages from
> Kafka during the RDD action. Also, the number of RDD partitions is limited to
> the number of topic partitions, so unless your Kafka topic has enough
> partitions, you have limited parallelism while processing the RDD.
> Due to the above concerns, many people who do not need exactly-once semantics
> still prefer the Receiver-based model. Unfortunately, when users fall back to
> the KafkaUtils.createStream approach, which uses the Kafka High Level
> Consumer, there are other issues around the reliability of the Kafka High
> Level API. The Kafka High Level API is buggy and has serious issues around
> consumer re-balance, so I do not think it is correct to advise people to use
> KafkaUtils.createStream in production.
> The better option at present is to use the consumer from spark-packages. It
> uses the Kafka Low Level Consumer API, stores offsets in ZooKeeper, and can
> recover from any failure. Below are a few highlights of this consumer:
> 1. It has an inbuilt PID controller for dynamic rate limiting.
> 2. In this consumer, rate limiting is done by controlling the size of the
> blocks, i.e. the volume of messages pulled from Kafka, whereas in Spark rate
> limiting is done by controlling the number of messages. The issue with
> throttling by number of messages is that if message sizes vary, the block
> size will also vary. Say your Kafka topic has messages of different sizes,
> from 10 KB to 500 KB: throttling by message count can never give a
> deterministic block size, so there is no guarantee that memory back-pressure
> can really take effect.
> 3. This consumer uses the Kafka low-level API, which gives better performance
> than the KafkaUtils.createStream-based High Level API.
> 4. This consumer can provide an end-to-end no-data-loss channel if enabled
> with the WAL.
> By accepting this low-level Kafka consumer from spark-packages into the
> Apache Spark project, we will give the community better options for Kafka
> connectivity for both the receiver-less and Receiver-based models.
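For readers comparing the two models discussed above, here is a minimal Scala
sketch against the standard Spark 1.x Streaming Kafka API (not this package):
a receiver-based stream where the Kafka High Level Consumer tracks offsets in
ZooKeeper, and a direct stream where the application reads the offset ranges
itself. Broker, ZooKeeper, group, and topic names are placeholders.

{code:scala}
import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils}

object KafkaApproaches {
  def main(args: Array[String]): Unit = {
    val ssc = new StreamingContext(new SparkConf().setAppName("kafka-demo"), Seconds(10))

    // Receiver-based model: the Kafka High Level Consumer tracks offsets in ZooKeeper.
    val receiverStream = KafkaUtils.createStream(
      ssc, "zk-host:2181", "my-group", Map("my-topic" -> 1), StorageLevel.MEMORY_AND_DISK_SER)

    // Direct model: exactly-once is possible, but the application must track offsets itself.
    val directStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, Map("metadata.broker.list" -> "broker:9092"), Set("my-topic"))

    directStream.foreachRDD { rdd =>
      // One RDD partition per Kafka topic partition; offsets must be persisted externally.
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      // ... process rdd, then atomically save offsetRanges to your own store ...
    }

    ssc.start()
    ssc.awaitTermination()
  }
}
{code}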




[jira] [Commented] (SPARK-11045) Contributing Receiver based Low Level Kafka Consumer from Spark-Packages to Apache Spark Project

2016-08-25 Thread Dibyendu Bhattacharya (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-11045?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15436628#comment-15436628
 ] 

Dibyendu Bhattacharya commented on SPARK-11045:
---

Released the latest version, 1.0.8, of the Receiver-based Kafka Consumer for
Spark Streaming.

The receiver is compatible with Kafka versions 0.8.x, 0.9.x, and 0.10.x, and
with all Spark versions.

Available on Spark-Packages:
https://spark-packages.org/package/dibbhatt/kafka-spark-consumer

- No dependency on the WAL or checkpoints for recovery from driver failure
- ZooKeeper-based offset management for both consumed and processed offsets
- In-built PID controller for rate limiting and back-pressure management
- Custom message interceptor

Please refer to
https://github.com/dibbhatt/kafka-spark-consumer/blob/master/README.md for more
details.







[jira] [Commented] (SPARK-11045) Contributing Receiver based Low Level Kafka Consumer from Spark-Packages to Apache Spark Project

2016-01-21 Thread Dibyendu Bhattacharya (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-11045?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15111206#comment-15111206
 ] 

Dibyendu Bhattacharya commented on SPARK-11045:
---

Thanks, Dan, for your comments. Many have shared the same thoughts with me,
and as you can see, a large number of people have voted for this consumer to
be included in Spark core. I wish everyone who voted for it and is using it
would also comment with their opinion. Unfortunately, the Spark committers
think otherwise. Spark still documents the faulty Receiver-based model on its
website, which has known issues, and there are many who need an alternative to
the Direct Stream but are reluctant to use a spark-packages library, so they
go ahead with whatever is mentioned on the Spark website. This seems to me to
misguide people and force them onto a buggy consumer even though a better
alternative exists.






[jira] [Commented] (SPARK-11045) Contributing Receiver based Low Level Kafka Consumer from Spark-Packages to Apache Spark Project

2015-12-23 Thread Dibyendu Bhattacharya (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-11045?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15069474#comment-15069474
 ] 

Dibyendu Bhattacharya commented on SPARK-11045:
---

Hi Sean, 

I think you misunderstood my earlier comment. My intention in contributing
this to Spark is not to get support from others, but rather so that the
broader community has access to a better consumer. The out-of-the-box
Receiver-based consumer Spark presently ships has serious issues, and not
accepting a better receiver is effectively denying the community a better
choice. I do not see why that should happen when a better alternative exists
and many are already using it successfully. Many more will start using it if
it becomes part of the Spark project.

What I meant earlier is that the community is a little reluctant to rely on
something as important as a Kafka consumer that lives in spark-packages, and
instead uses the faulty Receiver-based consumer from the Spark project without
knowing the consequences. That is the issue I see here.


[jira] [Commented] (SPARK-11045) Contributing Receiver based Low Level Kafka Consumer from Spark-Packages to Apache Spark Project

2015-12-23 Thread Dibyendu Bhattacharya (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-11045?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15069506#comment-15069506
 ] 

Dibyendu Bhattacharya commented on SPARK-11045:
---

Hi Sean, 

Unfortunately, the existing implementation is not fixable, precisely because
it uses the Kafka High Level Consumer; otherwise it would have been fixed by
now. The only way the existing Receiver-based mode can be fixed is by using
the Kafka low-level consumer, which is exactly what my consumer does. I am
confident it is a better implementation, and more stable and performant, than
what Spark has. I do not go by anyone's opinion either: these things need to
be tested and measured for performance and stability, and the people using
this consumer have found that it performs much better than what the Spark
project presently offers.

I think TD knows very well that it is because of the High Level consumer
issues that Kafka rewrote its consumer API in 0.9. The problem is that we
simply cannot move the existing receiver-based consumer to 0.9, since Kafka
0.9 was only recently released and is not yet stable. At the same time, there
are a lot of people on Kafka 0.8.x who have been denied a stable
receiver-based consumer for Spark for quite a long time. The call is yours; I
will obviously keep posting to the forums and Spark groups as I release new
features. But I am not convinced by the reasons for not accepting it into the
Spark project.


[jira] [Commented] (SPARK-10694) Prevent Data Loss in Spark Streaming when used with OFF_HEAP ExternalBlockStore (Tachyon)

2015-11-28 Thread Dibyendu Bhattacharya (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-10694?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15030576#comment-15030576
 ] 

Dibyendu Bhattacharya commented on SPARK-10694:
---

Hi [~andrewor14], can you please let me know whether this PR makes sense?

> Prevent Data Loss in Spark Streaming when used with OFF_HEAP 
> ExternalBlockStore (Tachyon)
> -
>
> Key: SPARK-10694
> URL: https://issues.apache.org/jira/browse/SPARK-10694
> Project: Spark
>  Issue Type: Bug
>  Components: Block Manager, Streaming
>Affects Versions: 1.5.0
>Reporter: Dibyendu Bhattacharya
>
> If a streaming application stores its blocks OFF_HEAP, it may not need any
> WAL-like feature to recover from driver failure. As long as the writing of
> blocks from the streaming receiver to Tachyon is durable, the data should be
> recoverable directly from Tachyon on driver failure.
> This avoids the cost of expensive WAL writes and of duplicating blocks in
> both memory and the WAL, while still guaranteeing an end-to-end no-data-loss
> channel using the OFF_HEAP store.






[jira] [Commented] (SPARK-11045) Contributing Receiver based Low Level Kafka Consumer from Spark-Packages to Apache Spark Project

2015-10-28 Thread Dibyendu Bhattacharya (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-11045?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14977874#comment-14977874
 ] 

Dibyendu Bhattacharya commented on SPARK-11045:
---

Hi [~tdas], please let me know your thoughts on this JIRA. Do you think this
should continue to live in Spark Packages, or would it be good to include it
in the Spark project so that the community gets a reliable and better
alternative to the Direct API (for anyone who does not want the Direct API)?







[jira] [Commented] (SPARK-11045) Contributing Receiver based Low Level Kafka Consumer from Spark-Packages to Apache Spark Project

2015-10-11 Thread Dibyendu Bhattacharya (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-11045?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14952300#comment-14952300
 ] 

Dibyendu Bhattacharya commented on SPARK-11045:
---

Well, message ordering is guaranteed per partition of a Kafka topic; across
partitions there is no ordering guarantee. So when you process an RDD from a
DirectStream with 10 partitions, no global ordering of messages is maintained
during RDD processing either. If someone really needs global ordering, they
should use a single partition for the Kafka topic...

Regarding contributing anything to Spark: I believe that in an open source
community this should be driven by how much demand there is for a feature,
not by any one person's opinion. What I am trying to emphasize is that a lot
of people are showing interest in using this, and many have already started
using it. And if you judge it purely on performance and features, it does
much better than the default Receiver-based approach and the Direct Stream
approach. Unfortunately, no public benchmarking is available yet, and I think
I should plan to do one to make this case stronger.


[jira] [Commented] (SPARK-11045) Contributing Receiver based Low Level Kafka Consumer from Spark-Packages to Apache Spark Project

2015-10-11 Thread Dibyendu Bhattacharya (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-11045?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14952391#comment-14952391
 ] 

Dibyendu Bhattacharya commented on SPARK-11045:
---

I think I already mentioned that this JIRA is NOT proposing a replacement for
the DirectStream; it proposes a replacement for the existing Receiver-based
model.

There are use cases, such as exactly-once processing, that can only be solved
by the Direct Stream. But there are also use cases where the user does not
need to care about exactly-once semantics, global ordering, or even ordering
within a partition. In some cases ordering can be handled by the downstream
system (say, HBase). There are even cases where more than one batch needs to
be processed concurrently (Spark Streaming with concurrent jobs > 1; see the
configuration sketch at the end of this comment). This receiver is best
suited for those cases, rather than the DirectStream. I think we need to
understand this differentiation.

This receiver is proposed as a replacement for the existing receiver model,
which has issues because it uses the High Level API, which is faulty and much
slower than the Simple Consumer API. It can be used in cases where
exactly-once and ordering semantics are not a strict requirement.

By the way, does the DirectStream solve the issue of strict ordering for a
given partition across batches? What happens in case of failures?

I think TD should chime in here.
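As a concrete illustration of the concurrent-jobs case mentioned above, this
small sketch shows the relevant (undocumented) Spark setting; the value 4 is
an arbitrary example.

{code:scala}
import org.apache.spark.SparkConf

// spark.streaming.concurrentJobs controls how many batch jobs may run at once
// (default 1). Raising it trades strict batch ordering for extra throughput,
// which is why externally stored offsets become harder to reason about.
val conf = new SparkConf()
  .setAppName("receiver-with-concurrent-jobs")
  .set("spark.streaming.concurrentJobs", "4")
{code}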


[jira] [Commented] (SPARK-11045) Contributing Receiver based Low Level Kafka Consumer from Spark-Packages to Apache Spark Project

2015-10-11 Thread Dibyendu Bhattacharya (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-11045?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14952602#comment-14952602
 ] 

Dibyendu Bhattacharya commented on SPARK-11045:
---

I agree with you about the complexity of the Kafka Spark consumer. This
consumer takes its Kafka connectivity from the Storm Kafka spout, which has
been stabilized over time. I do agree the code is complex, but that does not
explain why we should deny the community a better consumer just because the
code is complex. For that matter, the code is not so complex that it would
create a maintainability issue; the Storm Kafka spout has been running for a
while now without any problems.

Kafka's new consumer API may bring more features and may perform better than
the SimpleConsumer API, but I guess it will take some time until that API is
released and stabilizes. You know Kafka releases are notoriously slow. So if
this is the stand from the Spark side, I do not see the community getting a
better Kafka connector for the receiver approach in the near future.

There are additional benefits to the low-level API that this consumer takes
advantage of. For example, rate limiting here is controlled by block size,
whereas the default rate limiting in KafkaUtils is by number of messages.
This is an issue when Kafka carries messages of different sizes: there is no
deterministic way to know the actual block sizes and memory utilization if
rate control is done by message count (a sketch of Spark's message-count-based
settings follows below for contrast). I think these details can be discussed
once we are convinced that the low-level API is the way to go.
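For contrast with the block-size approach described above, these are the
standard Spark Streaming knobs that throttle by message count; the numbers are
arbitrary examples.

{code:scala}
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("message-count-throttling")
  // Receiver-based model: cap the number of records per second per receiver.
  .set("spark.streaming.receiver.maxRate", "10000")
  // Direct model: cap the number of records per second per Kafka partition.
  .set("spark.streaming.kafka.maxRatePerPartition", "10000")
  // Spark 1.5+: PID-based backpressure that adapts the ingestion rate, still in record counts.
  .set("spark.streaming.backpressure.enabled", "true")
{code}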


[jira] [Commented] (SPARK-11045) Contributing Receiver based Low Level Kafka Consumer from Spark-Packages to Apache Spark Project

2015-10-11 Thread Dibyendu Bhattacharya (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-11045?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14952572#comment-14952572
 ] 

Dibyendu Bhattacharya commented on SPARK-11045:
---

Hi Saisai, 

The consumer I am proposing here uses the Kafka low-level SimpleConsumer API,
which gives better performance and does not have the issues of the High Level
API. Kafka is coming up with a new consumer API in the 0.9 timeframe, I
believe; the only way to solve this problem on the Kafka side is to use that
new consumer API once 0.9 arrives.

But those who use Kafka 0.8.x will still have the same issue. A better
solution, I think, is to use the Kafka low-level API, which does not change
across Kafka versions and gives every user a more stable Kafka consumer. It
also brings consistency in terms of the Kafka API for both the Receiver-based
and the receiver-less (Direct Stream) approaches, since the latter also uses
the same Kafka low-level API.






[jira] [Commented] (SPARK-11045) Contributing Receiver based Low Level Kafka Consumer from Spark-Packages to Apache Spark Project

2015-10-10 Thread Dibyendu Bhattacharya (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-11045?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14951926#comment-14951926
 ] 

Dibyendu Bhattacharya commented on SPARK-11045:
---

I agree, Sean, this space is already getting complicated. My intention was
not at all to make it more confusing.

What I see is that many users are a little reluctant to use this consumer
from spark-packages, thinking that it will get less support. Because it lives
in spark-packages, many do not even consider it for their use cases and
instead use whatever Receiver-based model is documented with Spark. For those
who want to fall back to the Receiver-based model, Spark's out-of-the-box
receivers do not give them a good choice, and not many users know that a
better one exists in spark-packages.






[jira] [Commented] (SPARK-11045) Contributing Receiver based Low Level Kafka Consumer from Spark-Packages to Apache Spark Project

2015-10-10 Thread Dibyendu Bhattacharya (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-11045?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14951915#comment-14951915
 ] 

Dibyendu Bhattacharya commented on SPARK-11045:
---

Hi Cody, thanks for your comments. 

My point about parallelism is not about receiving parallelism from Kafka,
which is the same for both the receiver and the direct stream mode. My point
is about parallelism while processing the RDD. In the DirectStream, the
number of RDD partitions equals the number of topic partitions: if your Kafka
topic has 10 partitions, your RDD will have 10 partitions, and that is the
maximum parallelism while processing the RDD (unless you repartition, which
comes at a cost). In the Receiver-based model, on the other hand, the number
of partitions is dictated by the block interval and the batch interval: if
your block interval is 200 ms and your batch interval is 10 seconds, your RDD
will have 50 partitions! (A small sketch of this arithmetic follows below.)

That seems to give much better parallelism while processing the RDD.

Regarding the state of the spark-packages code, your comment is not in good
taste. There are many companies who think otherwise and use the
spark-packages consumer in production.

As I said earlier, the DirectStream is definitely the choice if one needs
exactly-once semantics, but there are many who do not need exactly-once and
do not want the overhead of using the DirectStream. Unfortunately, for them
the only other alternative is not good enough either, since it uses the Kafka
High Level API. I am trying to give them a better alternative in the form of
a much better Receiver-based approach.






[jira] [Comment Edited] (SPARK-11045) Contributing Receiver based Low Level Kafka Consumer from Spark-Packages to Apache Spark Project

2015-10-10 Thread Dibyendu Bhattacharya (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-11045?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14951926#comment-14951926
 ] 

Dibyendu Bhattacharya edited comment on SPARK-11045 at 10/10/15 4:58 PM:
-

I agree, Sean; this space is already getting complicated. My intention was not 
at all to make it more confusing. 

What I see is that many customers are a little reluctant to use this consumer 
from spark-packages, thinking that it will get less support. Being on 
spark-packages, many do not even consider using it for their use cases and 
instead use whatever Receiver based model is documented with Spark. For those 
who want to fall back to the Receiver based model, the Spark out of the box 
receivers do not give them a better choice, and many customers do not know 
that a better choice exists in spark-packages.






[jira] [Comment Edited] (SPARK-11045) Contributing Receiver based Low Level Kafka Consumer from Spark-Packages to Apache Spark Project

2015-10-10 Thread Dibyendu Bhattacharya (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-11045?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14951915#comment-14951915
 ] 

Dibyendu Bhattacharya edited comment on SPARK-11045 at 10/11/15 4:45 AM:
-

Hi Cody, thanks for your comments. 

My point about parallelism is not about the receiving parallelism from Kafka, 
which is the same for both receiver and direct stream modes. My thought was 
about the parallelism while processing the RDD. In DirectStream, the number of 
RDD partitions is your number of topic partitions. So if your Kafka topic has 
10 partitions, your RDD will have 10 partitions, and that is the maximum 
parallelism while processing the RDD (unless you re-partition, which comes at a 
cost). Whereas in the Receiver based model, the number of partitions is 
dictated by the Block Interval and the Batch Interval. If your block interval 
is 200 ms and your batch interval is 10 seconds, your RDD will have 50 
partitions!

I believe that gives much better parallelism while processing the RDD. 

Regarding the state of the spark-packages code, your comment is not in good 
taste. There are many companies who think otherwise and use the spark-packages 
consumer in production. 

As I said earlier, DirectStream is definitely the choice if one needs "Exactly 
Once", but there are many who do not need "Exactly Once" and do not want the 
overhead of using DirectStream. Unfortunately, for them the other alternatives, 
which use the Kafka high level API, are also not good enough. I am trying here 
to offer a better alternative in the form of a much better Receiver based 
approach.












[jira] [Comment Edited] (SPARK-11045) Contributing Receiver based Low Level Kafka Consumer from Spark-Packages to Apache Spark Project

2015-10-10 Thread Dibyendu Bhattacharya (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-11045?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14951926#comment-14951926
 ] 

Dibyendu Bhattacharya edited comment on SPARK-11045 at 10/11/15 4:47 AM:
-

I agree, Sean; this space is already getting complicated. My intention was not 
at all to make it more confusing. 

What I see is that many customers are a little reluctant to use this consumer 
from spark-packages, thinking that it will get less support. Being on 
spark-packages, many do not even consider using it for their use cases and 
instead use whatever Receiver based model is documented with Spark. For those 
who want to fall back to the Receiver based model, the Spark out of the box 
receivers do not give them a better choice, and many customers do not know 
that a better choice exists in spark-packages.






[jira] [Updated] (SPARK-11045) Contributing Receiver based Low Level Kafka Consumer from Spark-Packages to Apache Spark Project

2015-10-10 Thread Dibyendu Bhattacharya (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-11045?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dibyendu Bhattacharya updated SPARK-11045:
--
Affects Version/s: 1.5.1







[jira] [Updated] (SPARK-11045) Contributing Receiver based Low Level Kafka Consumer from Spark-Packages to Apache Spark Project

2015-10-10 Thread Dibyendu Bhattacharya (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-11045?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dibyendu Bhattacharya updated SPARK-11045:
--
Description: 
This JIRA is to track the progress of making the Receiver based Low Level Kafka 
Consumer from spark-packages 
(http://spark-packages.org/package/dibbhatt/kafka-spark-consumer) to be 
contributed back to Apache Spark Project.

This Kafka consumer has been around for more than year and has matured over the 
time . I see there are many adoptions of this package . I receive positive 
feedbacks that this consumer gives better performance and fault tolerant 
capabilities. 

This is the primary intent of this JIRA to give community a better alternative 
if they want to use Receiver Base model. 

If this consumer make it to Spark Core, it will definitely see more adoption 
and support from community and help many who still prefer the Receiver Based 
model of Kafka Consumer. 

I understand the Direct Stream is the consumer which can give Exact Once 
semantics and uses Kafka Low Level API  , which is good . But Direct Stream has 
concerns around recovering checkpoint on driver code change . Application 
developer need to manage their own offset which complex . Even if some one does 
manages their own offset , it limits the parallelism Spark Streaming can 
achieve. If someone wants more parallelism and want 
spark.streaming.concurrentJobs more than 1 , you can no longer rely on storing 
offset externally as you have no control which batch will run in which 
sequence. 

Furthermore , the Direct Stream has higher latency , as it fetch messages form 
Kafka during RDD action . Also number of RDD partitions are limited to topic 
partition . So unless your Kafka topic does not have enough partitions, you 
have limited parallelism while RDD processing. 

Due to above mentioned concerns , many people who does not want Exactly Once 
semantics , still prefer Receiver based model. Unfortunately, when customer 
fall back to KafkaUtil.CreateStream approach, which use Kafka High Level 
Consumer, there are other issues around the reliability of Kafka High Level 
API.  Kafka High Level API is buggy and has serious issue around Consumer 
Re-balance. Hence I do not think this is correct to advice people to use 
KafkaUtil.CreateStream in production . 


The better option presently is there is to use the Consumer from spark-packages 
. It is is using Kafka Low Level Consumer API , store offset in Zookeeper, and 
can recover from any failure . Below are few highlights of this consumer  ..

1. It has a inbuilt PID Controller for dynamic rate limiting.

2. In this consumer ,  The Rate Limiting is done by modifying the size blocks 
by controlling the size of messages pulled from Kafka. Whereas , in Spark the 
Rate Limiting is done by controlling number of  messages. The issue with 
throttling by number of message is, if message size various, block size will 
also vary . Let say your Kafka has messages for different sizes from 10KB to 
500 KB. Thus throttling by number of message can never give any deterministic 
size of your block hence there is no guarantee that Memory Back-Pressure can 
really take affect. 

3. This consumer is using Kafka low level API which gives better performance 
than KafkaUtils.createStream based High Level API.

4. This consumer can give end to end no data loss channel if enabled with WAL.


By accepting this low level kafka consumer from spark packages to apache spark 
project , we will give community a better options for Kafka connectivity both 
for Receiver less and Receiver based model. 
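On point 4, a minimal sketch of what enabling the WAL for a receiver based stream typically looks like. The checkpoint path, ZooKeeper quorum, group id, and topic map are placeholders, and the stream shown is the stock KafkaUtils.createStream rather than the spark-packages consumer.

{code:scala}
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object WalEnabledReceiverSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("wal-enabled-receiver")
      // Write received blocks to the write ahead log before acknowledging them.
      .set("spark.streaming.receiver.writeAheadLog.enable", "true")

    val ssc = new StreamingContext(conf, Seconds(10))
    // The WAL lives under the checkpoint directory; a fault tolerant path (e.g. HDFS) is assumed.
    ssc.checkpoint("hdfs:///tmp/streaming-checkpoint")

    // With the WAL on, a non replicated serialized level is the usual choice,
    // since durability now comes from the log rather than a second in-memory copy.
    val stream = KafkaUtils.createStream(
      ssc,
      "zkhost:2181",        // ZooKeeper quorum (placeholder)
      "wal-demo-group",     // consumer group id (placeholder)
      Map("events" -> 1),   // topic -> receiver thread count (placeholder)
      StorageLevel.MEMORY_AND_DISK_SER)

    stream.count().print()
    ssc.start()
    ssc.awaitTermination()
  }
}
{code}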



[jira] [Created] (SPARK-11045) Contributing Receiver based Low Level Kafka Consumer from Spark-Packages to Apache Spark Project

2015-10-10 Thread Dibyendu Bhattacharya (JIRA)
Dibyendu Bhattacharya created SPARK-11045:
-

 Summary: Contributing Receiver based Low Level Kafka Consumer from 
Spark-Packages to Apache Spark Project
 Key: SPARK-11045
 URL: https://issues.apache.org/jira/browse/SPARK-11045
 Project: Spark
  Issue Type: New Feature
  Components: Streaming
Reporter: Dibyendu Bhattacharya









[jira] [Updated] (SPARK-11045) Contributing Receiver based Low Level Kafka Consumer from Spark-Packages to Apache Spark Project

2015-10-10 Thread Dibyendu Bhattacharya (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-11045?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dibyendu Bhattacharya updated SPARK-11045:
--
Affects Version/s: (was: 1.5.1)







[jira] [Updated] (SPARK-10694) Prevent Data Loss in Spark Streaming when used with OFF_HEAP ExternalBlockStore (Tachyon)

2015-09-20 Thread Dibyendu Bhattacharya (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-10694?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dibyendu Bhattacharya updated SPARK-10694:
--
Component/s: Block Manager







[jira] [Created] (SPARK-10694) Prevent Data Loss in Spark Streaming when used with OFF_HEAP ExternalBlockStore (Tachyon)

2015-09-18 Thread Dibyendu Bhattacharya (JIRA)
Dibyendu Bhattacharya created SPARK-10694:
-

 Summary: Prevent Data Loss in Spark Streaming when used with 
OFF_HEAP ExternalBlockStore (Tachyon)
 Key: SPARK-10694
 URL: https://issues.apache.org/jira/browse/SPARK-10694
 Project: Spark
  Issue Type: Bug
  Components: Streaming
Affects Versions: 1.5.0
Reporter: Dibyendu Bhattacharya


If a Streaming application stores its blocks OFF_HEAP, it may not need any WAL 
like feature to recover from a Driver failure. As long as the writing of blocks 
to Tachyon from the Streaming receiver is durable, the blocks should be 
recoverable directly from Tachyon on a Driver failure. 
This can solve the issue of the expensive WAL write and of duplicating the 
blocks both in MEMORY and in the WAL, and it can also guarantee an end to end 
No-Data-Loss channel using the OFF_HEAP store.
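A rough sketch of the setup described above, assuming a Tachyon deployment reachable by the executors. The external block store URL, ZooKeeper quorum, group id, and topic map are placeholders, and whether this fully removes the need for the WAL is exactly what this JIRA proposes, not an established guarantee.

{code:scala}
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object OffHeapReceiverSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("off-heap-receiver")
      // Point the external block store at a Tachyon master (placeholder URL;
      // the exact configuration keys depend on the Spark 1.5.x deployment).
      .set("spark.externalBlockStore.url", "tachyon://tachyon-master:19998")

    val ssc = new StreamingContext(conf, Seconds(10))

    // Store received blocks OFF_HEAP so that, if the writes are durable in Tachyon,
    // they can be re-read after a driver restart without a write ahead log.
    val stream = KafkaUtils.createStream(
      ssc,
      "zkhost:2181",          // ZooKeeper quorum (placeholder)
      "offheap-demo-group",   // consumer group id (placeholder)
      Map("events" -> 1),     // topic -> receiver thread count (placeholder)
      StorageLevel.OFF_HEAP)

    stream.count().print()
    ssc.start()
    ssc.awaitTermination()
  }
}
{code}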






[jira] [Commented] (SPARK-8591) Block failed to unroll to memory should not be replicated for MEMORY_ONLY_2 StorageLevel

2015-07-14 Thread Dibyendu Bhattacharya (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-8591?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14627517#comment-14627517
 ] 

Dibyendu Bhattacharya commented on SPARK-8591:
--

This will be resolved as Won't Fix, as suggested by [~tdas] in the PR.

 Block failed to unroll to memory should not be replicated for MEMORY_ONLY_2 
 StorageLevel
 

 Key: SPARK-8591
 URL: https://issues.apache.org/jira/browse/SPARK-8591
 Project: Spark
  Issue Type: Bug
  Components: Streaming
Affects Versions: 1.4.0
Reporter: Dibyendu Bhattacharya

 A block which failed to unroll to memory and returned an iterator and size 0 
 should not be replicated to a peer node, as putBlockStatus comes back as 
 StorageLevel.NONE and the BlockStatus is not reported to the Master.
 The primary issue here is that, for StorageLevel MEMORY_ONLY_2, if the 
 BlockManager fails to unroll the block to memory and the store to local 
 fails, the BlockManager still replicates the same block to the remote peer. 
 In the Spark Streaming case, the Receiver gets the PutResult from the local 
 BlockManager, and if the block failed to store locally, ReceivedBlockHandler 
 throws a SparkException back to the Receiver even though the block was 
 successfully replicated to the remote peer by the BlockManager. This leads to 
 wasted memory at the remote peer, as that block can never be used in 
 Streaming jobs. If the Receiver fails to store the block, it can retry, and 
 every failed retry (to store locally) may add another unused block to the 
 remote peer; this can lead to many unwanted blocks when high volume receivers 
 do multiple retries. 
 The fix proposed here is to stop replicating the block if the store to local 
 has failed. This fix prevents the scenario mentioned above and does not 
 impact RDD partition replication (during Cache or Persist), since the RDD 
 CacheManager performs the unroll to memory first before attempting to store 
 in local memory, so it can never happen that the block unroll is successful 
 but the store to local memory has failed. 






[jira] [Commented] (SPARK-8591) Block failed to unroll to memory should not be replicated for MEMORY_ONLY_2 StorageLevel

2015-07-14 Thread Dibyendu Bhattacharya (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-8591?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14627513#comment-14627513
 ] 

Dibyendu Bhattacharya commented on SPARK-8591:
--

This is the summary of the issue as described in the PR.

Problem summary. If a block fails to unroll, the ReceiverTracker will never 
know about the block and will not include it in a future computation. In the 
meantime, however, the block may be replicated and take up space on other 
executors even though it will never be used.

Implications for Spark core. For Spark core, however, it is reasonable to 
replicate a block even if it fails to unroll. Just because there is not enough 
memory to cache this block on this executor doesn't mean the same is true on a 
different executor. This is all best effort, but a future computation of the 
RDD will still benefit from having the cached block somewhere. (Note: the 
existing code doesn't actually do this for normal RDD caching yet because 
CacheManager has its own unrolling logic. We will address this separately in 
the future.)

Alternative fix. The right fix for SPARK-8591 would be to have the 
ReceiverTracker just read its blocks from the BlockManagerMaster. This 
simplifies the two divergent block reporting code paths. Since the 
BlockManagerMaster is notified of replicated blocks, the replication here will 
also help mitigate data loss in the case of MEMORY_ONLY_*.

TL;DR. This patch removes a small feature from block manager that, though not 
currently used, is desirable in the future for both Spark core and Spark 
streaming. However, the underlying issue is not caused by a bug in the block 
manager, but an incorrect assumption in the ReceiverTracker that doesn't take 
into account replication. The correct way forward would be to fix this in Spark 
streaming by refactoring the ReceiverTracker to depend on BlockManagerMaster.







[jira] [Updated] (SPARK-8591) Block failed to unroll to memory should not be replicated for MEMORY_ONLY_2 StorageLevel

2015-07-14 Thread Dibyendu Bhattacharya (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-8591?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dibyendu Bhattacharya updated SPARK-8591:
-
Component/s: (was: Block Manager)







[jira] [Updated] (SPARK-8591) Block failed to unroll to memory should not be replicated for MEMORY_ONLY_2 StorageLevel

2015-07-14 Thread Dibyendu Bhattacharya (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-8591?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dibyendu Bhattacharya updated SPARK-8591:
-
Component/s: Streaming







[jira] [Updated] (SPARK-8591) Block failed to unroll to memory should not be replicated for MEMORY_ONLY_2 StorageLevel

2015-07-04 Thread Dibyendu Bhattacharya (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-8591?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dibyendu Bhattacharya updated SPARK-8591:
-
Description: 
A block which failed to unroll to memory and returned an iterator and size 0 
should not be replicated to a peer node, as putBlockStatus comes back as 
StorageLevel.NONE and the BlockStatus is not reported to the Master.

The primary issue here is that, for StorageLevel MEMORY_ONLY_2, if the 
BlockManager fails to unroll the block to memory and the store to local fails, 
the BlockManager still replicates the same block to the remote peer. In the 
Spark Streaming case, the Receiver gets the PutResult from the local 
BlockManager, and if the block failed to store locally, ReceivedBlockHandler 
throws a SparkException back to the Receiver even though the block was 
successfully replicated to the remote peer by the BlockManager. This leads to 
wasted memory at the remote peer, as that block can never be used in Streaming 
jobs. If the Receiver fails to store the block, it can retry, and every failed 
retry (to store locally) may add another unused block to the remote peer; this 
can lead to many unwanted blocks when high volume receivers do multiple 
retries. 

The fix proposed here is to stop replicating the block if the store to local 
has failed. This fix prevents the scenario mentioned above and does not impact 
RDD partition replication (during Cache or Persist), since the RDD CacheManager 
performs the unroll to memory first before attempting to store in local memory, 
so it can never happen that the block unroll is successful but the store to 
local memory has failed. 
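For context on where this failure path sits, here is a minimal custom receiver sketch using MEMORY_ONLY_2. It is only meant to show where store() enters the flow described above; the data source is a dummy loop and the class name is made up for illustration.

{code:scala}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

// A toy receiver that stores data with MEMORY_ONLY_2. Records handed to store()
// are turned into blocks that the BlockManager stores locally and, for *_2
// levels, replicates to a peer, which is where the failure path discussed in
// this issue arises.
class DummyReceiver extends Receiver[String](StorageLevel.MEMORY_ONLY_2) {

  override def onStart(): Unit = {
    new Thread("dummy-receiver") {
      override def run(): Unit = receive()
    }.start()
  }

  override def onStop(): Unit = { /* nothing to clean up in this sketch */ }

  private def receive(): Unit = {
    try {
      while (!isStopped()) {
        // store() hands the record to the receiver supervisor, which builds the
        // blocks and asks the BlockManager to store (and replicate) them.
        store("record-" + System.nanoTime())
        Thread.sleep(10)
      }
    } catch {
      case t: Throwable => restart("Error receiving data; retrying", t)
    }
  }
}
{code}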








[jira] [Updated] (SPARK-8591) Block failed to unroll to memory should not be replicated for MEMORY_ONLY_2 StorageLevel

2015-07-04 Thread Dibyendu Bhattacharya (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-8591?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dibyendu Bhattacharya updated SPARK-8591:
-
Description: 





 Block failed to unroll to memory should not be replicated for MEMORY_ONLY_2 
 StorageLevel
 

 Key: SPARK-8591
 URL: https://issues.apache.org/jira/browse/SPARK-8591
 Project: Spark
  Issue Type: Bug
  Components: Block Manager
Affects Versions: 1.4.0
Reporter: Dibyendu Bhattacharya

 A block that failed to unroll to memory (returning an iterator and size 0) should not be replicated to a peer node, because putBlockStatus comes back as StorageLevel.NONE and the BlockStatus is not reported to the Master.
 The primary issue is that, for StorageLevel MEMORY_ONLY_2, if the BlockManager fails to unroll the block to memory and the local store fails, the BlockManager still replicates the same block to the remote peer. In the Spark Streaming case, the Receiver gets the PutResult from the local BlockManager, and if the block failed to store, ReceivedBlockHandler throws a SparkException back to the Receiver even though the block was successfully replicated to the remote peer by the BlockManager. This wastes memory on the remote peer, since that block can never be used by Streaming jobs. If the Receiver fails to store the block it can retry, and every failed retry (of the local store) may add another unused block to the remote peer, which can add up to many unwanted blocks when high-volume receivers retry multiple times.
 The proposed fix is to stop replicating the block when the local store has failed. This prevents the scenario above and does not affect RDD partition replication (during cache or persist), because the RDD CacheManager unrolls to memory before attempting the local store, so it can never happen that the unroll succeeds but the local store fails.






[jira] [Updated] (SPARK-8591) Block failed to unroll to memory should not be replicated for MEMORY_ONLY_2 StorageLevel

2015-07-04 Thread Dibyendu Bhattacharya (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-8591?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dibyendu Bhattacharya updated SPARK-8591:
-
Description: 
A block that failed to unroll to memory (returning an iterator and size 0) should not be replicated to a peer node, because putBlockStatus comes back as StorageLevel.NONE and the BlockStatus is not reported to the Master.

The primary issue is that, for StorageLevel MEMORY_ONLY_2, if the BlockManager fails to unroll the block to memory and the local store fails, the BlockManager still replicates the same block to the remote peer. In the Spark Streaming case, the Receiver gets the PutResult from the local BlockManager, and if the block failed to store locally, ReceivedBlockHandler throws a SparkException back to the Receiver even though the block was successfully replicated to the remote peer by the BlockManager. This wastes memory on the remote peer, since that block can never be used by Streaming jobs. If the Receiver fails to store the block it can retry, and every failed retry (of the local store) may add another unused block to the remote peer, which can add up to many unwanted blocks when high-volume receivers retry multiple times.

The proposed fix is to stop replicating the block when the local store has failed. This prevents the scenario above and does not affect RDD partition replication (during cache or persist), because the RDD CacheManager unrolls to memory before attempting the local store, so it can never happen that the unroll succeeds but the local store fails.



  was:
A block that failed to unroll to memory (returning an iterator and size 0) should not be replicated to a peer node, because putBlockStatus comes back as StorageLevel.NONE and the BlockStatus is not reported to the Master.

The primary issue is that, for StorageLevel MEMORY_ONLY_2, if the BlockManager fails to unroll the block to memory and the local store fails, the BlockManager still replicates the same block to the remote peer. In the Spark Streaming case, the Receiver gets the PutResult from the local BlockManager, and if the block failed to store, the Receiver's ReceivedBlockHandler throws a SparkException back to the Receiver even though the block was successfully replicated to the remote peer by the BlockManager. This wastes memory on the remote peer, since that block can never be used by Streaming jobs. If the Receiver fails to store the block it can retry, and every failed retry (of the local store) may add another unused block to the remote peer, which can add up to many unwanted blocks when high-volume receivers retry multiple times.

The proposed fix is to stop replicating the block when the local store has failed. This prevents the scenario above and does not affect RDD partition replication (during cache or persist), because the RDD CacheManager unrolls to memory before attempting the local store, so it can never happen that the unroll succeeds but the local store fails.




 Block failed to unroll to memory should not be replicated for MEMORY_ONLY_2 
 StorageLevel
 

 Key: SPARK-8591
 URL: https://issues.apache.org/jira/browse/SPARK-8591
 Project: Spark
  Issue Type: Bug
  Components: Block Manager
Affects Versions: 1.4.0
Reporter: Dibyendu Bhattacharya

 A block that failed to unroll to memory (returning an iterator and size 0) should not be replicated to a peer node, because putBlockStatus comes back as StorageLevel.NONE and the BlockStatus is not reported to the Master.
 The primary issue is that, for StorageLevel MEMORY_ONLY_2, if the BlockManager fails to unroll the block to memory and the local store fails, the BlockManager still replicates the same block to the remote peer. In the Spark Streaming case, the Receiver gets the PutResult from the local BlockManager, and if the block failed to store locally, ReceivedBlockHandler throws a SparkException back to the Receiver even though the block was successfully replicated to the remote peer by the BlockManager. This wastes memory on the remote peer, since that block can never be used by Streaming jobs. If the Receiver fails to store the block it can retry, and every failed retry (of the local store) may add another unused block to the remote peer, which can add up to many unwanted blocks when high-volume receivers retry multiple times.
 The proposed fix is to stop replicating the block when the local store has failed. This prevents the scenario above and does not affect RDD partition replication (during cache or persist), because the RDD CacheManager unrolls to memory before attempting the local store, so it can never happen that the unroll succeeds but the local store fails.

[jira] [Created] (SPARK-8591) Block failed to unroll to memory should not be replicated for MEMORY_ONLY_2 StorageLevel

2015-06-24 Thread Dibyendu Bhattacharya (JIRA)
Dibyendu Bhattacharya created SPARK-8591:


 Summary: Block failed to unroll to memory should not be replicated 
for MEMORY_ONLY_2 StorageLevel
 Key: SPARK-8591
 URL: https://issues.apache.org/jira/browse/SPARK-8591
 Project: Spark
  Issue Type: Bug
  Components: Block Manager
Affects Versions: 1.4.0
Reporter: Dibyendu Bhattacharya


A block that failed to unroll to memory (returning an iterator and size 0) should not be replicated to a peer node, because putBlockStatus comes back as StorageLevel.NONE and the BlockStatus is not reported to the Master.






[jira] [Commented] (SPARK-8474) [STREAMING] Kafka DirectStream API stops receiving messages if collective size of the messages specified in spark.streaming.kafka.maxRatePerPartition exceeds the default

2015-06-19 Thread Dibyendu Bhattacharya (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-8474?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14593171#comment-14593171
 ] 

Dibyendu Bhattacharya commented on SPARK-8474:
--

https://kafka.apache.org/08/configuration.html

It says about fetch.message.max.bytes:

The number of bytes of messages to attempt to fetch for each topic-partition in each fetch request. These bytes will be read into memory for each partition, so this helps control the memory used by the consumer. The fetch request size must be at least as large as the maximum message size the server allows or else it is possible for the producer to send messages larger than the consumer can fetch.

So this is not a per-message limit, but the size of messages you fetch in every FetchRequest built with FetchRequestBuilder.
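For reference, a rough sketch of how that per-partition fetch size is passed in the Kafka 0.8 SimpleConsumer API (broker host, topic, partition and offset below are placeholders):

import kafka.api.FetchRequestBuilder
import kafka.consumer.SimpleConsumer

object FetchSizeSketch {
  def main(args: Array[String]): Unit = {
    // Placeholder broker/topic values; fetchSize is the per-partition byte
    // budget of one FetchRequest, i.e. the same knob as fetch.message.max.bytes.
    val consumer = new SimpleConsumer("broker-host", 9092, 10000, 1024 * 1024, "demo-client")
    val fetchSizeBytes = 1024 * 1024 // 1 MB per topic-partition per request

    val request = new FetchRequestBuilder()
      .clientId("demo-client")
      .addFetch("my-topic", 0, 0L, fetchSizeBytes) // topic, partition, startOffset, fetchSize
      .build()

    val response = consumer.fetch(request)
    val messages = response.messageSet("my-topic", 0) // messages of this single pull
    println(s"fetched ${messages.iterator.size} messages in one request")
    consumer.close()
  }
}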

 [STREAMING] Kafka DirectStream API stops receiving messages if collective 
 size of the messages specified in spark.streaming.kafka.maxRatePerPartition 
 exceeds the default fetch size ( fetch.message.max.bytes) of SimpleConsumer
 -

 Key: SPARK-8474
 URL: https://issues.apache.org/jira/browse/SPARK-8474
 Project: Spark
  Issue Type: Bug
  Components: Streaming
Affects Versions: 1.4.0
Reporter: Dibyendu Bhattacharya
Priority: Critical

 The issue is that if Kafka holds variable-size messages, ranging from a few KB to a few hundred KB, rate limiting by message count can lead to a problem.
 Say the message sizes in Kafka are such that under the default fetch.message.max.bytes limit (which is 1 MB) only 1000 messages can be pulled, whereas I specified spark.streaming.kafka.maxRatePerPartition as, say, 2000. With these settings, when the KafkaRDD pulls messages for its offset range it will only pull 1000 messages (limited by the size of the pull in the SimpleConsumer API), can never reach the desired untilOffset, and fails in KafkaRDD at this assert call:
 assert(requestOffset == part.untilOffset, errRanOutBeforeEnd(part))






[jira] [Commented] (SPARK-8474) [STREAMING] Kafka DirectStream API stops receiving messages if collective size of the messages specified in spark.streaming.kafka.maxRatePerPartition exceeds the default

2015-06-19 Thread Dibyendu Bhattacharya (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-8474?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14593273#comment-14593273
 ] 

Dibyendu Bhattacharya commented on SPARK-8474:
--

I hit this problem just once and have not been able to reproduce it since. Here is the executor stack trace from that occurrence. I am not sure whether this is related to some Kafka issue where the leader offset comes back wrong.


15/06/18 09:01:21 INFO CoarseGrainedExecutorBackend: Registered signal handlers 
for [TERM, HUP, INT]
15/06/18 09:01:21 INFO SecurityManager: Changing view acls to: hadoop
15/06/18 09:01:21 INFO SecurityManager: Changing modify acls to: hadoop
15/06/18 09:01:21 INFO SecurityManager: SecurityManager: authentication 
disabled; ui acls disabled; users with view permissions: Set(hadoop); users 
with modify permissions: Set(hadoop)
15/06/18 09:01:22 INFO Slf4jLogger: Slf4jLogger started
15/06/18 09:01:22 INFO Remoting: Starting remoting
15/06/18 09:01:22 INFO Remoting: Remoting started; listening on addresses 
:[akka.tcp://driverPropsFetcher@10.252.5.54:45553]
15/06/18 09:01:22 INFO Utils: Successfully started service 'driverPropsFetcher' 
on port 45553.
15/06/18 09:01:23 INFO SecurityManager: Changing view acls to: hadoop
15/06/18 09:01:23 INFO SecurityManager: Changing modify acls to: hadoop
15/06/18 09:01:23 INFO SecurityManager: SecurityManager: authentication 
disabled; ui acls disabled; users with view permissions: Set(hadoop); users 
with modify permissions: Set(hadoop)
15/06/18 09:01:23 INFO RemoteActorRefProvider$RemotingTerminator: Shutting down 
remote daemon.
15/06/18 09:01:23 INFO RemoteActorRefProvider$RemotingTerminator: Remote daemon 
shut down; proceeding with flushing remote transports.
15/06/18 09:01:23 INFO Slf4jLogger: Slf4jLogger started
15/06/18 09:01:23 INFO Remoting: Starting remoting
15/06/18 09:01:23 INFO RemoteActorRefProvider$RemotingTerminator: Remoting shut 
down.
15/06/18 09:01:23 INFO Remoting: Remoting started; listening on addresses 
:[akka.tcp://sparkExecutor@10.252.5.54:56579]
15/06/18 09:01:23 INFO Utils: Successfully started service 'sparkExecutor' on 
port 56579.
15/06/18 09:01:23 INFO DiskBlockManager: Created local directory at 
/mnt1/spark/local/spark-37f66f87-875d-4203-820a-cc9c90cb40a6/executor-8bbf229e-8b02-4d52-8555-75e1f6c5b2f3/blockmgr-2badd56a-7877-44d7-bb67-c309935ce1ba
15/06/18 09:01:23 INFO MemoryStore: MemoryStore started with capacity 883.8 MB
15/06/18 09:01:23 INFO CoarseGrainedExecutorBackend: Connecting to driver: 
akka.tcp://sparkDriver@10.252.5.113:52972/user/CoarseGrainedScheduler
15/06/18 09:01:23 INFO WorkerWatcher: Connecting to worker 
akka.tcp://sparkWorker@10.252.5.54:49197/user/Worker
15/06/18 09:01:23 INFO WorkerWatcher: Successfully connected to 
akka.tcp://sparkWorker@10.252.5.54:49197/user/Worker
15/06/18 09:01:23 INFO CoarseGrainedExecutorBackend: Successfully registered 
with driver
15/06/18 09:01:23 INFO Executor: Starting executor ID 1 on host 10.252.5.54
15/06/18 09:01:23 INFO Utils: Successfully started service 
'org.apache.spark.network.netty.NettyBlockTransferService' on port 34554.
15/06/18 09:01:23 INFO NettyBlockTransferService: Server created on 34554
15/06/18 09:01:23 INFO BlockManagerMaster: Trying to register BlockManager
15/06/18 09:01:24 INFO BlockManagerMaster: Registered BlockManager
15/06/18 09:01:24 INFO CoarseGrainedExecutorBackend: Got assigned task 0
15/06/18 09:01:24 INFO CoarseGrainedExecutorBackend: Got assigned task 1
15/06/18 09:01:24 INFO CoarseGrainedExecutorBackend: Got assigned task 2
15/06/18 09:01:24 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
15/06/18 09:01:24 INFO Executor: Running task 1.0 in stage 0.0 (TID 1)
15/06/18 09:01:24 INFO Executor: Running task 2.0 in stage 0.0 (TID 2)
15/06/18 09:01:24 INFO Executor: Fetching 
http://10.252.5.113:54193/jars/LowlevelKafkaConsumer-assembly-1.0.jar with 
timestamp 1434618080212
15/06/18 09:01:24 INFO Utils: Fetching 
http://10.252.5.113:54193/jars/LowlevelKafkaConsumer-assembly-1.0.jar to 
/mnt1/spark/local/spark-37f66f87-875d-4203-820a-cc9c90cb40a6/executor-8bbf229e-8b02-4d52-8555-75e1f6c5b2f3/fetchFileTemp4240791741464959275.tmp
15/06/18 09:01:25 INFO Utils: Copying 
/mnt1/spark/local/spark-37f66f87-875d-4203-820a-cc9c90cb40a6/executor-8bbf229e-8b02-4d52-8555-75e1f6c5b2f3/19875585461434618080212_cache
 to 
/mnt1/spark/work/app-20150618090120-0029/1/./LowlevelKafkaConsumer-assembly-1.0.jar
15/06/18 09:01:25 INFO Executor: Adding 
file:/mnt1/spark/work/app-20150618090120-0029/1/./LowlevelKafkaConsumer-assembly-1.0.jar
 to class loader
15/06/18 09:01:25 INFO TorrentBroadcast: Started reading broadcast variable 0
15/06/18 09:01:25 INFO MemoryStore: ensureFreeSpace(1399) called with curMem=0, 
maxMem=926731468
15/06/18 09:01:25 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in 
memory (estimated size 1399.0 B, free 883.8 MB)
15/06/18 09:01:25 INFO TorrentBroadcast: 

[jira] [Commented] (SPARK-8474) [STREAMING] Kafka DirectStream API stops receiving messages if collective size of the messages specified in spark.streaming.kafka.maxRatePerPartition exceeds the defaul

2015-06-19 Thread Dibyendu Bhattacharya (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-8474?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14593285#comment-14593285
 ] 

Dibyendu Bhattacharya commented on SPARK-8474:
--

Yes, right.

This may be a false alarm; I did not find any issue with the logic. As I see it, KafkaRDD keeps pulling messages in chunks of fetch.message.max.bytes (1 MB) on every fetchBatch and keeps doing so until it reaches the untilOffset, so I may be wrong here. I hit the issue once and have not been able to reproduce it since. I shared the executor trace from that run, and I can see an OffsetOutOfRange issue. I am not sure how that happened, since I launched the receiver for the very first time, starting from the earliest offset.

Just to mention, in all subsequent runs I never saw output like this in the shared log:

15/06/18 09:01:26 INFO KafkaRDD: Computing topic valid_subpub, partition 0 
offsets 0 - 2500
15/06/18 09:01:26 INFO KafkaRDD: Computing topic valid_subpub, partition 1 
offsets 0 - 2500
15/06/18 09:01:26 INFO KafkaRDD: Computing topic valid_subpub, partition 2 
offsets 0 - 2338

There must have been some problem computing the offset ranges, which look wrong to me. This topic is quite old, so its offsets cannot start from zero (0).
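Just to spell out the loop I am describing above, here is a simplified, self-contained model (this is not the real KafkaRDD code; the chunking is expressed as messages per fetch for brevity):

object FetchLoopSketch {
  // Simplified model: keep issuing fetches and advancing requestOffset until
  // untilOffset is reached; the real KafkaRDD asserts that it got there.
  def consume(fromOffset: Long, untilOffset: Long, msgsPerFetch: Long): Long = {
    var requestOffset = fromOffset
    while (requestOffset < untilOffset) {
      val batchEnd = math.min(requestOffset + msgsPerFetch, untilOffset)
      // process messages in [requestOffset, batchEnd) here
      requestOffset = batchEnd
    }
    requestOffset
  }

  def main(args: Array[String]): Unit = {
    // Mirrors the log above: partition 0 pulled offsets 0 - 2500 in fixed-size chunks.
    assert(consume(0L, 2500L, msgsPerFetch = 1000L) == 2500L)
    println("reached untilOffset")
  }
}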

 [STREAMING] Kafka DirectStream API stops receiving messages if collective 
 size of the messages specified in spark.streaming.kafka.maxRatePerPartition 
 exceeds the default fetch size ( fetch.message.max.bytes) of SimpleConsumer
 -

 Key: SPARK-8474
 URL: https://issues.apache.org/jira/browse/SPARK-8474
 Project: Spark
  Issue Type: Bug
  Components: Streaming
Affects Versions: 1.4.0
Reporter: Dibyendu Bhattacharya
Priority: Critical

 The issue is that if Kafka holds variable-size messages, ranging from a few KB to a few hundred KB, rate limiting by message count can lead to a problem.
 Say the message sizes in Kafka are such that under the default fetch.message.max.bytes limit (which is 1 MB) only 1000 messages can be pulled, whereas I specified spark.streaming.kafka.maxRatePerPartition as, say, 2000. With these settings, when the KafkaRDD pulls messages for its offset range it will only pull 1000 messages (limited by the size of the pull in the SimpleConsumer API), can never reach the desired untilOffset, and fails in KafkaRDD at this assert call:
 assert(requestOffset == part.untilOffset, errRanOutBeforeEnd(part))






[jira] [Updated] (SPARK-8474) [STREAMING] Kafka DirectStream API stops receiving messages if collective size of the messages specified in spark.streaming.kafka.maxRatePerPartition exceeds the default

2015-06-18 Thread Dibyendu Bhattacharya (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-8474?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dibyendu Bhattacharya updated SPARK-8474:
-
Description: 
The issue is that if Kafka holds variable-size messages, ranging from a few KB to a few hundred KB, rate limiting by message count can lead to a problem.

Say the message sizes in Kafka are such that under the default fetch.message.max.bytes limit (which is 1 MB) only 1000 messages can be pulled, whereas I specified spark.streaming.kafka.maxRatePerPartition as, say, 2000. With these settings, when the KafkaRDD pulls messages for its offset range it will only pull 1000 messages (limited by the size of the pull in the SimpleConsumer API), can never reach the desired untilOffset, and fails in KafkaRDD at this assert call:

assert(requestOffset == part.untilOffset, errRanOutBeforeEnd(part))
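One practical workaround, assuming the direct stream's consumer honors fetch.message.max.bytes passed through kafkaParams, is to raise the fetch size so that a full maxRatePerPartition batch fits into one pull. A sketch (app name, broker list, topic and sizes are placeholders):

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

object DirectStreamFetchSize {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("direct-stream-fetch-size")
      .set("spark.streaming.kafka.maxRatePerPartition", "2000")
    val ssc = new StreamingContext(conf, Seconds(10))

    val kafkaParams = Map(
      "metadata.broker.list" -> "broker1:9092,broker2:9092",
      // Large enough to hold 2000 of the biggest expected messages per pull.
      "fetch.message.max.bytes" -> (8 * 1024 * 1024).toString)

    val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, Set("my-topic"))
    stream.foreachRDD(rdd => println(s"batch count: ${rdd.count()}"))

    ssc.start()
    ssc.awaitTermination()
  }
}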



  was:
The issue is that if Kafka holds variable-size messages, ranging from a few KB to a few hundred KB, rate limiting by message count can lead to a problem.

Let's say the message sizes in Kafka are such that under the default fetch.message.max.bytes limit only 1000 messages can be pulled, whereas I specified the spark.streaming.kafka.maxRatePerPartition limit as, say, 2000. With these settings, when the KafkaRDD pulls messages for its offset range it will only pull 1000 messages, can never reach the desired untilOffset, and fails in KafkaRDD at this assert call:

assert(requestOffset == part.untilOffset, errRanOutBeforeEnd(part))




 [STREAMING] Kafka DirectStream API stops receiving messages if collective 
 size of the messages specified in spark.streaming.kafka.maxRatePerPartition 
 exceeds the default fetch size ( fetch.message.max.bytes) of SimpleConsumer
 -

 Key: SPARK-8474
 URL: https://issues.apache.org/jira/browse/SPARK-8474
 Project: Spark
  Issue Type: Bug
  Components: Streaming
Affects Versions: 1.4.0
Reporter: Dibyendu Bhattacharya
Priority: Critical

 The issue is that if Kafka holds variable-size messages, ranging from a few KB to a few hundred KB, rate limiting by message count can lead to a problem.
 Say the message sizes in Kafka are such that under the default fetch.message.max.bytes limit (which is 1 MB) only 1000 messages can be pulled, whereas I specified spark.streaming.kafka.maxRatePerPartition as, say, 2000. With these settings, when the KafkaRDD pulls messages for its offset range it will only pull 1000 messages (limited by the size of the pull in the SimpleConsumer API), can never reach the desired untilOffset, and fails in KafkaRDD at this assert call:
 assert(requestOffset == part.untilOffset, errRanOutBeforeEnd(part))






[jira] [Created] (SPARK-8474) [STREAMING] Kafka DirectStream API stops receiving messages if collective size of the messages specified in spark.streaming.kafka.maxRatePerPartition exceeds the default

2015-06-18 Thread Dibyendu Bhattacharya (JIRA)
Dibyendu Bhattacharya created SPARK-8474:


 Summary: [STREAMING] Kafka DirectStream API stops receiving 
messages if collective size of the messages specified in 
spark.streaming.kafka.maxRatePerPartition exceeds the default fetch size ( 
fetch.message.max.bytes) of SimpleConsumer
 Key: SPARK-8474
 URL: https://issues.apache.org/jira/browse/SPARK-8474
 Project: Spark
  Issue Type: Bug
  Components: Streaming
Affects Versions: 1.4.0
Reporter: Dibyendu Bhattacharya
Priority: Critical


The issue is that if Kafka holds variable-size messages, ranging from a few KB to a few hundred KB, rate limiting by message count can lead to a problem.

Let's say the message sizes in Kafka are such that under the default fetch.message.max.bytes limit only 1000 messages can be pulled, whereas I specified the spark.streaming.kafka.maxRatePerPartition limit as, say, 2000. With these settings, when the KafkaRDD pulls messages for its offset range it will only pull 1000 messages, can never reach the desired untilOffset, and fails in KafkaRDD at this assert call:

assert(requestOffset == part.untilOffset, errRanOutBeforeEnd(part))








[jira] [Updated] (SPARK-8080) Custom Receiver.store with Iterator type do not give correct count at Spark UI

2015-06-03 Thread Dibyendu Bhattacharya (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-8080?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dibyendu Bhattacharya updated SPARK-8080:
-
Attachment: screenshot.png

 Custom Receiver.store with Iterator type do not give correct count at Spark UI
 --

 Key: SPARK-8080
 URL: https://issues.apache.org/jira/browse/SPARK-8080
 Project: Spark
  Issue Type: Bug
  Components: Streaming
Affects Versions: 1.2.0
Reporter: Dibyendu Bhattacharya
 Fix For: 1.4.0

 Attachments: screenshot.png


 In a custom receiver, if I call store with the Iterator variant (store(dataIterator: Iterator[T]): Unit), the Spark UI does not show the correct record count for the block, which leads to wrong values for Input Rate, Scheduling Delay and Input Size.
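For context, a minimal custom receiver that goes through the Iterator variant of store (the data here is synthetic, just enough to exercise the code path whose count the UI mis-reports):

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

// Minimal custom receiver using store(dataIterator: Iterator[String]); the
// payload is synthetic and only illustrates the reported code path.
class DummyIteratorReceiver extends Receiver[String](StorageLevel.MEMORY_ONLY_2) {
  @volatile private var running = true

  def onStart(): Unit = {
    running = true
    new Thread("dummy-iterator-receiver") {
      override def run(): Unit = {
        while (running && !isStopped()) {
          val batch = (1 to 100).map(i => s"record-$i").iterator
          store(batch) // the Iterator variant whose count the UI gets wrong
          Thread.sleep(1000)
        }
      }
    }.start()
  }

  def onStop(): Unit = { running = false }
}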






[jira] [Commented] (SPARK-7525) Could not read data from write ahead log record when Receiver failed and WAL is stored in Tachyon

2015-05-12 Thread Dibyendu Bhattacharya (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-7525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14540325#comment-14540325
 ] 

Dibyendu Bhattacharya commented on SPARK-7525:
--

I guess this has something to do with the lack of append support in Tachyon.

java.lang.IllegalStateException: File exists and there is no append support!
at 
org.apache.spark.streaming.util.HdfsUtils$.getOutputStream(HdfsUtils.scala:33)
at 
org.apache.spark.streaming.util.FileBasedWriteAheadLogWriter.org$apache$spark$streaming$util$FileBasedWriteAheadLogWriter$$stream$lzycompute(FileBasedWriteAheadLogWriter.scala:33)
at 
org.apache.spark.streaming.util.FileBasedWriteAheadLogWriter.org$apache$spark$streaming$util$FileBasedWriteAheadLogWriter$$stream(FileBasedWriteAheadLogWriter.scala:33)
at 
org.apache.spark.streaming.util.FileBasedWriteAheadLogWriter.init(FileBasedWriteAheadLogWriter.scala:41)
at 
org.apache.spark.streaming.util.FileBasedWriteAheadLog.getLogWriter(FileBasedWriteAheadLog.scala:194)
at 
org.apache.spark.streaming.util.FileBasedWriteAheadLog.write(FileBasedWriteAheadLog.scala:81)
at 
org.apache.spark.streaming.util.FileBasedWriteAheadLog.write(FileBasedWriteAheadLog.scala:44)
at 
org.apache.spark.streaming.receiver.WriteAheadLogBasedBlockHandler$$anonfun$5.apply(ReceivedBlockHandler.scala:178)
at 
org.apache.spark.streaming.receiver.WriteAheadLogBasedBlockHandler$$anonfun$5.apply(ReceivedBlockHandler.scala:178)
at 
scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
at 
scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:744)
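Paraphrasing the gist of that writer logic from memory (this is not the exact Spark source, and the hdfs.append.support flag name should be treated as an assumption): if the log segment already exists the writer must append, and a filesystem without append support ends up in exactly the IllegalStateException above.

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FSDataOutputStream, Path}

object WalOutputStreamSketch {
  // Rough paraphrase of the WAL writer's output-stream selection.
  def getWalOutputStream(path: String, conf: Configuration): FSDataOutputStream = {
    val dfsPath = new Path(path)
    val dfs = dfsPath.getFileSystem(conf)
    if (dfs.isFile(dfsPath)) {
      // Appending to an existing segment requires append support (assumed flag name).
      if (conf.getBoolean("hdfs.append.support", false)) dfs.append(dfsPath)
      else throw new IllegalStateException("File exists and there is no append support!")
    } else {
      dfs.create(dfsPath)
    }
  }
}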

 Could not read data from write ahead log record when Receiver failed and WAL 
 is stored in Tachyon
 -

 Key: SPARK-7525
 URL: https://issues.apache.org/jira/browse/SPARK-7525
 Project: Spark
  Issue Type: Bug
  Components: Streaming
Affects Versions: 1.4.0
 Environment: AWS , Spark Streaming 1.4 with Tachyon 0.6.4
Reporter: Dibyendu Bhattacharya

 I was testing the fault-tolerance aspect of Spark Streaming with the checkpoint directory stored in Tachyon. Spark Streaming is able to recover from a driver failure, but when a receiver fails, Spark Streaming is not able to read the WAL files written by the failed receiver. Below is the exception when the receiver fails.
 INFO : org.apache.spark.scheduler.DAGScheduler - Executor lost: 2 (epoch 1)
 INFO : org.apache.spark.storage.BlockManagerMasterEndpoint - Trying to remove 
 executor 2 from BlockManagerMaster.
 INFO : org.apache.spark.storage.BlockManagerMasterEndpoint - Removing block 
 manager BlockManagerId(2, 10.252.5.54, 45789)
 INFO : org.apache.spark.storage.BlockManagerMaster - Removed 2 successfully 
 in removeExecutor
 INFO : org.apache.spark.streaming.scheduler.ReceiverTracker - Registered 
 receiver for stream 2 from 10.252.5.62:47255
 WARN : org.apache.spark.scheduler.TaskSetManager - Lost task 2.1 in stage 
 103.0 (TID 421, 10.252.5.62): org.apache.spark.SparkException: Could not read 
 data from write ahead log record 
 FileBasedWriteAheadLogSegment(tachyon-ft://10.252.5.113:19998/tachyon/checkpoint/receivedData/2/log-1431341091711-1431341151711,645603894,10891919)
   at 
 org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD.org$apache$spark$streaming$rdd$WriteAheadLogBackedBlockRDD$$getBlockFromWriteAheadLog$1(WriteAheadLogBackedBlockRDD.scala:144)
   at 
 org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD$$anonfun$compute$1.apply(WriteAheadLogBackedBlockRDD.scala:168)
   at 
 org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD$$anonfun$compute$1.apply(WriteAheadLogBackedBlockRDD.scala:168)
   at scala.Option.getOrElse(Option.scala:120)
   at 
 org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD.compute(WriteAheadLogBackedBlockRDD.scala:168)
   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
   at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
   at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:87)
   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
   at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
   at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
   at org.apache.spark.scheduler.Task.run(Task.scala:70)
   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
   at 
 

[jira] [Created] (SPARK-7525) Could not read data from write ahead log record when Receiver failed and WAL is stored in Tachyon

2015-05-11 Thread Dibyendu Bhattacharya (JIRA)
Dibyendu Bhattacharya created SPARK-7525:


 Summary: Could not read data from write ahead log record when 
Receiver failed and WAL is stored in Tachyon
 Key: SPARK-7525
 URL: https://issues.apache.org/jira/browse/SPARK-7525
 Project: Spark
  Issue Type: Bug
  Components: Streaming
Affects Versions: 1.4.0
 Environment: AWS , Spark Streaming 1.4 with Tachyon 0.6.4
Reporter: Dibyendu Bhattacharya


I was testing the fault-tolerance aspect of Spark Streaming with the checkpoint directory stored in Tachyon. Spark Streaming is able to recover from a driver failure, but when a receiver fails, Spark Streaming is not able to read the WAL files written by the failed receiver. Below is the exception when the receiver fails.

INFO : org.apache.spark.scheduler.DAGScheduler - Executor lost: 2 (epoch 1)
INFO : org.apache.spark.storage.BlockManagerMasterEndpoint - Trying to remove 
executor 2 from BlockManagerMaster.
INFO : org.apache.spark.storage.BlockManagerMasterEndpoint - Removing block 
manager BlockManagerId(2, 10.252.5.54, 45789)
INFO : org.apache.spark.storage.BlockManagerMaster - Removed 2 successfully in 
removeExecutor
INFO : org.apache.spark.streaming.scheduler.ReceiverTracker - Registered 
receiver for stream 2 from 10.252.5.62:47255
WARN : org.apache.spark.scheduler.TaskSetManager - Lost task 2.1 in stage 103.0 
(TID 421, 10.252.5.62): org.apache.spark.SparkException: Could not read data 
from write ahead log record 
FileBasedWriteAheadLogSegment(tachyon-ft://10.252.5.113:19998/tachyon/checkpoint/receivedData/2/log-1431341091711-1431341151711,645603894,10891919)
at 
org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD.org$apache$spark$streaming$rdd$WriteAheadLogBackedBlockRDD$$getBlockFromWriteAheadLog$1(WriteAheadLogBackedBlockRDD.scala:144)
at 
org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD$$anonfun$compute$1.apply(WriteAheadLogBackedBlockRDD.scala:168)
at 
org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD$$anonfun$compute$1.apply(WriteAheadLogBackedBlockRDD.scala:168)
at scala.Option.getOrElse(Option.scala:120)
at 
org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD.compute(WriteAheadLogBackedBlockRDD.scala:168)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:87)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
at org.apache.spark.scheduler.Task.run(Task.scala:70)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:744)
Caused by: java.lang.IllegalArgumentException: Seek position is past EOF: 
645603894, fileSize = 0
at tachyon.hadoop.HdfsFileInputStream.seek(HdfsFileInputStream.java:239)
at 
org.apache.hadoop.fs.FSDataInputStream.seek(FSDataInputStream.java:37)
at 
org.apache.spark.streaming.util.FileBasedWriteAheadLogRandomReader.read(FileBasedWriteAheadLogRandomReader.scala:37)
at 
org.apache.spark.streaming.util.FileBasedWriteAheadLog.read(FileBasedWriteAheadLog.scala:104)
at 
org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD.org$apache$spark$streaming$rdd$WriteAheadLogBackedBlockRDD$$getBlockFromWriteAheadLog$1(WriteAheadLogBackedBlockRDD.scala:141)
... 15 more

INFO : org.apache.spark.scheduler.TaskSetManager - Starting task 2.2 in stage 
103.0 (TID 422, 10.252.5.61, ANY, 1909 bytes)
INFO : org.apache.spark.scheduler.TaskSetManager - Lost task 2.2 in stage 103.0 
(TID 422) on executor 10.252.5.61: org.apache.spark.SparkException (Could not 
read data from write ahead log record 
FileBasedWriteAheadLogSegment(tachyon-ft://10.252.5.113:19998/tachyon/checkpoint/receivedData/2/log-1431341091711-1431341151711,645603894,10891919))
 [duplicate 1]
INFO : org.apache.spark.scheduler.TaskSetManager - Starting task 2.3 in stage 
103.0 (TID 423, 10.252.5.62, ANY, 1909 bytes)
INFO : org.apache.spark.deploy.client.AppClient$ClientActor - Executor updated: 
app-20150511104442-0048/2 is now LOST (worker lost)
INFO : org.apache.spark.scheduler.cluster.SparkDeploySchedulerBackend - 
Executor app-20150511104442-0048/2 removed: worker lost
ERROR: org.apache.spark.scheduler.cluster.SparkDeploySchedulerBackend - Asked 
to remove non-existent executor 2
INFO : org.apache.spark.scheduler.TaskSetManager - Lost task 2.3 in stage 103.0 
(TID 423) on executor 10.252.5.62: org.apache.spark.SparkException 

[jira] [Commented] (SPARK-7525) Could not read data from write ahead log record when Receiver failed and WAL is stored in Tachyon

2015-05-11 Thread Dibyendu Bhattacharya (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-7525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14537914#comment-14537914
 ] 

Dibyendu Bhattacharya commented on SPARK-7525:
--


This issue does not happen when checkpointing to HDFS.

If the checkpoint directory is in Tachyon, the issue shows up when a receiver fails.

For the driver-failure case, Spark Streaming can recover even when the checkpoint directory is in Tachyon.
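For reference, the setup that hits this is simply pointing the streaming checkpoint (and therefore the receiver WAL) at a tachyon-ft:// URI; a minimal sketch with placeholder master host/port:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object WalOnTachyonSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("wal-on-tachyon")
      // Receiver blocks are also logged to the WAL under the checkpoint dir.
      .set("spark.streaming.receiver.writeAheadLog.enable", "true")
    val ssc = new StreamingContext(conf, Seconds(10))
    // Placeholder Tachyon master host/port; this is where the WAL read-back
    // fails after a receiver dies.
    ssc.checkpoint("tachyon-ft://tachyon-master:19998/tachyon/checkpoint")
    // ... set up input streams here, then ssc.start() / ssc.awaitTermination()
  }
}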

 Could not read data from write ahead log record when Receiver failed and WAL 
 is stored in Tachyon
 -

 Key: SPARK-7525
 URL: https://issues.apache.org/jira/browse/SPARK-7525
 Project: Spark
  Issue Type: Bug
  Components: Streaming
Affects Versions: 1.4.0
 Environment: AWS , Spark Streaming 1.4 with Tachyon 0.6.4
Reporter: Dibyendu Bhattacharya

 I was testing the fault-tolerance aspect of Spark Streaming with the checkpoint directory stored in Tachyon. Spark Streaming is able to recover from a driver failure, but when a receiver fails, Spark Streaming is not able to read the WAL files written by the failed receiver. Below is the exception when the receiver fails.
 INFO : org.apache.spark.scheduler.DAGScheduler - Executor lost: 2 (epoch 1)
 INFO : org.apache.spark.storage.BlockManagerMasterEndpoint - Trying to remove 
 executor 2 from BlockManagerMaster.
 INFO : org.apache.spark.storage.BlockManagerMasterEndpoint - Removing block 
 manager BlockManagerId(2, 10.252.5.54, 45789)
 INFO : org.apache.spark.storage.BlockManagerMaster - Removed 2 successfully 
 in removeExecutor
 INFO : org.apache.spark.streaming.scheduler.ReceiverTracker - Registered 
 receiver for stream 2 from 10.252.5.62:47255
 WARN : org.apache.spark.scheduler.TaskSetManager - Lost task 2.1 in stage 
 103.0 (TID 421, 10.252.5.62): org.apache.spark.SparkException: Could not read 
 data from write ahead log record 
 FileBasedWriteAheadLogSegment(tachyon-ft://10.252.5.113:19998/tachyon/checkpoint/receivedData/2/log-1431341091711-1431341151711,645603894,10891919)
   at 
 org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD.org$apache$spark$streaming$rdd$WriteAheadLogBackedBlockRDD$$getBlockFromWriteAheadLog$1(WriteAheadLogBackedBlockRDD.scala:144)
   at 
 org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD$$anonfun$compute$1.apply(WriteAheadLogBackedBlockRDD.scala:168)
   at 
 org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD$$anonfun$compute$1.apply(WriteAheadLogBackedBlockRDD.scala:168)
   at scala.Option.getOrElse(Option.scala:120)
   at 
 org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD.compute(WriteAheadLogBackedBlockRDD.scala:168)
   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
   at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
   at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:87)
   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
   at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
   at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
   at org.apache.spark.scheduler.Task.run(Task.scala:70)
   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
   at 
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
   at 
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
   at java.lang.Thread.run(Thread.java:744)
 Caused by: java.lang.IllegalArgumentException: Seek position is past EOF: 
 645603894, fileSize = 0
   at tachyon.hadoop.HdfsFileInputStream.seek(HdfsFileInputStream.java:239)
   at 
 org.apache.hadoop.fs.FSDataInputStream.seek(FSDataInputStream.java:37)
   at 
 org.apache.spark.streaming.util.FileBasedWriteAheadLogRandomReader.read(FileBasedWriteAheadLogRandomReader.scala:37)
   at 
 org.apache.spark.streaming.util.FileBasedWriteAheadLog.read(FileBasedWriteAheadLog.scala:104)
   at 
 org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD.org$apache$spark$streaming$rdd$WriteAheadLogBackedBlockRDD$$getBlockFromWriteAheadLog$1(WriteAheadLogBackedBlockRDD.scala:141)
   ... 15 more
 INFO : org.apache.spark.scheduler.TaskSetManager - Starting task 2.2 in stage 
 103.0 (TID 422, 10.252.5.61, ANY, 1909 bytes)
 INFO : org.apache.spark.scheduler.TaskSetManager - Lost task 2.2 in stage 
 103.0 (TID 422) on executor 10.252.5.61: org.apache.spark.SparkException 
 (Could not read data from write ahead log record 
 FileBasedWriteAheadLogSegment(tachyon-ft://10.252.5.113:19998/tachyon/checkpoint/receivedData/2/log-1431341091711-1431341151711,645603894,10891919))
  [duplicate 1]
 INFO : org.apache.spark.scheduler.TaskSetManager - Starting task 2.3 in stage 
 103.0 (TID 423, 10.252.5.62, ANY, 1909 bytes)
 INFO : 

[jira] [Commented] (SPARK-7477) TachyonBlockManager Store Block in TRY_CACHE mode which gives BlockNotFoundException when blocks are evicted from cache

2015-05-08 Thread Dibyendu Bhattacharya (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-7477?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14534805#comment-14534805
 ] 

Dibyendu Bhattacharya commented on SPARK-7477:
--

I tried Hierarchical Storage on Tachyon (http://tachyon-project.org/Hierarchy-Storage-on-Tachyon.html), and that seems to have worked: I did not see any Spark job fail due to BlockNotFoundException. Below are my hierarchical storage settings:

  -Dtachyon.worker.hierarchystore.level.max=2
  -Dtachyon.worker.hierarchystore.level0.alias=MEM
  -Dtachyon.worker.hierarchystore.level0.dirs.path=$TACHYON_RAM_FOLDER
  -Dtachyon.worker.hierarchystore.level0.dirs.quota=$TACHYON_WORKER_MEMORY_SIZE
  -Dtachyon.worker.hierarchystore.level1.alias=HDD
  -Dtachyon.worker.hierarchystore.level1.dirs.path=/mnt/tachyon
  -Dtachyon.worker.hierarchystore.level1.dirs.quota=50GB
  -Dtachyon.worker.allocate.strategy=MAX_FREE
  -Dtachyon.worker.evict.strategy=LRU
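For completeness, the failure mode above applies to any receiver input stream persisted with StorageLevel.OFF_HEAP, since that is what routes received blocks through TachyonBlockManager; a minimal sketch using the built-in receiver-based KafkaUtils.createStream just for illustration (ZooKeeper quorum, group and topic are placeholders):

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

object OffHeapReceiverSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("offheap-receiver")
    val ssc = new StreamingContext(conf, Seconds(10))

    // OFF_HEAP sends received blocks to the Tachyon block store, where
    // TRY_CACHE eviction can later surface as BlockNotFoundException.
    val stream = KafkaUtils.createStream(
      ssc,
      "zk-host:2181",       // zkQuorum (placeholder)
      "offheap-demo-group", // consumer group id (placeholder)
      Map("my-topic" -> 1), // topic -> receiver thread count (placeholder)
      StorageLevel.OFF_HEAP)

    stream.count().print()
    ssc.start()
    ssc.awaitTermination()
  }
}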

 TachyonBlockManager Store Block in TRY_CACHE mode which gives 
 BlockNotFoundException when blocks are evicted from cache
 ---

 Key: SPARK-7477
 URL: https://issues.apache.org/jira/browse/SPARK-7477
 Project: Spark
  Issue Type: Bug
  Components: Block Manager
Affects Versions: 1.4.0
Reporter: Dibyendu Bhattacharya

 With Spark Streaming using Tachyon as the OFF_HEAP block store, I used the low-level receiver-based Kafka consumer (http://spark-packages.org/package/dibbhatt/kafka-spark-consumer) to pull from Kafka and write blocks to Tachyon.
 What I see is that TachyonBlockManager.scala puts the blocks with the WriteType.TRY_CACHE configuration. Because of this, blocks are evicted from the Tachyon cache, and when Spark tries to find a block it throws BlockNotFoundException.
 When I modified the WriteType to CACHE_THROUGH, the BlockDropException is gone, but it impacts throughput.


