Re: Low Level Kafka Consumer for Spark

2015-01-16 Thread Debasish Das
Hi Dib,

For our use case, I want my Spark job1 to read from HDFS/cache and write to
Kafka queues. Similarly, Spark job2 should read from Kafka queues and write
to Kafka queues.

Is writing to Kafka queues from a Spark job supported in your code?

Thanks
Deb
 On Jan 15, 2015 11:21 PM, Akhil Das ak...@sigmoidanalytics.com wrote:

 There was a simple example
 https://github.com/dibbhatt/kafka-spark-consumer/blob/master/examples/scala/LowLevelKafkaConsumer.scala#L45
 which you can run after changing few lines of configurations.

 Thanks
 Best Regards

 On Fri, Jan 16, 2015 at 12:23 PM, Dibyendu Bhattacharya 
 dibyendu.bhattach...@gmail.com wrote:

 Hi Kidong,

 Just now I tested the Low Level Consumer with Spark 1.2 and I did not see
 any issue with Receiver.Store method . It is able to fetch messages form
 Kafka.

 Can you cross check other configurations in your setup like Kafka broker
 IP , topic name, zk host details, consumer id etc.

 Dib

 On Fri, Jan 16, 2015 at 11:50 AM, Dibyendu Bhattacharya 
 dibyendu.bhattach...@gmail.com wrote:

 Hi Kidong,

 No , I have not tried yet with Spark 1.2 yet. I will try this out and
 let you know how this goes.

 By the way, is there any change in Receiver Store method happened in
 Spark 1.2 ?



 Regards,
 Dibyendu



 On Fri, Jan 16, 2015 at 11:25 AM, mykidong mykid...@gmail.com wrote:

 Hi Dibyendu,

 I am using kafka 0.8.1.1 and spark 1.2.0.
 After modifying these version of your pom, I have rebuilt your codes.
 But I have not got any messages from ssc.receiverStream(new
 KafkaReceiver(_props, i)).

 I have found, in your codes, all the messages are retrieved correctly,
 but
 _receiver.store(_dataBuffer.iterator())  which is spark streaming
 abstract
 class's method does not seem to work correctly.

 Have you tried running your spark streaming kafka consumer with kafka
 0.8.1.1 and spark 1.2.0 ?

 - Kidong.






 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Low-Level-Kafka-Consumer-for-Spark-tp11258p21180.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org







Re: Low Level Kafka Consumer for Spark

2015-01-16 Thread Dibyendu Bhattacharya
My code handles the Kafka consumer part. Writing to Kafka should not be a
big challenge; you can easily do it in your driver code, as sketched below.
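
As a rough sketch only (not part of the consumer project), writing the processed
stream back to Kafka could look like the following, using the Kafka 0.8 producer
API inside foreachRDD/foreachPartition; the broker list, topic name and message
type are placeholders:

import java.util.Properties
import kafka.producer.{KeyedMessage, Producer, ProducerConfig}
import org.apache.spark.streaming.dstream.DStream

// Sketch: push each processed batch back to a Kafka topic.
def writeToKafka(out: DStream[String], brokers: String, topic: String): Unit = {
  out.foreachRDD { rdd =>
    rdd.foreachPartition { records =>
      // Create the producer inside the task so it is not serialized from the driver.
      val props = new Properties()
      props.put("metadata.broker.list", brokers)   // e.g. "broker1:9092,broker2:9092"
      props.put("serializer.class", "kafka.serializer.StringEncoder")
      val producer = new Producer[String, String](new ProducerConfig(props))
      records.foreach(msg => producer.send(new KeyedMessage[String, String](topic, msg)))
      producer.close()
    }
  }
}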

dibyendu

On Sat, Jan 17, 2015 at 9:43 AM, Debasish Das debasish.da...@gmail.com
wrote:

 Hi Dib,

 For our usecase I want my spark job1 to read from hdfs/cache and write to
 kafka queues. Similarly spark job2 should read from kafka queues and write
 to kafka queues.

 Is writing to kafka queues from spark job supported in your code ?

 Thanks
 Deb
  On Jan 15, 2015 11:21 PM, Akhil Das ak...@sigmoidanalytics.com wrote:

 There was a simple example
 https://github.com/dibbhatt/kafka-spark-consumer/blob/master/examples/scala/LowLevelKafkaConsumer.scala#L45
 which you can run after changing few lines of configurations.

 Thanks
 Best Regards

 On Fri, Jan 16, 2015 at 12:23 PM, Dibyendu Bhattacharya 
 dibyendu.bhattach...@gmail.com wrote:

 Hi Kidong,

 Just now I tested the Low Level Consumer with Spark 1.2 and I did not
 see any issue with Receiver.Store method . It is able to fetch messages
 form Kafka.

 Can you cross check other configurations in your setup like Kafka broker
 IP , topic name, zk host details, consumer id etc.

 Dib

 On Fri, Jan 16, 2015 at 11:50 AM, Dibyendu Bhattacharya 
 dibyendu.bhattach...@gmail.com wrote:

 Hi Kidong,

 No , I have not tried yet with Spark 1.2 yet. I will try this out and
 let you know how this goes.

 By the way, is there any change in Receiver Store method happened in
 Spark 1.2 ?



 Regards,
 Dibyendu



 On Fri, Jan 16, 2015 at 11:25 AM, mykidong mykid...@gmail.com wrote:

 Hi Dibyendu,

 I am using kafka 0.8.1.1 and spark 1.2.0.
 After modifying these version of your pom, I have rebuilt your codes.
 But I have not got any messages from ssc.receiverStream(new
 KafkaReceiver(_props, i)).

 I have found, in your codes, all the messages are retrieved correctly,
 but
 _receiver.store(_dataBuffer.iterator())  which is spark streaming
 abstract
 class's method does not seem to work correctly.

 Have you tried running your spark streaming kafka consumer with kafka
 0.8.1.1 and spark 1.2.0 ?

 - Kidong.






 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Low-Level-Kafka-Consumer-for-Spark-tp11258p21180.html
 Sent from the Apache Spark User List mailing list archive at
 Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org







Re: Low Level Kafka Consumer for Spark

2015-01-15 Thread mykidong
Hi Dibyendu,

I am using Kafka 0.8.1.1 and Spark 1.2.0.
After updating these versions in your pom, I have rebuilt your code.
But I have not received any messages from ssc.receiverStream(new
KafkaReceiver(_props, i)).

I have found that, in your code, all the messages are retrieved correctly, but
_receiver.store(_dataBuffer.iterator()), which is a method of the Spark Streaming
abstract Receiver class, does not seem to work correctly.

Have you tried running your Spark Streaming Kafka consumer with Kafka
0.8.1.1 and Spark 1.2.0?

- Kidong.
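
For reference, the store(iterator) call being discussed is part of Spark
Streaming's Receiver API. A minimal, generic sketch of that contract (not the
actual KafkaReceiver code; the fetch logic is omitted):

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

// Generic receiver skeleton that buffers records and hands them to Spark via store().
class SketchReceiver extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {

  override def onStart(): Unit = {
    new Thread("sketch-fetcher") {
      override def run(): Unit = {
        while (!isStopped()) {
          val buffer = scala.collection.mutable.ArrayBuffer[String]()
          // ... fill buffer from the external source (e.g. a Kafka fetch) ...
          if (buffer.nonEmpty) {
            store(buffer.iterator)  // the same call as _receiver.store(_dataBuffer.iterator())
          }
        }
      }
    }.start()
  }

  override def onStop(): Unit = { /* release fetcher resources here */ }
}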






--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Low-Level-Kafka-Consumer-for-Spark-tp11258p21180.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Low Level Kafka Consumer for Spark

2015-01-15 Thread Dibyendu Bhattacharya
Hi Kidong,

No, I have not tried with Spark 1.2 yet. I will try this out and let
you know how it goes.

By the way, was there any change to the Receiver store method in Spark
1.2?



Regards,
Dibyendu



On Fri, Jan 16, 2015 at 11:25 AM, mykidong mykid...@gmail.com wrote:

 Hi Dibyendu,

 I am using kafka 0.8.1.1 and spark 1.2.0.
 After modifying these version of your pom, I have rebuilt your codes.
 But I have not got any messages from ssc.receiverStream(new
 KafkaReceiver(_props, i)).

 I have found, in your codes, all the messages are retrieved correctly, but
 _receiver.store(_dataBuffer.iterator())  which is spark streaming abstract
 class's method does not seem to work correctly.

 Have you tried running your spark streaming kafka consumer with kafka
 0.8.1.1 and spark 1.2.0 ?

 - Kidong.






 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Low-Level-Kafka-Consumer-for-Spark-tp11258p21180.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: Low Level Kafka Consumer for Spark

2015-01-15 Thread Dibyendu Bhattacharya
Hi Kidong,

I just tested the Low Level Consumer with Spark 1.2 and did not see
any issue with the Receiver.store method. It is able to fetch messages from
Kafka.

Can you cross-check the other configurations in your setup, like the Kafka broker
IP, topic name, ZK host details, consumer id, etc.?

Dib

On Fri, Jan 16, 2015 at 11:50 AM, Dibyendu Bhattacharya 
dibyendu.bhattach...@gmail.com wrote:

 Hi Kidong,

 No , I have not tried yet with Spark 1.2 yet. I will try this out and let
 you know how this goes.

 By the way, is there any change in Receiver Store method happened in Spark
 1.2 ?



 Regards,
 Dibyendu



 On Fri, Jan 16, 2015 at 11:25 AM, mykidong mykid...@gmail.com wrote:

 Hi Dibyendu,

 I am using kafka 0.8.1.1 and spark 1.2.0.
 After modifying these version of your pom, I have rebuilt your codes.
 But I have not got any messages from ssc.receiverStream(new
 KafkaReceiver(_props, i)).

 I have found, in your codes, all the messages are retrieved correctly, but
 _receiver.store(_dataBuffer.iterator())  which is spark streaming abstract
 class's method does not seem to work correctly.

 Have you tried running your spark streaming kafka consumer with kafka
 0.8.1.1 and spark 1.2.0 ?

 - Kidong.






 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Low-Level-Kafka-Consumer-for-Spark-tp11258p21180.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org





Re: Low Level Kafka Consumer for Spark

2015-01-15 Thread Akhil Das
There is a simple example at
https://github.com/dibbhatt/kafka-spark-consumer/blob/master/examples/scala/LowLevelKafkaConsumer.scala#L45
which you can run after changing a few lines of configuration.

Thanks
Best Regards

On Fri, Jan 16, 2015 at 12:23 PM, Dibyendu Bhattacharya 
dibyendu.bhattach...@gmail.com wrote:

 Hi Kidong,

 Just now I tested the Low Level Consumer with Spark 1.2 and I did not see
 any issue with Receiver.Store method . It is able to fetch messages form
 Kafka.

 Can you cross check other configurations in your setup like Kafka broker
 IP , topic name, zk host details, consumer id etc.

 Dib

 On Fri, Jan 16, 2015 at 11:50 AM, Dibyendu Bhattacharya 
 dibyendu.bhattach...@gmail.com wrote:

 Hi Kidong,

 No , I have not tried yet with Spark 1.2 yet. I will try this out and let
 you know how this goes.

 By the way, is there any change in Receiver Store method happened in
 Spark 1.2 ?



 Regards,
 Dibyendu



 On Fri, Jan 16, 2015 at 11:25 AM, mykidong mykid...@gmail.com wrote:

 Hi Dibyendu,

 I am using kafka 0.8.1.1 and spark 1.2.0.
 After modifying these version of your pom, I have rebuilt your codes.
 But I have not got any messages from ssc.receiverStream(new
 KafkaReceiver(_props, i)).

 I have found, in your codes, all the messages are retrieved correctly,
 but
 _receiver.store(_dataBuffer.iterator())  which is spark streaming
 abstract
 class's method does not seem to work correctly.

 Have you tried running your spark streaming kafka consumer with kafka
 0.8.1.1 and spark 1.2.0 ?

 - Kidong.






 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Low-Level-Kafka-Consumer-for-Spark-tp11258p21180.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org






Re: Low Level Kafka Consumer for Spark

2014-12-03 Thread Dibyendu Bhattacharya
Hi,

Yes, as Jerry mentioned, SPARK-3129 (
https://issues.apache.org/jira/browse/SPARK-3129) enabled the WAL feature,
which solves the driver failure problem. The way SPARK-3129 is designed, it
solves the driver failure problem agnostic of the source of the stream
(like Kafka or Flume, etc.). But with just SPARK-3129 you cannot achieve a
complete solution for data loss. You also need a reliable receiver which
solves the data loss issue on receiver failure.

The Low Level Consumer (https://github.com/dibbhatt/kafka-spark-consumer)
for which this email thread was started has solved that problem with the
Kafka low-level API.

And SPARK-4062, as Jerry mentioned, also recently solved the same problem
using the Kafka high-level API.

On the Kafka high-level consumer API approach, I would like to mention
that Kafka 0.8 has some issues, as mentioned in this wiki (
https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Client+Re-Design),
where consumer re-balance sometimes fails, and that is one of the key
reasons Kafka is re-writing the consumer API in Kafka 0.9.

I know a few folks have already faced these re-balancing issues while
using the Kafka high-level API, and if you ask my opinion, we at Pearson
are still using the Low Level Consumer as it seems to be more robust and
performant, and we have been using it for a few months without any issue
...and also I may be a little biased :)

Regards,
Dibyendu



On Wed, Dec 3, 2014 at 7:04 AM, Shao, Saisai saisai.s...@intel.com wrote:

 Hi Rod,

 The purpose of introducing  WAL mechanism in Spark Streaming as a general
 solution is to make all the receivers be benefit from this mechanism.

 Though as you said, external sources like Kafka have their own checkpoint
 mechanism, instead of storing data in WAL, we can only store metadata to
 WAL, and recover from the last committed offsets. But this requires
 sophisticated design of Kafka receiver with low-level API involved, also we
 need to take care of rebalance and fault tolerance things by ourselves. So
 right now instead of implementing a whole new receiver, we choose to
 implement a simple one, though the performance is not so good, it's much
 easier to understand and maintain.

 The design purpose and implementation of reliable Kafka receiver can be
 found in (https://issues.apache.org/jira/browse/SPARK-4062). And in
 future, to improve the reliable Kafka receiver like what you mentioned is
 on our scheduler.

 Thanks
 Jerry


 -Original Message-
 From: RodrigoB [mailto:rodrigo.boav...@aspect.com]
 Sent: Wednesday, December 3, 2014 5:44 AM
 To: u...@spark.incubator.apache.org
 Subject: Re: Low Level Kafka Consumer for Spark

 Dibyendu,

 Just to make sure I will not be misunderstood - My concerns are referring
 to the Spark upcoming solution and not yours. I would to gather the
 perspective of someone which implemented recovery with Kafka a different
 way.

 Tnks,
 Rod



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Low-Level-Kafka-Consumer-for-Spark-tp11258p20196.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional
 commands, e-mail: user-h...@spark.apache.org


 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: Low Level Kafka Consumer for Spark

2014-12-03 Thread Luis Ángel Vicente Sánchez
My main complaint about the WAL mechanism in the new reliable Kafka receiver
is that you have to enable checkpointing and, for some reason, even if
spark.cleaner.ttl is set to a reasonable value, only the metadata is
cleaned periodically. In my tests, using a folder on my filesystem as the
checkpoint folder, the receivedMetaData folder remained almost constant in
size, but the receivedData folder kept growing; spark.cleaner.ttl
was configured to 300 seconds.
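
For context, the setup being described is roughly the following (a minimal
sketch; the checkpoint path is arbitrary, and the WAL flag shown is the one
introduced for the reliable receiver in Spark 1.2):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Checkpointing must be enabled for the WAL-backed receiver; spark.cleaner.ttl
// is expected to bound how long received data is kept around.
val conf = new SparkConf()
  .setAppName("reliable-kafka-receiver-test")
  .set("spark.cleaner.ttl", "300")                               // seconds
  .set("spark.streaming.receiver.writeAheadLog.enable", "true")

val ssc = new StreamingContext(conf, Seconds(10))
ssc.checkpoint("file:///tmp/spark-checkpoint")  // receivedData / receivedMetaData live under this path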

2014-12-03 10:13 GMT+00:00 Dibyendu Bhattacharya 
dibyendu.bhattach...@gmail.com:

 Hi,

 Yes, as Jerry mentioned, the Spark -3129 (
 https://issues.apache.org/jira/browse/SPARK-3129) enabled the WAL feature
 which solves the Driver failure problem. The way 3129 is designed , it
 solved the driver failure problem agnostic of the source of the stream (
 like Kafka or Flume etc) But with just 3129 you can not achieve complete
 solution for data loss. You need a reliable receiver which should also
 solves the data loss issue on receiver failure.

 The Low Level Consumer (https://github.com/dibbhatt/kafka-spark-consumer)
 for which this email thread was started has solved that problem with Kafka
 Low Level API.

 And Spark-4062 as Jerry mentioned also recently solved the same problem
 using Kafka High Level API.

 On the Kafka High Level Consumer API approach , I would like to mention
 that Kafka 0.8 has some issue as mentioned in this wiki (
 https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Client+Re-Design)
 where consumer re-balance sometime fails and that is one of the key reason
 Kafka is re-writing consumer API in Kafka 0.9.

 I know there are few folks already have faced this re-balancing issues
 while using Kafka High Level API , and If you ask my opinion, we at Pearson
 are still using the Low Level Consumer as this seems to be more robust and
 performant and we have been using this for few months without any issue
 ..and also I may be little biased :)

 Regards,
 Dibyendu



 On Wed, Dec 3, 2014 at 7:04 AM, Shao, Saisai saisai.s...@intel.com
 wrote:

 Hi Rod,

 The purpose of introducing  WAL mechanism in Spark Streaming as a general
 solution is to make all the receivers be benefit from this mechanism.

 Though as you said, external sources like Kafka have their own checkpoint
 mechanism, instead of storing data in WAL, we can only store metadata to
 WAL, and recover from the last committed offsets. But this requires
 sophisticated design of Kafka receiver with low-level API involved, also we
 need to take care of rebalance and fault tolerance things by ourselves. So
 right now instead of implementing a whole new receiver, we choose to
 implement a simple one, though the performance is not so good, it's much
 easier to understand and maintain.

 The design purpose and implementation of reliable Kafka receiver can be
 found in (https://issues.apache.org/jira/browse/SPARK-4062). And in
 future, to improve the reliable Kafka receiver like what you mentioned is
 on our scheduler.

 Thanks
 Jerry


 -Original Message-
 From: RodrigoB [mailto:rodrigo.boav...@aspect.com]
 Sent: Wednesday, December 3, 2014 5:44 AM
 To: u...@spark.incubator.apache.org
 Subject: Re: Low Level Kafka Consumer for Spark

 Dibyendu,

 Just to make sure I will not be misunderstood - My concerns are referring
 to the Spark upcoming solution and not yours. I would to gather the
 perspective of someone which implemented recovery with Kafka a different
 way.

 Tnks,
 Rod



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Low-Level-Kafka-Consumer-for-Spark-tp11258p20196.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional
 commands, e-mail: user-h...@spark.apache.org


 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org





Re: Low Level Kafka Consumer for Spark

2014-12-02 Thread RodrigoB
Hi Dibyendu,

What are your thoughts on keeping this solution (or not), considering that
Spark Streaming v1.2 will have built-in recoverability of the received data?
https://issues.apache.org/jira/browse/SPARK-1647

I'm concerned about the added complexity and performance overhead of writing
big amounts of data into HDFS on a small batch interval.
https://docs.google.com/document/d/1vTCB5qVfyxQPlHuv8rit9-zjdttlgaSrMgfCDQlCJIM/edit?pli=1#
(diagram: http://apache-spark-user-list.1001560.n3.nabble.com/file/n20181/spark_streaming_v.png)

I think the whole solution is well designed and thought out, but I'm afraid it
may not really fit all needs with checkpoint-based technologies like Flume or
Kafka, because it hides the management of the offset from the user code. If,
instead of saving received data into HDFS, the ReceiverHandler saved some
metadata (such as the offset in the case of Kafka) specified by the custom
receiver passed into the StreamingContext, then upon driver restart that
metadata could be used by the custom receiver to recover the point from which
it should start receiving data once more.

Anyone's comments will be greatly appreciated.

Tnks,
Rod



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Low-Level-Kafka-Consumer-for-Spark-tp11258p20181.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

RE: Low Level Kafka Consumer for Spark

2014-12-02 Thread Shao, Saisai
Hi Rod,

The purpose of introducing the WAL mechanism in Spark Streaming as a general
solution is to let all receivers benefit from this mechanism.

Though, as you said, external sources like Kafka have their own checkpoint
mechanism, so instead of storing data in the WAL we could store only metadata in
the WAL and recover from the last committed offsets. But this requires a
sophisticated design of the Kafka receiver with the low-level API involved, and we
would also need to take care of rebalance and fault tolerance ourselves. So right
now, instead of implementing a whole new receiver, we chose to implement a simple
one; though the performance is not as good, it is much easier to understand and
maintain.

The design purpose and implementation of the reliable Kafka receiver can be found
in https://issues.apache.org/jira/browse/SPARK-4062. Improving the reliable Kafka
receiver along the lines you mentioned is on our schedule.

Thanks
Jerry


-Original Message-
From: RodrigoB [mailto:rodrigo.boav...@aspect.com] 
Sent: Wednesday, December 3, 2014 5:44 AM
To: u...@spark.incubator.apache.org
Subject: Re: Low Level Kafka Consumer for Spark

Dibyendu,

Just to make sure I will not be misunderstood - My concerns are referring to 
the Spark upcoming solution and not yours. I would to gather the perspective of 
someone which implemented recovery with Kafka a different way.

Tnks,
Rod



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Low-Level-Kafka-Consumer-for-Spark-tp11258p20196.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional 
commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Low Level Kafka Consumer for Spark

2014-09-15 Thread Alon Pe'er
Hi Dibyendu,

Thanks for your great work!

I'm new to Spark Streaming, so I just want to make sure I understand the Driver
failure issue correctly.

In my use case, I want to make sure that messages coming in from Kafka are
always broken into the same set of RDDs, meaning that if a set of messages
is assigned to one RDD, and the Driver dies before this RDD is processed,
then once the Driver recovers, the same set of messages is assigned to a
single RDD, instead of the messages being arbitrarily repartitioned across
different RDDs.

Does your Receiver guarantee this behavior, until the problem is fixed in
Spark 1.2?

Regards,
Alon



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Low-Level-Kafka-Consumer-for-Spark-tp11258p14233.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Low Level Kafka Consumer for Spark

2014-09-15 Thread Dibyendu Bhattacharya
Hi Alon,

No, there is no guarantee that the same set of messages will come in the same
RDD. This fix just re-plays the messages from the last processed offset in the
same order. Again, this is just an interim fix we needed to solve our use case.
If you do not need this message re-play feature, just do not perform the
ack (acknowledgement) call in the driver code. Then the processed messages
will not be written to ZK, and hence the replay will not happen.

Regards,
Dibyendu

On Mon, Sep 15, 2014 at 4:48 PM, Alon Pe'er alo...@supersonicads.com
wrote:

 Hi Dibyendu,

 Thanks for your great work!

 I'm new to Spark Streaming, so I just want to make sure I understand Driver
 failure issue correctly.

 In my use case, I want to make sure that messages coming in from Kafka are
 always broken into the same set of RDDs, meaning that if a set of messages
 are assigned to one RDD, and the Driver dies before this RDD is processed,
 then once the Driver recovers, the same set of messages are assigned to a
 single RDD, instead of arbitrarily repartitioning the messages across
 different RDDs.

 Does your Receiver guarantee this behavior, until the problem is fixed in
 Spark 1.2?

 Regards,
 Alon



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Low-Level-Kafka-Consumer-for-Spark-tp11258p14233.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: Low Level Kafka Consumer for Spark

2014-09-15 Thread Tim Smith
Hi Dibyendu,

I am a little confused about the need for rate limiting input from
Kafka. If the stream coming in from Kafka has a higher messages/second
rate than what a Spark job can process, then it should simply build a
backlog in Spark if the RDDs are cached on disk using persist().
Right?

Thanks,

Tim


On Mon, Sep 15, 2014 at 4:33 AM, Dibyendu Bhattacharya
dibyendu.bhattach...@gmail.com wrote:
 Hi Alon,

 No this will not be guarantee that same set of messages will come in same
 RDD. This fix just re-play the messages from last processed offset in same
 order. Again this is just a interim fix we needed to solve our use case . If
 you do not need this message re-play feature, just do not perform the ack (
 Acknowledgement) call in the Driver code. Then the processed messages will
 not be written to ZK and hence replay will not happen.

 Regards,
 Dibyendu

 On Mon, Sep 15, 2014 at 4:48 PM, Alon Pe'er alo...@supersonicads.com
 wrote:

 Hi Dibyendu,

 Thanks for your great work!

 I'm new to Spark Streaming, so I just want to make sure I understand
 Driver
 failure issue correctly.

 In my use case, I want to make sure that messages coming in from Kafka are
 always broken into the same set of RDDs, meaning that if a set of messages
 are assigned to one RDD, and the Driver dies before this RDD is processed,
 then once the Driver recovers, the same set of messages are assigned to a
 single RDD, instead of arbitrarily repartitioning the messages across
 different RDDs.

 Does your Receiver guarantee this behavior, until the problem is fixed in
 Spark 1.2?

 Regards,
 Alon



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Low-Level-Kafka-Consumer-for-Spark-tp11258p14233.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org



-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Low Level Kafka Consumer for Spark

2014-09-15 Thread Dibyendu Bhattacharya
Hi Tim,

I have not tried persisting the RDD.

There is some discussion on rate limiting Spark Streaming in this
thread:

http://apache-spark-user-list.1001560.n3.nabble.com/spark-streaming-rate-limiting-from-kafka-td8590.html

There is a pull request, https://github.com/apache/spark/pull/945/files, to
fix this rate limiting issue at the BlockGenerator level.

But while testing with heavy load, this fix did not solve my problem, so I
had to have rate limiting built into the Kafka consumer. I will make it
configurable soon.

If this is not done, I can see blocks getting dropped, which leads to job
failure.

I have raised this in another thread:

https://mail.google.com/mail/u/1/?tab=wm#search/Serious/148650fd829cd239.
But I have not got any answer yet on whether this is a bug (blocks getting dropped
and the job failing).
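
Purely as an illustration of the idea (the consumer's real internals differ,
and the numbers and fetch helpers below are made up), rate limiting inside the
receiver's fetch thread can be as simple as capping how many messages are handed
to store() per interval:

// Hypothetical throttled fetch loop inside a receiver.
val maxMessagesPerInterval = 5000   // assumed value; should be configurable
val intervalMs = 200L               // assumed block-fill interval

while (!isStopped()) {
  val batch = new scala.collection.mutable.ArrayBuffer[String]()
  while (batch.size < maxMessagesPerInterval && moreMessagesAvailable()) {
    batch += nextMessageFromKafka()  // hypothetical fetch helper
  }
  if (batch.nonEmpty) store(batch.iterator)
  Thread.sleep(intervalMs)           // back off so downstream processing can keep up
}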



Dib


On Mon, Sep 15, 2014 at 10:33 PM, Tim Smith secs...@gmail.com wrote:

 Hi Dibyendu,

 I am a little confused about the need for rate limiting input from
 kafka. If the stream coming in from kafka has higher message/second
 rate than what a Spark job can process then it should simply build a
 backlog in Spark if the RDDs are cached on disk using persist().
 Right?

 Thanks,

 Tim


 On Mon, Sep 15, 2014 at 4:33 AM, Dibyendu Bhattacharya
 dibyendu.bhattach...@gmail.com wrote:
  Hi Alon,
 
  No this will not be guarantee that same set of messages will come in same
  RDD. This fix just re-play the messages from last processed offset in
 same
  order. Again this is just a interim fix we needed to solve our use case
 . If
  you do not need this message re-play feature, just do not perform the
 ack (
  Acknowledgement) call in the Driver code. Then the processed messages
 will
  not be written to ZK and hence replay will not happen.
 
  Regards,
  Dibyendu
 
  On Mon, Sep 15, 2014 at 4:48 PM, Alon Pe'er alo...@supersonicads.com
  wrote:
 
  Hi Dibyendu,
 
  Thanks for your great work!
 
  I'm new to Spark Streaming, so I just want to make sure I understand
  Driver
  failure issue correctly.
 
  In my use case, I want to make sure that messages coming in from Kafka
 are
  always broken into the same set of RDDs, meaning that if a set of
 messages
  are assigned to one RDD, and the Driver dies before this RDD is
 processed,
  then once the Driver recovers, the same set of messages are assigned to
 a
  single RDD, instead of arbitrarily repartitioning the messages across
  different RDDs.
 
  Does your Receiver guarantee this behavior, until the problem is fixed
 in
  Spark 1.2?
 
  Regards,
  Alon
 
 
 
  --
  View this message in context:
 
 http://apache-spark-user-list.1001560.n3.nabble.com/Low-Level-Kafka-Consumer-for-Spark-tp11258p14233.html
  Sent from the Apache Spark User List mailing list archive at Nabble.com.
 
  -
  To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
  For additional commands, e-mail: user-h...@spark.apache.org
 
 

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: Low Level Kafka Consumer for Spark

2014-09-10 Thread Dibyendu Bhattacharya
Hi ,

The latest changes, with Kafka message re-play by manipulating the ZK offset,
seem to be working fine for us. This gives us some relief till the actual
issue is fixed in Spark 1.2.

I have some questions on how Spark processes the received data. The logic I
used is basically to pull messages from individual partitions using
dedicated Receivers and to union these streams. After that I
process this union stream.

Today I wanted to test this consumer with our internal Kafka cluster, which
has around 50 million records. With this huge backlog I found Spark only
running the Receiver task and not running the processing task (or rather
doing it very slowly). Is this an issue with the consumer, or is it an issue
on the Spark side? Ideally, once the Receivers durably write data to the store,
the processing should start in parallel. Why does the processing task need
to wait till the Receiver consumes all 50 million messages? ...Or maybe I
am doing something wrong? I can share the driver log if you want.

In the driver I can see only storage.BlockManagerInfo: Added input... type
messages, but I hardly see any scheduler.TaskSetManager: Starting task...
messages. I see data getting written to the target system at a very, very slow
pace.


Regards,
Dibyendu






On Mon, Sep 8, 2014 at 12:08 AM, Dibyendu Bhattacharya 
dibyendu.bhattach...@gmail.com wrote:

 Hi Tathagata,

 I have managed to implement the logic into the Kafka-Spark consumer to
 recover from Driver failure. This is just a interim fix till actual fix is
 done from Spark side.

 The logic is something like this.

 1. When the Individual Receivers starts for every Topic partition, it
 writes the Kafka messages along with certain meta data in Block Store. This
 meta data contains the details of message offset, partition id, topic name
 and consumer id. You can see this logic in PartitionManager.java  next()
 method.

 2.  In the Driver code ( Consumer.java) , I am creating the union of all
 there individual D-Streams, and processing the data using forEachRDD call.
 In the driver code, I am receiving the RDD which contains the Kafka
 messages along with meta data details. In the driver code, periodically I
 am committing the processed offset of the Kafka message into ZK.

 3. When driver stops, and restart again, the Receiver starts again, and
 this time in PartitionManager.java, I am checking what is the actual
 committed offset for the partition, and what is the actual processed
 offset of the same partition. This logic is in the PartitionManager
 constructor.

 If this is a Receiver restart, and processed offset of less than
 Committed offset, I am started fetching again from Processed offset.
 This may lead to duplicate records, but our system can handle duplicates.

 I have tested with multiple driver kill/stops and I found no data loss in
 Kafka consumer.

 In the Driver code, I have not done any checkpointing yet, will test
 that tomorrow.


 One interesting thing I found, if I do repartition of original stream ,
 I can still see the issue of data loss in this logic. What I believe,
 during re- partitioning Spark might be changing the order of RDDs the way
 it generated from Kafka stream. So during re-partition case, even when I am
 committing processed offset, but as this is not in order I still see issue.
 Not sure if this understanding is correct, but not able to find any other
 explanation.

 But if I do not use repartition this solution works fine.

 I can make this as configurable, so that when actual fix is available ,
 this feature in consumer can be turned off as this is an overhead for the
 consumer . Let me know what you think..

 Regards,
 Dibyendu




 On Fri, Sep 5, 2014 at 11:14 PM, Tathagata Das 
 tathagata.das1...@gmail.com wrote:

 Some thoughts on this thread to clarify the doubts.

 1. Driver recovery: The current (1.1 to be released) does not recover the
 raw data that has been received but not processes. This is because when the
 driver dies, the executors die and so does the raw data that was stored in
 it. Only for HDFS, the data is not lost by driver recovery as the data is
 already present reliably in HDFS. This is something we want to fix by Spark
 1.2 (3 month from now). Regarding recovery by replaying the data from
 Kafka, it is possible but tricky. Our goal is to provide strong guarantee,
 exactly-once semantics in all transformations. To guarantee this for all
 kinds of streaming computations stateful and not-stateful computations, it
 is requires that the data be replayed through Kafka in exactly same order,
 and the underlying blocks of data in Spark be regenerated in the exact way
 as it would have if there was no driver failure. This is quite tricky to
 implement, requires manipulation of zookeeper offsets, etc, that is hard to
 do with the high level consumer that KafkaUtil uses. Dibyendu's low level
 Kafka receiver may enable such approaches in the future. For now we
 definitely plan to solve the first problem very very soon.

 3. 

Re: Low Level Kafka Consumer for Spark

2014-09-08 Thread Tim Smith
Thanks TD. Someone already pointed out to me that repartition(...) on its own isn't
the right way; you have to use val partedStream = repartition(...). Would be
nice to have it fixed in the docs.




On Fri, Sep 5, 2014 at 10:44 AM, Tathagata Das tathagata.das1...@gmail.com
wrote:

 Some thoughts on this thread to clarify the doubts.

 1. Driver recovery: The current (1.1 to be released) does not recover the
 raw data that has been received but not processes. This is because when the
 driver dies, the executors die and so does the raw data that was stored in
 it. Only for HDFS, the data is not lost by driver recovery as the data is
 already present reliably in HDFS. This is something we want to fix by Spark
 1.2 (3 month from now). Regarding recovery by replaying the data from
 Kafka, it is possible but tricky. Our goal is to provide strong guarantee,
 exactly-once semantics in all transformations. To guarantee this for all
 kinds of streaming computations stateful and not-stateful computations, it
 is requires that the data be replayed through Kafka in exactly same order,
 and the underlying blocks of data in Spark be regenerated in the exact way
 as it would have if there was no driver failure. This is quite tricky to
 implement, requires manipulation of zookeeper offsets, etc, that is hard to
 do with the high level consumer that KafkaUtil uses. Dibyendu's low level
 Kafka receiver may enable such approaches in the future. For now we
 definitely plan to solve the first problem very very soon.

 3. Repartitioning: I am trying to understand the repartition issue. One
 common mistake I have seen is that developers repartition a stream but not
 use the repartitioned stream.

 WRONG:
 inputDstream.repartition(100)
 inputDstream.map(...).count().print()

 RIGHT:
 val repartitionedDStream = inputDStream.repartition(100)
 repartitionedDStream.map(...).count().print()

 Not sure if this helps solve the problem that you all the facing. I am
 going to add this to the stremaing programming guide to make sure this
 common mistake is avoided.

 TD




 On Wed, Sep 3, 2014 at 10:38 AM, Dibyendu Bhattacharya 
 dibyendu.bhattach...@gmail.com wrote:

 Hi,

 Sorry for little delay . As discussed in this thread, I have modified the
 Kafka-Spark-Consumer ( https://github.com/dibbhatt/kafka-spark-consumer)
 code to have dedicated Receiver for every Topic Partition. You can see the
 example howto create Union of these receivers
 in consumer.kafka.client.Consumer.java .

 Thanks to Chris for suggesting this change.

 Regards,
 Dibyendu


 On Mon, Sep 1, 2014 at 2:55 AM, RodrigoB rodrigo.boav...@aspect.com
 wrote:

 Just a comment on the recovery part.

 Is it correct to say that currently Spark Streaming recovery design does
 not
 consider re-computations (upon metadata lineage recovery) that depend on
 blocks of data of the received stream?
 https://issues.apache.org/jira/browse/SPARK-1647

 Just to illustrate a real use case (mine):
 - We have object states which have a Duration field per state which is
 incremented on every batch interval. Also this object state is reset to 0
 upon incoming state changing events. Let's supposed there is at least one
 event since the last data checkpoint. This will lead to inconsistency
 upon
 driver recovery: The Duration field will get incremented from the data
 checkpoint version until the recovery moment, but the state change event
 will never be re-processed...so in the end we have the old state with the
 wrong Duration value.
 To make things worst, let's imagine we're dumping the Duration increases
 somewhere...which means we're spreading the problem across our system.
 Re-computation awareness is something I've commented on another thread
 and
 rather treat it separately.

 http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-checkpoint-recovery-causes-IO-re-execution-td12568.html#a13205

 Re-computations do occur, but the only RDD's that are recovered are the
 ones
 from the data checkpoint. This is what we've seen. Is not enough by
 itself
 to ensure recovery of computed data and this partial recovery leads to
 inconsistency in some cases.

 Roger - I share the same question with you - I'm just not sure if the
 replicated data really gets persisted on every batch. The execution
 lineage
 is checkpointed, but if we have big chunks of data being consumed to
 Receiver node on let's say a second bases then having it persisted to
 HDFS
 every second could be a big challenge for keeping JVM performance - maybe
 that could be reason why it's not really implemented...assuming it isn't.

 Dibyendu had a great effort with the offset controlling code but the
 general
 state consistent recovery feels to me like another big issue to address.

 I plan on having a dive into the Streaming code and try to at least
 contribute with some ideas. Some more insight from anyone on the dev team
 will be very appreciated.

 tnks,
 Rod




 --
 View this message in context:
 

Re: Low Level Kafka Consumer for Spark

2014-09-07 Thread Dibyendu Bhattacharya
Hi Tathagata,

I have managed to implement the logic in the Kafka-Spark consumer to
recover from Driver failure. This is just an interim fix till the actual fix is
done on the Spark side.

The logic is something like this:

1. When the individual Receiver starts for every topic partition, it
writes the Kafka messages along with certain metadata to the block store. This
metadata contains the message offset, partition id, topic name
and consumer id. You can see this logic in the PartitionManager.java next()
method.

2. In the Driver code (Consumer.java), I am creating the union of all
these individual DStreams and processing the data using a forEachRDD call.
In the driver code, I receive the RDD which contains the Kafka
messages along with the metadata details, and periodically I
commit the processed offset of the Kafka messages to ZK.

3. When the driver stops and restarts, the Receiver starts again, and
this time, in PartitionManager.java, I check what the actual
committed offset for the partition is and what the actual processed
offset of the same partition is. This logic is in the PartitionManager
constructor.

If this is a Receiver restart and the processed offset is less than the
committed offset, I start fetching again from the processed offset.
This may lead to duplicate records, but our system can handle duplicates.
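
A rough driver-side sketch of steps 2 and 3 above (the record type, the
processing function and the ZK-commit helper are hypothetical stand-ins for the
consumer's actual classes):

import org.apache.spark.SparkContext._  // pair-RDD functions such as reduceByKey (Spark 1.x)

// Hypothetical shape of a received record: payload plus the metadata written by the receiver.
case class KafkaMessage(topic: String, partition: Int, offset: Long, consumerId: String, payload: Array[Byte])

val unioned = ssc.union(perPartitionStreams)   // one receiver stream per topic partition

unioned.foreachRDD { rdd =>
  processBatch(rdd)                            // hypothetical processing step
  // Record the highest processed offset per partition in ZooKeeper, so a restarted
  // receiver can resume from the processed offset instead of the committed one.
  val maxOffsets = rdd.map(m => ((m.topic, m.partition), m.offset))
                      .reduceByKey((a, b) => math.max(a, b))
  maxOffsets.collect().foreach { case ((topic, partition), offset) =>
    commitOffsetToZk(topic, partition, offset) // hypothetical ZK commit helper
  }
}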

I have tested with multiple driver kills/stops and found no data loss in the
Kafka consumer.

In the Driver code I have not done any checkpointing yet; I will test that
tomorrow.


One interesting thing I found: if I repartition the original stream, I
can still see data loss with this logic. I believe that during
re-partitioning Spark might be changing the order of the RDDs from the way they were
generated from the Kafka stream. So in the re-partition case, even when I am
committing the processed offset, it is not in order, and I still see the issue.
Not sure if this understanding is correct, but I am not able to find any other
explanation.

But if I do not use repartition, this solution works fine.

I can make this configurable, so that when the actual fix is available
this feature in the consumer can be turned off, as it is an overhead for the
consumer. Let me know what you think.

Regards,
Dibyendu




On Fri, Sep 5, 2014 at 11:14 PM, Tathagata Das tathagata.das1...@gmail.com
wrote:

 Some thoughts on this thread to clarify the doubts.

 1. Driver recovery: The current (1.1 to be released) does not recover the
 raw data that has been received but not processes. This is because when the
 driver dies, the executors die and so does the raw data that was stored in
 it. Only for HDFS, the data is not lost by driver recovery as the data is
 already present reliably in HDFS. This is something we want to fix by Spark
 1.2 (3 month from now). Regarding recovery by replaying the data from
 Kafka, it is possible but tricky. Our goal is to provide strong guarantee,
 exactly-once semantics in all transformations. To guarantee this for all
 kinds of streaming computations stateful and not-stateful computations, it
 is requires that the data be replayed through Kafka in exactly same order,
 and the underlying blocks of data in Spark be regenerated in the exact way
 as it would have if there was no driver failure. This is quite tricky to
 implement, requires manipulation of zookeeper offsets, etc, that is hard to
 do with the high level consumer that KafkaUtil uses. Dibyendu's low level
 Kafka receiver may enable such approaches in the future. For now we
 definitely plan to solve the first problem very very soon.

 3. Repartitioning: I am trying to understand the repartition issue. One
 common mistake I have seen is that developers repartition a stream but not
 use the repartitioned stream.

 WRONG:
 inputDstream.repartition(100)
 inputDstream.map(...).count().print()

 RIGHT:
 val repartitionedDStream = inputDStream.repartition(100)
 repartitionedDStream.map(...).count().print()

 Not sure if this helps solve the problem that you all the facing. I am
 going to add this to the stremaing programming guide to make sure this
 common mistake is avoided.

 TD




 On Wed, Sep 3, 2014 at 10:38 AM, Dibyendu Bhattacharya 
 dibyendu.bhattach...@gmail.com wrote:

 Hi,

 Sorry for little delay . As discussed in this thread, I have modified the
 Kafka-Spark-Consumer ( https://github.com/dibbhatt/kafka-spark-consumer)
 code to have dedicated Receiver for every Topic Partition. You can see the
 example howto create Union of these receivers
 in consumer.kafka.client.Consumer.java .

 Thanks to Chris for suggesting this change.

 Regards,
 Dibyendu


 On Mon, Sep 1, 2014 at 2:55 AM, RodrigoB rodrigo.boav...@aspect.com
 wrote:

 Just a comment on the recovery part.

 Is it correct to say that currently Spark Streaming recovery design does
 not
 consider re-computations (upon metadata lineage recovery) that depend on
 blocks of data of the received stream?
 

Re: Low Level Kafka Consumer for Spark

2014-09-03 Thread Dibyendu Bhattacharya
Hi,

Sorry for the little delay. As discussed in this thread, I have modified the
Kafka-Spark-Consumer (https://github.com/dibbhatt/kafka-spark-consumer)
code to have a dedicated Receiver for every topic partition. You can see an
example of how to create a union of these receivers
in consumer.kafka.client.Consumer.java.
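
Conceptually, the per-partition-receiver setup amounts to something like the
sketch below (the partition count and properties are placeholders; the real
wiring lives in consumer.kafka.client.Consumer.java):

// Sketch: one dedicated receiver per Kafka topic partition, then a union of all streams.
val numPartitions = 3                              // assumed partition count for the topic
val props = new java.util.Properties()             // ZK host, topic name, consumer id, etc.

val perPartition = (0 until numPartitions).map { i =>
  ssc.receiverStream(new KafkaReceiver(props, i))  // KafkaReceiver(props, partitionId), as used earlier in the thread
}
val allMessages = ssc.union(perPartition)
allMessages.count().print()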

Thanks to Chris for suggesting this change.

Regards,
Dibyendu


On Mon, Sep 1, 2014 at 2:55 AM, RodrigoB rodrigo.boav...@aspect.com wrote:

 Just a comment on the recovery part.

 Is it correct to say that currently Spark Streaming recovery design does
 not
 consider re-computations (upon metadata lineage recovery) that depend on
 blocks of data of the received stream?
 https://issues.apache.org/jira/browse/SPARK-1647

 Just to illustrate a real use case (mine):
 - We have object states which have a Duration field per state which is
 incremented on every batch interval. Also this object state is reset to 0
 upon incoming state changing events. Let's supposed there is at least one
 event since the last data checkpoint. This will lead to inconsistency upon
 driver recovery: The Duration field will get incremented from the data
 checkpoint version until the recovery moment, but the state change event
 will never be re-processed...so in the end we have the old state with the
 wrong Duration value.
 To make things worst, let's imagine we're dumping the Duration increases
 somewhere...which means we're spreading the problem across our system.
 Re-computation awareness is something I've commented on another thread and
 rather treat it separately.

 http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-checkpoint-recovery-causes-IO-re-execution-td12568.html#a13205

 Re-computations do occur, but the only RDD's that are recovered are the
 ones
 from the data checkpoint. This is what we've seen. Is not enough by itself
 to ensure recovery of computed data and this partial recovery leads to
 inconsistency in some cases.

 Roger - I share the same question with you - I'm just not sure if the
 replicated data really gets persisted on every batch. The execution lineage
 is checkpointed, but if we have big chunks of data being consumed to
 Receiver node on let's say a second bases then having it persisted to HDFS
 every second could be a big challenge for keeping JVM performance - maybe
 that could be reason why it's not really implemented...assuming it isn't.

 Dibyendu had a great effort with the offset controlling code but the
 general
 state consistent recovery feels to me like another big issue to address.

 I plan on having a dive into the Streaming code and try to at least
 contribute with some ideas. Some more insight from anyone on the dev team
 will be very appreciated.

 tnks,
 Rod




 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Low-Level-Kafka-Consumer-for-Spark-tp11258p13208.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: Low Level Kafka Consumer for Spark

2014-08-31 Thread RodrigoB
Just a comment on the recovery part. 

Is it correct to say that currently Spark Streaming recovery design does not
consider re-computations (upon metadata lineage recovery) that depend on
blocks of data of the received stream?
https://issues.apache.org/jira/browse/SPARK-1647

Just to illustrate a real use case (mine):
- We have object states with a Duration field per state which is
incremented on every batch interval. This object state is also reset to 0
upon incoming state-changing events. Let's suppose there is at least one
event since the last data checkpoint. This will lead to inconsistency upon
driver recovery: the Duration field will get incremented from the data
checkpoint version until the recovery moment, but the state change event
will never be re-processed...so in the end we have the old state with the
wrong Duration value.
To make things worse, let's imagine we're dumping the Duration increases
somewhere...which means we're spreading the problem across our system.
Re-computation awareness is something I've commented on in another thread and
would rather treat separately.
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-checkpoint-recovery-causes-IO-re-execution-td12568.html#a13205

Re-computations do occur, but the only RDDs that are recovered are the ones
from the data checkpoint. This is what we've seen. It is not enough by itself
to ensure recovery of computed data, and this partial recovery leads to
inconsistency in some cases.

Roger - I share the same question with you - I'm just not sure if the
replicated data really gets persisted on every batch. The execution lineage
is checkpointed, but if we have big chunks of data being consumed by the
Receiver node on, let's say, a per-second basis, then having it persisted to HDFS
every second could be a big challenge for keeping JVM performance - maybe
that could be the reason why it's not really implemented...assuming it isn't.

Dibyendu made a great effort with the offset-controlling code, but general
state-consistent recovery feels to me like another big issue to address.

I plan on diving into the Streaming code and trying to at least
contribute some ideas. Any more insight from anyone on the dev team
will be very appreciated.

tnks,
Rod 




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Low-Level-Kafka-Consumer-for-Spark-tp11258p13208.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Low Level Kafka Consumer for Spark

2014-08-30 Thread Sean Owen
I'm no expert. But as I understand, yes you create multiple streams to
consume multiple partitions in parallel. If they're all in the same
Kafka consumer group, you'll get exactly one copy of the message so
yes if you have 10 consumers and 3 Kafka partitions I believe only 3
will be getting messages.

The parallelism of Spark's processing of the RDDs of those messages is
different. There could be 4 partitions in your RDDs doing the work.
This is the kind of thing you potentially influence with repartition.
That is I believe you can get more tasks processing the messages even
if you are only able to consume messages from the queue with 3-way
parallelism, since the queue has 3 partitions.
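
To make that concrete, here is a small sketch reusing the identifiers from Tim's
snippet quoted below; the processing parallelism of 12 is an arbitrary example:

import org.apache.spark.streaming.kafka.KafkaUtils

// 3 receiver streams (one per Kafka partition) give 3-way consume parallelism,
// but processing can fan out wider by repartitioning the unioned stream.
val kInStreams = (1 to 3).map { _ =>
  KafkaUtils.createStream(ssc, "zkhost1:2181/zk_kafka", "testApp", Map("rawunstruct" -> 1))
}
val unified = ssc.union(kInStreams)
val widened = unified.repartition(12)  // use the returned stream, not the original
widened.map(x => normalizeLog(x._2, configMap)).count().print()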

On Aug 30, 2014 12:56 AM, Tim Smith secs...@gmail.com wrote:

 Ok, so I did this:
 val kInStreams = (1 to 10).map{_ = 
 KafkaUtils.createStream(ssc,zkhost1:2181/zk_kafka,testApp,Map(rawunstruct
  - 1)) }
 val kInMsg = ssc.union(kInStreams)
 val outdata = kInMsg.map(x=normalizeLog(x._2,configMap))

 This has improved parallelism. Earlier I would only get a Stream 0. Now I 
 have Streams [0-9]. Of course, since the kafka topic has only three 
 partitions, only three of those streams are active but I am seeing more 
 blocks being pulled across the three streams total that what one was doing 
 earlier. Also, four nodes are actively processing tasks (vs only two earlier) 
 now which actually has me confused. If Streams are active only on 3 nodes 
 then how/why did a 4th node get work? If a 4th got work why aren't more nodes 
 getting work?


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Low Level Kafka Consumer for Spark

2014-08-30 Thread Roger Hoover
I have this same question.  Isn't there somewhere that the Kafka range
metadata can be saved?  From my naive perspective, it seems like it should
be very similar to HDFS lineage.  The original HDFS blocks are kept
somewhere (in the driver?) so that if an RDD partition is lost, it can be
recomputed.  In this case, all we need is the Kafka topic, partition, and
offset range.

Can someone enlighten us on why two copies of the RDD are needed (or some
other mechanism like a WAL) for fault tolerance when using Kafka but not
when reading from say HDFS?
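
To make the idea concrete, the Kafka range metadata being asked about is
essentially just this small record per block (an illustrative sketch, not an
existing class in the code under discussion):

// Illustrative only: the minimal lineage metadata needed to re-read a block from Kafka.
case class KafkaRange(topic: String, partition: Int, fromOffset: Long, untilOffset: Long)

// If the driver kept one of these per received block, a lost RDD partition could be
// recomputed by re-fetching [fromOffset, untilOffset) from Kafka, much like HDFS
// block locations let Spark recompute lost HDFS-backed partitions.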


On Fri, Aug 29, 2014 at 8:58 AM, Jonathan Hodges hodg...@gmail.com wrote:

 'this 2-node replication is mainly for failover in case the receiver dies
 while data is in flight.  there's still chance for data loss as there's no
 write ahead log on the hot path, but this is being addressed.'

 Can you comment a little on how this will be addressed, will there be a
 durable WAL?  Is there a JIRA for tracking this effort?

 I am curious without WAL if you can avoid this data loss with explicit
 management of Kafka offsets e.g. don't commit offset unless data is
 replicated to multiple nodes or maybe not until processed.  The incoming
 data will always be durably stored to disk in Kafka so can be replayed in
 failure scenarios to avoid data loss if the offsets are managed properly.




 On Thu, Aug 28, 2014 at 12:02 PM, Chris Fregly ch...@fregly.com wrote:

 @bharat-

 overall, i've noticed a lot of confusion about how Spark Streaming scales
 - as well as how it handles failover and checkpointing, but we can discuss
 that separately.

 there's actually 2 dimensions to scaling here:  receiving and processing.

 *Receiving*
 receiving can be scaled out by submitting new DStreams/Receivers to the
 cluster as i've done in the Kinesis example.  in fact, i purposely chose to
 submit multiple receivers in my Kinesis example because i feel it should be
 the norm and not the exception - particularly for partitioned and
 checkpoint-capable streaming systems like Kafka and Kinesis.   it's the
 only way to scale.

 a side note here is that each receiver running in the cluster will
 immediately replicates to 1 other node for fault-tolerance of that specific
 receiver.  this is where the confusion lies.  this 2-node replication is
 mainly for failover in case the receiver dies while data is in flight.
  there's still chance for data loss as there's no write ahead log on the
 hot path, but this is being addressed.

 this in mentioned in the docs here:
 https://spark.apache.org/docs/latest/streaming-programming-guide.html#level-of-parallelism-in-data-receiving

 *Processing*
 once data is received, tasks are scheduled across the Spark cluster just
 like any other non-streaming task where you can specify the number of
 partitions for reduces, etc.  this is the part of scaling that is sometimes
 overlooked - probably because it works just like regular Spark, but it is
 worth highlighting.

 Here's a blurb in the docs:
 https://spark.apache.org/docs/latest/streaming-programming-guide.html#level-of-parallelism-in-data-processing

 the other thing that's confusing with Spark Streaming is that in Scala,
 you need to explicitly

 import
 org.apache.spark.streaming.StreamingContext.toPairDStreamFunctions

 in order to pick up the implicits that allow DStream.reduceByKey and such
 (versus DStream.transform(rddBatch = rddBatch.reduceByKey())

 in other words, DStreams appear to be relatively featureless until you
 discover this implicit.  otherwise, you need to operate on the underlying
 RDD's explicitly which is not ideal.

 the Kinesis example referenced earlier in the thread uses the DStream
 implicits.


 side note to all of this - i've recently convinced my publisher for my
 upcoming book, Spark In Action, to let me jump ahead and write the Spark
 Streaming chapter ahead of other more well-understood libraries.  early
 release is in a month or so.  sign up  @ http://sparkinaction.com if you
 wanna get notified.

 shameless plug that i wouldn't otherwise do, but i really think it will
 help clear a lot of confusion in this area as i hear these questions asked
 a lot in my talks and such.  and i think a clear, crisp story on scaling
 and fault-tolerance will help Spark Streaming's adoption.

 hope that helps!

 -chris




 On Wed, Aug 27, 2014 at 6:32 PM, Dibyendu Bhattacharya 
 dibyendu.bhattach...@gmail.com wrote:

 I agree. This issue should be fixed in Spark rather rely on replay of
 Kafka messages.

 Dib
 On Aug 28, 2014 6:45 AM, RodrigoB rodrigo.boav...@aspect.com wrote:

 Dibyendu,

 Tnks for getting back.

 I believe you are absolutely right. We were under the assumption that
 the
 raw data was being computed again and that's not happening after further
 tests. This applies to Kafka as well.

 The issue is of major priority fortunately.

 Regarding your suggestion, I would maybe prefer to have the problem
 resolved
 within Spark's internals since once the data is replicated we 

Re: Low Level Kafka Consumer for Spark

2014-08-30 Thread Tim Smith
I'd be interested to understand this mechanism as well. But this is the
error recovery part of the equation. Consuming from Kafka has two aspects -
parallelism and error recovery and I am not sure how either works. For
error recovery, I would like to understand how:
- A failed receiver gets re-spawned. In 1.0.0, despite setting the failed
tasks threshold to 64, my job aborts after 4 receiver task failures.
- Data loss recovery due to a failed receiver task/executor.


 For parallelism, I would expect a single createStream() to intelligently
map a receiver thread somewhere, one for each kafka partition, but in
different JVMs. Also, repartition() does not seem to work as advertised. A
repartition(512) should get nodes other than the receiver nodes to get some
RDDs to process. No?


On Sat, Aug 30, 2014 at 7:14 PM, Roger Hoover roger.hoo...@gmail.com
wrote:

 I have this same question.  Isn't there somewhere that the Kafka range
 metadata can be saved?  From my naive perspective, it seems like it should
 be very similar to HDFS lineage.  The original HDFS blocks are kept
 somewhere (in the driver?) so that if an RDD partition is lost, it can be
 recomputed.  In this case, all we need is the Kafka topic, partition, and
 offset range.

 Can someone enlighten us on why two copies of the RDD are needed (or some
 other mechanism like a WAL) for fault tolerance when using Kafka but not
 when reading from say HDFS?



 On Fri, Aug 29, 2014 at 8:58 AM, Jonathan Hodges hodg...@gmail.com
 wrote:

 'this 2-node replication is mainly for failover in case the receiver
 dies while data is in flight.  there's still chance for data loss as
 there's no write ahead log on the hot path, but this is being addressed.'

 Can you comment a little on how this will be addressed, will there be a
 durable WAL?  Is there a JIRA for tracking this effort?

 I am curious without WAL if you can avoid this data loss with explicit
 management of Kafka offsets e.g. don't commit offset unless data is
 replicated to multiple nodes or maybe not until processed.  The incoming
 data will always be durably stored to disk in Kafka, so it can be replayed in
 failure scenarios to avoid data loss if the offsets are managed properly.




 On Thu, Aug 28, 2014 at 12:02 PM, Chris Fregly ch...@fregly.com wrote:

 @bharat-

 overall, i've noticed a lot of confusion about how Spark Streaming
 scales - as well as how it handles failover and checkpointing, but we can
 discuss that separately.

 there's actually 2 dimensions to scaling here:  receiving and processing.

 *Receiving*
 receiving can be scaled out by submitting new DStreams/Receivers to the
 cluster as i've done in the Kinesis example.  in fact, i purposely chose to
 submit multiple receivers in my Kinesis example because i feel it should be
 the norm and not the exception - particularly for partitioned and
 checkpoint-capable streaming systems like Kafka and Kinesis.   it's the
 only way to scale.

 a side note here is that each receiver running in the cluster will
 immediately replicate to 1 other node for fault-tolerance of that specific
 receiver.  this is where the confusion lies.  this 2-node replication is
 mainly for failover in case the receiver dies while data is in flight.
  there's still chance for data loss as there's no write ahead log on the
 hot path, but this is being addressed.

 this is mentioned in the docs here:
 https://spark.apache.org/docs/latest/streaming-programming-guide.html#level-of-parallelism-in-data-receiving

 *Processing*
 once data is received, tasks are scheduled across the Spark cluster just
 like any other non-streaming task where you can specify the number of
 partitions for reduces, etc.  this is the part of scaling that is sometimes
 overlooked - probably because it works just like regular Spark, but it is
 worth highlighting.

 Here's a blurb in the docs:
 https://spark.apache.org/docs/latest/streaming-programming-guide.html#level-of-parallelism-in-data-processing

 the other thing that's confusing with Spark Streaming is that in Scala,
 you need to explicitly

 import
 org.apache.spark.streaming.StreamingContext.toPairDStreamFunctions

 in order to pick up the implicits that allow DStream.reduceByKey and
 such (versus DStream.transform(rddBatch => rddBatch.reduceByKey()))

 in other words, DStreams appear to be relatively featureless until you
 discover this implicit.  otherwise, you need to operate on the underlying
 RDD's explicitly which is not ideal.

 the Kinesis example referenced earlier in the thread uses the DStream
 implicits.


 side note to all of this - i've recently convinced my publisher for my
 upcoming book, Spark In Action, to let me jump ahead and write the Spark
 Streaming chapter ahead of other more well-understood libraries.  early
 release is in a month or so.  sign up  @ http://sparkinaction.com if
 you wanna get notified.

 shameless plug that i wouldn't otherwise do, but i really think it will
 help clear a lot of confusion in this area 

Re: Low Level Kafka Consumer for Spark

2014-08-29 Thread bharatvenkat
Chris,

I did the Dstream.repartition mentioned in the document on parallelism in
receiving, as well as set spark.default.parallelism and it still uses only
2 nodes in my cluster.  I notice there is another email thread on the same
topic:

http://apache-spark-user-list.1001560.n3.nabble.com/DStream-repartitioning-performance-tuning-processing-td13069.html

My code is in Java and here is what I have:

   JavaPairReceiverInputDStream<String, String> messages =
       KafkaUtils.createStream(ssc, zkQuorum,
           "cse-job-play-consumer", kafkaTopicMap);

   JavaPairDStream<String, String> newMessages =
       messages.repartition(partitionSize); // partitionSize=30

   JavaDStream<String> lines = newMessages.map(
       new Function<Tuple2<String, String>, String>() {
           ...
           public String call(Tuple2<String, String> tuple2) {
               return tuple2._2();
           }
       });

   JavaDStream<String> words = lines.flatMap(new MetricsComputeFunction());

   JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
       new PairFunction<String, String, Integer>() {
           ...
       });

   wordCounts.foreachRDD(new Function<JavaPairRDD<String, Integer>, Void>() {...});

Thanks,
Bharat



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Low-Level-Kafka-Consumer-for-Spark-tp11258p13131.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Low Level Kafka Consumer for Spark

2014-08-29 Thread Jonathan Hodges
'this 2-node replication is mainly for failover in case the receiver dies
while data is in flight.  there's still chance for data loss as there's no
write ahead log on the hot path, but this is being addressed.'

Can you comment a little on how this will be addressed, will there be a
durable WAL?  Is there a JIRA for tracking this effort?

I am curious without WAL if you can avoid this data loss with explicit
management of Kafka offsets e.g. don't commit offset unless data is
replicated to multiple nodes or maybe not until processed.  The incoming
data will always be durably stored to disk in Kafka, so it can be replayed in
failure scenarios to avoid data loss if the offsets are managed properly.
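
For illustration, a minimal sketch of that process-first, commit-offsets-afterwards
pattern in Scala. OffsetRange and commitOffsets below are hypothetical helpers, not
Spark or Kafka APIs, and how the offsets of a batch are obtained would depend on the
consumer implementation:

import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.DStream

// Hypothetical offset bookkeeping -- not part of Spark or the Kafka client.
case class OffsetRange(topic: String, partition: Int, untilOffset: Long)

def commitOffsets(ranges: Seq[OffsetRange]): Unit = {
  // e.g. write to ZooKeeper, or to Kafka's own offset storage once that is available
}

def processThenCommit(stream: DStream[(String, String)],
                      offsetsFor: RDD[_] => Seq[OffsetRange]): Unit = {
  stream.foreachRDD { rdd =>
    // handle (and thereby persist/replicate) the whole batch first ...
    rdd.foreachPartition(_.foreach { case (_, msg) => println(msg) })
    // ... and only then record how far we got
    commitOffsets(offsetsFor(rdd))
  }
}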




On Thu, Aug 28, 2014 at 12:02 PM, Chris Fregly ch...@fregly.com wrote:

 @bharat-

 overall, i've noticed a lot of confusion about how Spark Streaming scales
 - as well as how it handles failover and checkpointing, but we can discuss
 that separately.

 there's actually 2 dimensions to scaling here:  receiving and processing.

 *Receiving*
 receiving can be scaled out by submitting new DStreams/Receivers to the
 cluster as i've done in the Kinesis example.  in fact, i purposely chose to
 submit multiple receivers in my Kinesis example because i feel it should be
 the norm and not the exception - particularly for partitioned and
 checkpoint-capable streaming systems like Kafka and Kinesis.   it's the
 only way to scale.

 a side note here is that each receiver running in the cluster will
 immediately replicate to 1 other node for fault-tolerance of that specific
 receiver.  this is where the confusion lies.  this 2-node replication is
 mainly for failover in case the receiver dies while data is in flight.
  there's still chance for data loss as there's no write ahead log on the
 hot path, but this is being addressed.

 this is mentioned in the docs here:
 https://spark.apache.org/docs/latest/streaming-programming-guide.html#level-of-parallelism-in-data-receiving

 *Processing*
 once data is received, tasks are scheduled across the Spark cluster just
 like any other non-streaming task where you can specify the number of
 partitions for reduces, etc.  this is the part of scaling that is sometimes
 overlooked - probably because it works just like regular Spark, but it is
 worth highlighting.

 Here's a blurb in the docs:
 https://spark.apache.org/docs/latest/streaming-programming-guide.html#level-of-parallelism-in-data-processing

 the other thing that's confusing with Spark Streaming is that in Scala,
 you need to explicitly

 import org.apache.spark.streaming.StreamingContext.toPairDStreamFunctions

 in order to pick up the implicits that allow DStream.reduceByKey and such
 (versus DStream.transform(rddBatch => rddBatch.reduceByKey()))

 in other words, DStreams appear to be relatively featureless until you
 discover this implicit.  otherwise, you need to operate on the underlying
 RDD's explicitly which is not ideal.

 the Kinesis example referenced earlier in the thread uses the DStream
 implicits.


 side note to all of this - i've recently convinced my publisher for my
 upcoming book, Spark In Action, to let me jump ahead and write the Spark
 Streaming chapter ahead of other more well-understood libraries.  early
 release is in a month or so.  sign up  @ http://sparkinaction.com if you
 wanna get notified.

 shameless plug that i wouldn't otherwise do, but i really think it will
 help clear a lot of confusion in this area as i hear these questions asked
 a lot in my talks and such.  and i think a clear, crisp story on scaling
 and fault-tolerance will help Spark Streaming's adoption.

 hope that helps!

 -chris




 On Wed, Aug 27, 2014 at 6:32 PM, Dibyendu Bhattacharya 
 dibyendu.bhattach...@gmail.com wrote:

 I agree. This issue should be fixed in Spark rather than relying on replay of
 Kafka messages.

 Dib
 On Aug 28, 2014 6:45 AM, RodrigoB rodrigo.boav...@aspect.com wrote:

 Dibyendu,

 Tnks for getting back.

 I believe you are absolutely right. We were under the assumption that the
 raw data was being computed again and that's not happening after further
 tests. This applies to Kafka as well.

 The issue is of major priority fortunately.

 Regarding your suggestion, I would maybe prefer to have the problem
 resolved
 within Spark's internals since once the data is replicated we should be
 able
 to access it once more and not having to pull it back again from Kafka or
 any other stream that is being affected by this issue. If for example
 there
 is a big amount of batches to be recomputed I would rather have them done
 distributed than overloading the batch interval with huge amount of Kafka
 messages.

 I do not yet have enough know-how on where the issue is, or about the
 internal Spark code, so I can't really say how difficult the
 implementation will be.

 tnks,
 Rod



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Low-Level-Kafka-Consumer-for-Spark-tp11258p12966.html
 

Re: Low Level Kafka Consumer for Spark

2014-08-29 Thread Tim Smith
Good to see I am not the only one who cannot get incoming Dstreams to
repartition. I tried repartition(512) but still no luck - the app
stubbornly runs only on two nodes. Now this is 1.0.0 but looking at
release notes for 1.0.1 and 1.0.2, I don't see anything that says this
was an issue and has been fixed.

How do I debug the repartition() statement to see what's the flow
after the job hits that statement?
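
One quick way to see whether a repartition is taking effect is to print the partition
count of each batch; a small sketch, where `repartitioned` is assumed to be the
DStream returned by repartition():

repartitioned.foreachRDD { rdd =>
  println("partitions in this batch: " + rdd.partitions.length)
}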


On Fri, Aug 29, 2014 at 8:31 AM, bharatvenkat bvenkat.sp...@gmail.com wrote:
 Chris,

 I did the Dstream.repartition mentioned in the document on parallelism in
 receiving, as well as set spark.default.parallelism and it still uses only
 2 nodes in my cluster.  I notice there is another email thread on the same
 topic:

 http://apache-spark-user-list.1001560.n3.nabble.com/DStream-repartitioning-performance-tuning-processing-td13069.html

 My code is in Java and here is what I have:

 JavaPairReceiverInputDStream<String, String> messages =
     KafkaUtils.createStream(ssc, zkQuorum,
         "cse-job-play-consumer", kafkaTopicMap);

 JavaPairDStream<String, String> newMessages =
     messages.repartition(partitionSize); // partitionSize=30

 JavaDStream<String> lines = newMessages.map(
     new Function<Tuple2<String, String>, String>() {
         ...
         public String call(Tuple2<String, String> tuple2) {
             return tuple2._2();
         }
     });

 JavaDStream<String> words = lines.flatMap(new MetricsComputeFunction());

 JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
     new PairFunction<String, String, Integer>() {
         ...
     });

 wordCounts.foreachRDD(new Function<JavaPairRDD<String, Integer>, Void>() {...});

 Thanks,
 Bharat



 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/Low-Level-Kafka-Consumer-for-Spark-tp11258p13131.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Low Level Kafka Consumer for Spark

2014-08-29 Thread Tim Smith
I create my DStream very simply as:
val kInMsg =
  KafkaUtils.createStream(ssc, "zkhost1:2181/zk_kafka", "testApp", Map("rawunstruct" -> 8))
.
.
eventually, before I operate on the DStream, I repartition it:
kInMsg.repartition(512)

Are you saying that ^^ repartition doesn't split my DStream into multiple
smaller streams? Should I manually create multiple DStreams like this?:
val kInputs = (1 to 10).map { _ => KafkaUtils.createStream() }

Then I apply some custom logic to it as:
val outdata = kInMsg.map(x => normalizeLog(x._2, configMap)) // where
normalizeLog takes a String and Map of regex and returns a string
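
Worth noting: repartition() returns a new DStream rather than repartitioning kInMsg
in place, so the repartitioned stream has to be the one used downstream. A rough
sketch (spark-shell style) of the one-receiver-per-partition approach that comes up
elsewhere in this thread, assuming three Kafka partitions for the topic:

val kafkaPartitions = 3
val kInStreams = (1 to kafkaPartitions).map { _ =>
  KafkaUtils.createStream(ssc, "zkhost1:2181/zk_kafka", "testApp", Map("rawunstruct" -> 1))
}
val kInUnion = ssc.union(kInStreams).repartition(512)
val outdataUnion = kInUnion.map(x => normalizeLog(x._2, configMap))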

In my case, I think I have traced the issue to the receiver executor being
killed by Yarn:
14/08/29 22:46:30 ERROR YarnClientClusterScheduler: Lost executor 1 on
node-dn1-4-acme.com: remote Akka client disassociated

This be the root cause?
http://apache-spark-developers-list.1001551.n3.nabble.com/Lost-executor-on-YARN-ALS-iterations-td7916.html
https://issues.apache.org/jira/browse/SPARK-2121





On Fri, Aug 29, 2014 at 3:28 PM, Sean Owen so...@cloudera.com wrote:

 Are you using multiple Dstreams? repartitioning does not affect how
 many receivers you have. It's on 2 nodes for each receiver. You need
 multiple partitions in the queue, each consumed by a DStream, if you
 mean to parallelize consuming the queue.

 On Fri, Aug 29, 2014 at 11:08 PM, Tim Smith secs...@gmail.com wrote:
  Good to see I am not the only one who cannot get incoming Dstreams to
  repartition. I tried repartition(512) but still no luck - the app
  stubbornly runs only on two nodes. Now this is 1.0.0 but looking at
  release notes for 1.0.1 and 1.0.2, I don't see anything that says this
  was an issue and has been fixed.
 
  How do I debug the repartition() statement to see what's the flow
  after the job hits that statement?
 
 
  On Fri, Aug 29, 2014 at 8:31 AM, bharatvenkat bvenkat.sp...@gmail.com
 wrote:
  Chris,
 
  I did the Dstream.repartition mentioned in the document on parallelism
 in
  receiving, as well as set spark.default.parallelism and it still uses
 only
  2 nodes in my cluster.  I notice there is another email thread on the
 same
  topic:
 
 
 http://apache-spark-user-list.1001560.n3.nabble.com/DStream-repartitioning-performance-tuning-processing-td13069.html
 
  My code is in Java and here is what I have:
 
  JavaPairReceiverInputDStream<String, String> messages =
      KafkaUtils.createStream(ssc, zkQuorum,
          "cse-job-play-consumer", kafkaTopicMap);

  JavaPairDStream<String, String> newMessages =
      messages.repartition(partitionSize); // partitionSize=30

  JavaDStream<String> lines = newMessages.map(
      new Function<Tuple2<String, String>, String>() {
          ...
          public String call(Tuple2<String, String> tuple2) {
              return tuple2._2();
          }
      });

  JavaDStream<String> words = lines.flatMap(new MetricsComputeFunction());

  JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
      new PairFunction<String, String, Integer>() {
          ...
      });

  wordCounts.foreachRDD(new Function<JavaPairRDD<String, Integer>, Void>() {...});
 
  Thanks,
  Bharat
 
 
 
  --
  View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Low-Level-Kafka-Consumer-for-Spark-tp11258p13131.html
  Sent from the Apache Spark User List mailing list archive at Nabble.com.
 
  -
  To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
  For additional commands, e-mail: user-h...@spark.apache.org
 
 
  -
  To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
  For additional commands, e-mail: user-h...@spark.apache.org
 



Re: Low Level Kafka Consumer for Spark

2014-08-29 Thread Tim Smith
Ok, so I did this:
val kInStreams = (1 to 10).map { _ =>
  KafkaUtils.createStream(ssc, "zkhost1:2181/zk_kafka", "testApp", Map("rawunstruct" -> 1)) }
val kInMsg = ssc.union(kInStreams)
val outdata = kInMsg.map(x => normalizeLog(x._2, configMap))

This has improved parallelism. Earlier I would only get a Stream 0. Now I
have Streams [0-9]. Of course, since the kafka topic has only three
partitions, only three of those streams are active, but I am seeing more
blocks being pulled across the three streams in total than what one was doing
earlier. Also, four nodes are actively processing tasks (vs only two
earlier) now which actually has me confused. If Streams are active only
on 3 nodes then how/why did a 4th node get work? If a 4th got work why
aren't more nodes getting work?






On Fri, Aug 29, 2014 at 4:11 PM, Tim Smith secs...@gmail.com wrote:

 I create my DStream very simply as:
 val kInMsg =
   KafkaUtils.createStream(ssc, "zkhost1:2181/zk_kafka", "testApp", Map("rawunstruct" -> 8))
 .
 .
 eventually, before I operate on the DStream, I repartition it:
 kInMsg.repartition(512)

 Are you saying that ^^ repartition doesn't split my DStream into multiple
 smaller streams? Should I manually create multiple DStreams like this?:
 val kInputs = (1 to 10).map { _ => KafkaUtils.createStream() }

 Then I apply some custom logic to it as:
 val outdata = kInMsg.map(x => normalizeLog(x._2, configMap)) // where
 normalizeLog takes a String and Map of regex and returns a string

 In my case, I think I have traced the issue to the receiver executor being
 killed by Yarn:
 14/08/29 22:46:30 ERROR YarnClientClusterScheduler: Lost executor 1 on
 node-dn1-4-acme.com: remote Akka client disassociated

 This be the root cause?

 http://apache-spark-developers-list.1001551.n3.nabble.com/Lost-executor-on-YARN-ALS-iterations-td7916.html
 https://issues.apache.org/jira/browse/SPARK-2121





 On Fri, Aug 29, 2014 at 3:28 PM, Sean Owen so...@cloudera.com wrote:

 Are you using multiple Dstreams? repartitioning does not affect how
 many receivers you have. It's on 2 nodes for each receiver. You need
 multiple partitions in the queue, each consumed by a DStream, if you
 mean to parallelize consuming the queue.

 On Fri, Aug 29, 2014 at 11:08 PM, Tim Smith secs...@gmail.com wrote:
  Good to see I am not the only one who cannot get incoming Dstreams to
  repartition. I tried repartition(512) but still no luck - the app
  stubbornly runs only on two nodes. Now this is 1.0.0 but looking at
  release notes for 1.0.1 and 1.0.2, I don't see anything that says this
  was an issue and has been fixed.
 
  How do I debug the repartition() statement to see what's the flow
  after the job hits that statement?
 
 
  On Fri, Aug 29, 2014 at 8:31 AM, bharatvenkat bvenkat.sp...@gmail.com
 wrote:
  Chris,
 
  I did the Dstream.repartition mentioned in the document on parallelism
 in
  receiving, as well as set spark.default.parallelism and it still
 uses only
  2 nodes in my cluster.  I notice there is another email thread on the
 same
  topic:
 
 
 http://apache-spark-user-list.1001560.n3.nabble.com/DStream-repartitioning-performance-tuning-processing-td13069.html
 
  My code is in Java and here is what I have:
 
  JavaPairReceiverInputDStream<String, String> messages =
      KafkaUtils.createStream(ssc, zkQuorum,
          "cse-job-play-consumer", kafkaTopicMap);

  JavaPairDStream<String, String> newMessages =
      messages.repartition(partitionSize); // partitionSize=30

  JavaDStream<String> lines = newMessages.map(
      new Function<Tuple2<String, String>, String>() {
          ...
          public String call(Tuple2<String, String> tuple2) {
              return tuple2._2();
          }
      });

  JavaDStream<String> words = lines.flatMap(new MetricsComputeFunction());

  JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
      new PairFunction<String, String, Integer>() {
          ...
      });

  wordCounts.foreachRDD(new Function<JavaPairRDD<String, Integer>, Void>() {...});
 
  Thanks,
  Bharat
 
 
 
  --
  View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Low-Level-Kafka-Consumer-for-Spark-tp11258p13131.html
  Sent from the Apache Spark User List mailing list archive at
 Nabble.com.
 
  -
  To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
  For additional commands, e-mail: user-h...@spark.apache.org
 
 
  -
  To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
  For additional commands, e-mail: user-h...@spark.apache.org
 





Re: Low Level Kafka Consumer for Spark

2014-08-28 Thread Chris Fregly
@bharat-

overall, i've noticed a lot of confusion about how Spark Streaming scales -
as well as how it handles failover and checkpointing, but we can discuss
that separately.

there's actually 2 dimensions to scaling here:  receiving and processing.

*Receiving*
receiving can be scaled out by submitting new DStreams/Receivers to the
cluster as i've done in the Kinesis example.  in fact, i purposely chose to
submit multiple receivers in my Kinesis example because i feel it should be
the norm and not the exception - particularly for partitioned and
checkpoint-capable streaming systems like Kafka and Kinesis.   it's the
only way to scale.

a side note here is that each receiver running in the cluster will
immediately replicate to 1 other node for fault-tolerance of that specific
receiver.  this is where the confusion lies.  this 2-node replication is
mainly for failover in case the receiver dies while data is in flight.
 there's still chance for data loss as there's no write ahead log on the
hot path, but this is being addressed.

this is mentioned in the docs here:
https://spark.apache.org/docs/latest/streaming-programming-guide.html#level-of-parallelism-in-data-receiving

*Processing*
once data is received, tasks are scheduled across the Spark cluster just
like any other non-streaming task where you can specify the number of
partitions for reduces, etc.  this is the part of scaling that is sometimes
overlooked - probably because it works just like regular Spark, but it is
worth highlighting.

Here's a blurb in the docs:
https://spark.apache.org/docs/latest/streaming-programming-guide.html#level-of-parallelism-in-data-processing

the other thing that's confusing with Spark Streaming is that in Scala, you
need to explicitly

import org.apache.spark.streaming.StreamingContext.toPairDStreamFunctions

in order to pick up the implicits that allow DStream.reduceByKey and such
(versus DStream.transform(rddBatch => rddBatch.reduceByKey()))

in other words, DStreams appear to be relatively featureless until you
discover this implicit.  otherwise, you need to operate on the underlying
RDD's explicitly which is not ideal.

the Kinesis example referenced earlier in the thread uses the DStream
implicits.
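
for reference, a minimal illustration of the two styles in spark-shell terms (Spark
1.x), where `words` is assumed to be a DStream[String]:

import org.apache.spark.SparkContext._                // PairRDDFunctions, for the transform() variant
import org.apache.spark.streaming.StreamingContext._  // brings toPairDStreamFunctions into scope

val pairs = words.map(w => (w, 1))

// with the implicit in scope, reduceByKey is available directly on the DStream:
val counts = pairs.reduceByKey(_ + _)

// without it, you have to drop down to the underlying RDD of each batch:
val counts2 = pairs.transform(rddBatch => rddBatch.reduceByKey(_ + _))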


side note to all of this - i've recently convinced my publisher for my
upcoming book, Spark In Action, to let me jump ahead and write the Spark
Streaming chapter ahead of other more well-understood libraries.  early
release is in a month or so.  sign up  @ http://sparkinaction.com if you
wanna get notified.

shameless plug that i wouldn't otherwise do, but i really think it will
help clear a lot of confusion in this area as i hear these questions asked
a lot in my talks and such.  and i think a clear, crisp story on scaling
and fault-tolerance will help Spark Streaming's adoption.

hope that helps!

-chris




On Wed, Aug 27, 2014 at 6:32 PM, Dibyendu Bhattacharya 
dibyendu.bhattach...@gmail.com wrote:

 I agree. This issue should be fixed in Spark rather than relying on replay of
 Kafka messages.

 Dib
 On Aug 28, 2014 6:45 AM, RodrigoB rodrigo.boav...@aspect.com wrote:

 Dibyendu,

 Tnks for getting back.

 I believe you are absolutely right. We were under the assumption that the
 raw data was being computed again and that's not happening after further
 tests. This applies to Kafka as well.

 The issue is of major priority fortunately.

 Regarding your suggestion, I would maybe prefer to have the problem
 resolved
 within Spark's internals since once the data is replicated we should be
 able
 to access it once more and not having to pull it back again from Kafka or
 any other stream that is being affected by this issue. If for example
 there
 is a big amount of batches to be recomputed I would rather have them done
 distributed than overloading the batch interval with huge amount of Kafka
 messages.

 I do not yet have enough know-how on where the issue is, or about the
 internal Spark code, so I can't really say how difficult the
 implementation will be.

 tnks,
 Rod



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Low-Level-Kafka-Consumer-for-Spark-tp11258p12966.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: Low Level Kafka Consumer for Spark

2014-08-27 Thread Bharat Venkat
Hi Dibyendu,

That would be great.  One of the biggest drawbacks of Kafka utils as well as
your implementation is that I am unable to scale out processing.  I am
relatively new to Spark and Spark Streaming - from what I read and what I
observe with my deployment, the RDD created on one receiver
is processed by at most 2 nodes in my cluster (most likely because the default
replication is 2 and Spark schedules processing close to where the data
is).  I tried rdd.replicate() to no avail.

Would Chris and your proposal to have union of DStreams for all these
Receivers still allow scaling out subsequent processing?

Thanks,
Bharat


On Tue, Aug 26, 2014 at 10:59 PM, Dibyendu Bhattacharya 
dibyendu.bhattach...@gmail.com wrote:


 Thanks Chris and Bharat for your inputs. I agree, running multiple
 receivers/dstreams is desirable for scalability and fault tolerance, and
 this is easily doable. In the present KafkaReceiver I am creating as many
 threads as there are Kafka topic partitions, but I can definitely create
 multiple KafkaReceivers, one for every partition. As Chris mentioned, in this
 case I then need to have a union of DStreams for all these Receivers. I will
 try this out and let you know.

 Dib
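
 A sketch of what that union of per-partition receiver streams could look like;
 KafkaReceiver's exact constructor here is an assumption about the
 kafka-spark-consumer project, not its actual API:

 val numPartitions = 3  // assumed partition count for the topic
 val perPartitionStreams = (0 until numPartitions).map { p =>
   ssc.receiverStream(new KafkaReceiver(kafkaProps, p))  // one low-level receiver per partition
 }
 val unified = ssc.union(perPartitionStreams)
 unified.foreachRDD(rdd => println("batch count: " + rdd.count()))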


 On Wed, Aug 27, 2014 at 9:10 AM, Chris Fregly ch...@fregly.com wrote:

 great work, Dibyendu.  looks like this would be a popular contribution.

 expanding on bharat's question a bit:

 what happens if you submit multiple receivers to the cluster by creating
 and unioning multiple DStreams as in the kinesis example here:


 https://github.com/apache/spark/blob/ae58aea2d1435b5bb011e68127e1bcddc2edf5b2/extras/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala#L123

 for more context, the kinesis implementation above uses the Kinesis
 Client Library (KCL) to automatically assign - and load balance - stream
 shards among all KCL threads from all receivers (potentially coming and
 going as nodes die) on all executors/nodes using DynamoDB as the
 association data store.

 ZooKeeper would be used for your Kafka consumers, of course.  and
 ZooKeeper watches to handle the ephemeral nodes.  and I see you're using
 Curator, which makes things easier.

 as bharat suggested, running multiple receivers/dstreams may be desirable
 from a scalability and fault tolerance standpoint.  is this type of load
 balancing possible among your different Kafka consumers running in
 different ephemeral JVMs?

 and isn't it fun proposing a popular piece of code?  the question
 floodgates have opened!  haha. :)

  -chris



 On Tue, Aug 26, 2014 at 7:29 AM, Dibyendu Bhattacharya 
 dibyendu.bhattach...@gmail.com wrote:

 Hi Bharat,

 Thanks for your email. If the Kafka Reader worker process dies, it
 will be replaced by a different machine, and it will start consuming from the
 offset where it left off (for each partition). The same would happen even
 if I had an individual Receiver for every partition.

 Regards,
 Dibyendu


 On Tue, Aug 26, 2014 at 5:43 AM, bharatvenkat bvenkat.sp...@gmail.com
 wrote:

 I like this consumer for what it promises - better control over offset
 and
 recovery from failures.  If I understand this right, it still uses
 single
 worker process to read from Kafka (one thread per partition) - is there
 a
 way to specify multiple worker processes (on different machines) to read
 from Kafka?  Maybe one worker process for each partition?

 If there is no such option, what happens when the single machine
 hosting the
 Kafka Reader worker process dies and is replaced by a different
 machine
 (like in cloud)?

 Thanks,
 Bharat



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Low-Level-Kafka-Consumer-for-Spark-tp11258p12788.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org







Re: Low Level Kafka Consumer for Spark

2014-08-27 Thread Dibyendu Bhattacharya
I agree. This issue should be fixed in Spark rather than relying on replay of Kafka
messages.

Dib
On Aug 28, 2014 6:45 AM, RodrigoB rodrigo.boav...@aspect.com wrote:

 Dibyendu,

 Tnks for getting back.

 I believe you are absolutely right. We were under the assumption that the
 raw data was being computed again and that's not happening after further
 tests. This applies to Kafka as well.

 The issue is of major priority fortunately.

 Regarding your suggestion, I would maybe prefer to have the problem
 resolved
 within Spark's internals since once the data is replicated we should be
 able
 to access it once more and not having to pull it back again from Kafka or
 any other stream that is being affected by this issue. If for example there
 is a big amount of batches to be recomputed I would rather have them done
 distributed than overloading the batch interval with huge amount of Kafka
 messages.

 I do not yet have enough know-how on where the issue is, or about the
 internal Spark code, so I can't really say how difficult the
 implementation will be.

 tnks,
 Rod



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Low-Level-Kafka-Consumer-for-Spark-tp11258p12966.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: Low Level Kafka Consumer for Spark

2014-08-26 Thread Dibyendu Bhattacharya
Hi,

As I understand, your problem is similar to this JIRA.

https://issues.apache.org/jira/browse/SPARK-1647

The issue in this case is that Kafka can not replay the messages as the offsets are
already committed. Also, I think the existing KafkaUtils (the default High
Level Kafka Consumer) has this issue as well.

Similar discussion is there in this thread also...

http://apache-spark-user-list.1001560.n3.nabble.com/Data-loss-Spark-streaming-and-network-receiver-td12337.html

As I am thinking, it is possible to tackle this in the consumer code I have
written. If we can store the topic, partition_id and consumed offset in ZK
after every checkpoint, then after Spark recovers from the failover, the
present PartitionManager code can start reading from the last checkpointed
offset (instead of the last committed offset as it is doing now). In that case
it can replay the data since the last checkpoint.
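
A rough sketch of persisting such a checkpointed offset to ZooKeeper with Curator
(which the consumer already uses); the znode layout and string encoding here are
assumptions, not the project's actual format:

import org.apache.curator.framework.CuratorFrameworkFactory
import org.apache.curator.retry.ExponentialBackoffRetry

val zk = CuratorFrameworkFactory.newClient("zkhost1:2181", new ExponentialBackoffRetry(1000, 3))
zk.start()

// write the last checkpointed offset for one topic partition
def saveCheckpointedOffset(topic: String, partition: Int, offset: Long): Unit = {
  val path = s"/consumers/spark-kafka/$topic/$partition/checkpointed-offset"
  val data = offset.toString.getBytes("UTF-8")
  if (zk.checkExists().forPath(path) == null)
    zk.create().creatingParentsIfNeeded().forPath(path, data)
  else
    zk.setData().forPath(path, data)
}

saveCheckpointedOffset("rawunstruct", 0, 42L)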

I will think over it ..

Regards,
Dibyendu



On Mon, Aug 25, 2014 at 11:23 PM, RodrigoB rodrigo.boav...@aspect.com
wrote:

 Hi Dibyendu,

 My colleague has taken a look at the spark kafka consumer github you have
 provided and started experimenting.

 We found that somehow when Spark has a failure after a data checkpoint, the
 expected re-computations corresponding to the metadata checkpoints are not
 recovered, so we lose Kafka messages and RDD computations in Spark.
 The impression is that this code is replacing quite a bit of Spark Kafka
 Streaming code where maybe (not sure) metadata checkpoints are done every
 batch interval.

 Was it on purpose to solely depend on the Kafka commit to recover data and
 recomputations between data checkpoints? If so, how to make this work?

 tnks
 Rod



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Low-Level-Kafka-Consumer-for-Spark-tp11258p12757.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: Low Level Kafka Consumer for Spark

2014-08-26 Thread Dibyendu Bhattacharya
Hi Bharat,

Thanks for your email. If the Kafka Reader worker process dies, it will
be replaced by a different machine, and it will start consuming from the
offset where it left off (for each partition). The same would happen even
if I had an individual Receiver for every partition.

Regards,
Dibyendu


On Tue, Aug 26, 2014 at 5:43 AM, bharatvenkat bvenkat.sp...@gmail.com
wrote:

 I like this consumer for what it promises - better control over offset and
 recovery from failures.  If I understand this right, it still uses single
 worker process to read from Kafka (one thread per partition) - is there a
 way to specify multiple worker processes (on different machines) to read
 from Kafka?  Maybe one worker process for each partition?

 If there is no such option, what happens when the single machine hosting
 the
 Kafka Reader worker process dies and is replaced by a different machine
 (like in cloud)?

 Thanks,
 Bharat



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Low-Level-Kafka-Consumer-for-Spark-tp11258p12788.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: Low Level Kafka Consumer for Spark

2014-08-26 Thread Chris Fregly
great work, Dibyendu.  looks like this would be a popular contribution.

expanding on bharat's question a bit:

what happens if you submit multiple receivers to the cluster by creating
and unioning multiple DStreams as in the kinesis example here:

https://github.com/apache/spark/blob/ae58aea2d1435b5bb011e68127e1bcddc2edf5b2/extras/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala#L123

for more context, the kinesis implementation above uses the Kinesis Client
Library (KCL) to automatically assign - and load balance - stream shards
among all KCL threads from all receivers (potentially coming and going as
nodes die) on all executors/nodes using DynamoDB as the association data
store.

ZooKeeper would be used for your Kafka consumers, of course.  and ZooKeeper
watches to handle the ephemeral nodes.  and I see you're using Curator,
which makes things easier.

as bharat suggested, running multiple receivers/dstreams may be desirable
from a scalability and fault tolerance standpoint.  is this type of load
balancing possible among your different Kafka consumers running in
different ephemeral JVMs?

and isn't it fun proposing a popular piece of code?  the question
floodgates have opened!  haha. :)

-chris



On Tue, Aug 26, 2014 at 7:29 AM, Dibyendu Bhattacharya 
dibyendu.bhattach...@gmail.com wrote:

 Hi Bharat,

 Thanks for your email. If the Kafka Reader worker process dies, it will
 be replaced by a different machine, and it will start consuming from the
 offset where it left off (for each partition). The same would happen even
 if I had an individual Receiver for every partition.

 Regards,
 Dibyendu


 On Tue, Aug 26, 2014 at 5:43 AM, bharatvenkat bvenkat.sp...@gmail.com
 wrote:

 I like this consumer for what it promises - better control over offset and
 recovery from failures.  If I understand this right, it still uses single
 worker process to read from Kafka (one thread per partition) - is there a
 way to specify multiple worker processes (on different machines) to read
 from Kafka?  Maybe one worker process for each partition?

 If there is no such option, what happens when the single machine hosting
 the
 Kafka Reader worker process dies and is replaced by a different machine
 (like in cloud)?

 Thanks,
 Bharat



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Low-Level-Kafka-Consumer-for-Spark-tp11258p12788.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org





Re: Low Level Kafka Consumer for Spark

2014-08-25 Thread RodrigoB
Hi Dibyendu,

My colleague has taken a look at the spark kafka consumer github you have
provided and started experimenting.

We found that somehow when Spark has a failure after a data checkpoint, the
expected re-computations corresponding to the metadata checkpoints are not
recovered, so we lose Kafka messages and RDD computations in Spark.
The impression is that this code is replacing quite a bit of Spark Kafka
Streaming code where maybe (not sure) metadata checkpoints are done every
batch interval.

Was it on purpose to solely depend on the Kafka commit to recover data and
recomputations between data checkpoints? If so, how to make this work?

tnks
Rod 



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Low-Level-Kafka-Consumer-for-Spark-tp11258p12757.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Low Level Kafka Consumer for Spark

2014-08-25 Thread bharatvenkat
I like this consumer for what it promises - better control over offset and
recovery from failures.  If I understand this right, it still uses single
worker process to read from Kafka (one thread per partition) - is there a
way to specify multiple worker processes (on different machines) to read
from Kafka?  Maybe one worker process for each partition?

If there is no such option, what happens when the single machine hosting the
Kafka Reader worker process dies and is replaced by a different machine
(like in cloud)?

Thanks,
Bharat



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Low-Level-Kafka-Consumer-for-Spark-tp11258p12788.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Low Level Kafka Consumer for Spark

2014-08-05 Thread Dibyendu Bhattacharya
Thanks Jonathan,

Yes, till non-ZK based offset management is available in Kafka, I need to
maintain the offset in ZK. And yes, in both cases an explicit commit is
necessary. I modified the Low Level Kafka Spark Consumer a little bit so that the
Receiver spawns threads for every partition of the topic and performs the
'store' operation in multiple threads. It would be good if the
receiver.store methods were made thread safe, which is not presently the case.
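
As a stop-gap while that is the case, a sketch of serializing the store() calls from
the per-partition threads inside a custom receiver; the fetch logic below is a
placeholder, not real Kafka client code:

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

class PerPartitionReceiver(numPartitions: Int)
    extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {

  private val storeLock = new Object

  override def onStart(): Unit = {
    (0 until numPartitions).foreach { p =>
      new Thread("kafka-partition-consumer-" + p) {
        override def run(): Unit = {
          while (!isStopped()) {
            val msg = fetchNextMessage(p)          // hypothetical per-partition fetch
            storeLock.synchronized { store(msg) }  // serialize access to store()
          }
        }
      }.start()
    }
  }

  override def onStop(): Unit = {} // threads exit once isStopped() returns true

  // placeholder for the SimpleConsumer fetch loop
  private def fetchNextMessage(partition: Int): String = "message-from-partition-" + partition
}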

Waiting for TD's comment on this Kafka Spark Low Level consumer.


Regards,
Dibyendu



On Tue, Aug 5, 2014 at 5:32 AM, Jonathan Hodges hodg...@gmail.com wrote:

 Hi Yan,

 That is a good suggestion.  I believe non-Zookeeper offset management will
 be a feature in the upcoming Kafka 0.8.2 release tentatively scheduled for
 September.


 https://cwiki.apache.org/confluence/display/KAFKA/Inbuilt+Consumer+Offset+Management

 That should make this fairly easy to implement, but it will still require
 explicit offset commits to avoid data loss which is different than the
 current KafkaUtils implementation.

 Jonathan





 On Mon, Aug 4, 2014 at 4:51 PM, Yan Fang yanfang...@gmail.com wrote:

 Another suggestion that may help is that you can consider using Kafka to
 store the latest offset instead of Zookeeper. There are at least two
 benefits: 1) lower the workload of ZK 2) support replay from certain
 offset. This is how Samza http://samza.incubator.apache.org/ deals
 with the Kafka offset, the doc is here
 http://samza.incubator.apache.org/learn/documentation/0.7.0/container/checkpointing.html
  .
 Thank you.

 Cheers,

 Fang, Yan
 yanfang...@gmail.com
 +1 (206) 849-4108


 On Sun, Aug 3, 2014 at 8:59 PM, Patrick Wendell pwend...@gmail.com
 wrote:

 I'll let TD chime in on this one, but I'm guessing this would be a
 welcome addition. It's great to see community effort on adding new
 streams/receivers, adding a Java API for receivers was something we did
 specifically to allow this :)

 - Patrick


 On Sat, Aug 2, 2014 at 10:09 AM, Dibyendu Bhattacharya 
 dibyendu.bhattach...@gmail.com wrote:

 Hi,

 I have implemented a Low Level Kafka Consumer for Spark Streaming using
 Kafka Simple Consumer API. This API will give better control over the Kafka
 offset management and recovery from failures. As the present Spark
 KafkaUtils uses HighLevel Kafka Consumer API, I wanted to have a better
 control over the offset management which is not possible in Kafka HighLevel
 consumer.

 This Project is available in below Repo :

 https://github.com/dibbhatt/kafka-spark-consumer


 I have implemented a Custom Receiver
 consumer.kafka.client.KafkaReceiver. The KafkaReceiver uses low level Kafka
 Consumer API (implemented in consumer.kafka packages) to fetch messages
 from Kafka and 'store' it in Spark.

 The logic will detect number of partitions for a topic and spawn that
 many threads (Individual instances of Consumers). Kafka Consumer uses
 Zookeeper for storing the latest offset for individual partitions, which
 will help to recover in case of failure. The Kafka Consumer logic is
 tolerant to ZK Failures, Kafka Leader of Partition changes, Kafka broker
 failures,  recovery from offset errors and other fail-over aspects.

 The consumer.kafka.client.Consumer is the sample Consumer which uses
 these Kafka Receivers to generate DStreams from Kafka and apply an Output
 operation for every message of the RDD.

 We are planning to use this Kafka Spark Consumer to perform Near Real
 Time Indexing of Kafka Messages to target Search Cluster and also Near Real
 Time Aggregation using target NoSQL storage.

 Kindly let me know your view. Also if this looks good, can I contribute
 to the Spark Streaming project.

 Regards,
 Dibyendu







Re: Low Level Kafka Consumer for Spark

2014-08-04 Thread Jonathan Hodges
Hi Yan,

That is a good suggestion.  I believe non-Zookeeper offset management will
be a feature in the upcoming Kafka 0.8.2 release tentatively scheduled for
September.

https://cwiki.apache.org/confluence/display/KAFKA/Inbuilt+Consumer+Offset+Management

That should make this fairly easy to implement, but it will still require
explicit offset commits to avoid data loss which is different than the
current KafkaUtils implementation.

Jonathan





On Mon, Aug 4, 2014 at 4:51 PM, Yan Fang yanfang...@gmail.com wrote:

 Another suggestion that may help is that you can consider using Kafka to
 store the latest offset instead of Zookeeper. There are at least two
 benefits: 1) lower the workload of ZK 2) support replay from certain
 offset. This is how Samza http://samza.incubator.apache.org/ deals with
 the Kafka offset, the doc is here
 http://samza.incubator.apache.org/learn/documentation/0.7.0/container/checkpointing.html
  .
 Thank you.

 Cheers,

 Fang, Yan
 yanfang...@gmail.com
 +1 (206) 849-4108


 On Sun, Aug 3, 2014 at 8:59 PM, Patrick Wendell pwend...@gmail.com
 wrote:

 I'll let TD chime in on this one, but I'm guessing this would be a
 welcome addition. It's great to see community effort on adding new
 streams/receivers, adding a Java API for receivers was something we did
 specifically to allow this :)

 - Patrick


 On Sat, Aug 2, 2014 at 10:09 AM, Dibyendu Bhattacharya 
 dibyendu.bhattach...@gmail.com wrote:

 Hi,

 I have implemented a Low Level Kafka Consumer for Spark Streaming using
 Kafka Simple Consumer API. This API will give better control over the Kafka
 offset management and recovery from failures. As the present Spark
 KafkaUtils uses HighLevel Kafka Consumer API, I wanted to have a better
 control over the offset management which is not possible in Kafka HighLevel
 consumer.

 This Project is available in below Repo :

 https://github.com/dibbhatt/kafka-spark-consumer


 I have implemented a Custom Receiver
 consumer.kafka.client.KafkaReceiver. The KafkaReceiver uses low level Kafka
 Consumer API (implemented in consumer.kafka packages) to fetch messages
 from Kafka and 'store' it in Spark.

 The logic will detect number of partitions for a topic and spawn that
 many threads (Individual instances of Consumers). Kafka Consumer uses
 Zookeeper for storing the latest offset for individual partitions, which
 will help to recover in case of failure. The Kafka Consumer logic is
 tolerant to ZK Failures, Kafka Leader of Partition changes, Kafka broker
 failures,  recovery from offset errors and other fail-over aspects.

 The consumer.kafka.client.Consumer is the sample Consumer which uses
 these Kafka Receivers to generate DStreams from Kafka and apply an Output
 operation for every message of the RDD.

 We are planning to use this Kafka Spark Consumer to perform Near Real
 Time Indexing of Kafka Messages to target Search Cluster and also Near Real
 Time Aggregation using target NoSQL storage.

 Kindly let me know your view. Also if this looks good, can I contribute
 to the Spark Streaming project.

 Regards,
 Dibyendu