Re: spark streaming with kafka reset offset

2015-07-14 Thread Tathagata Das
Of course, exactly-once receiving is not the same as end-to-end exactly-once. In
the case of the direct Kafka stream, the data may actually be pulled multiple
times. But even if the data of a batch is pulled twice because of some failure,
the final result (that is, the transformed data accessed through foreachRDD)
will always be the same even if recomputed. In other words, the data in
partition x of the RDD of time t will always be the same even if that
partition gets recomputed. Now, to get end-to-end exactly-once, you also have
to push data out to external data stores in an exactly-once manner - either
the updates are idempotent, or you use the unique id [(batch time, partition
ID)] to update the store transactionally (such that each partition is
inserted into the data store only once).

This is also explained in my talk. -
https://www.youtube.com/watch?v=d5UJonrruHk
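
A minimal sketch of the transactional variant in Scala, assuming a byte-array
direct stream as elsewhere in this thread; upsertPartition is a hypothetical
placeholder for your own store, not a Spark or Kafka API - the only point is
that the write is keyed by (batch time, partition id):

import org.apache.spark.TaskContext
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.Time

// Hypothetical sink: replace with your own transactional/idempotent store.
// Because the key is (batch time, partition id), a recomputed partition
// overwrites its previous attempt instead of appending duplicates.
def upsertPartition(uniqueId: (Long, Int),
                    records: Seq[(Array[Byte], Array[Byte])]): Unit = {
  // e.g. an UPSERT, or DELETE + INSERT inside one transaction, keyed by uniqueId
}

directKafkaStream.foreachRDD { (rdd: RDD[(Array[Byte], Array[Byte])], batchTime: Time) =>
  rdd.foreachPartition { iter =>
    val uniqueId = (batchTime.milliseconds, TaskContext.get.partitionId)
    upsertPartition(uniqueId, iter.toSeq)
  }
}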

On Tue, Jul 14, 2015 at 8:18 PM, Chen Song  wrote:

> Thanks TD.
>
> As for 1), if the timing is not guaranteed, how are exactly-once semantics
> supported? It feels like exactly-once receiving is not necessarily
> exactly-once processing.
>
> Chen
>
> On Tue, Jul 14, 2015 at 10:16 PM, Tathagata Das 
> wrote:
>
>>
>>
>> On Tue, Jul 14, 2015 at 6:42 PM, Chen Song 
>> wrote:
>>
>>> Thanks TD and Cody. I saw that.
>>>
>>> 1. By doing that (foreachRDD), does KafkaDStream checkpoints its offsets
>>> on HDFS at the end of each batch interval?
>>>
>>
>> The timing is not guaranteed.
>>
>>
>>> 2. In the code, if I first apply transformations and actions on the
>>> directKafkaStream and then use foreachRDD on the original KafkaDStream to
>>> commit offsets myself, will offsets commits always happen after
>>> transformation and action?
>>>
>>>
>> What do you mean by "original KafkaDStream"? If you mean the
>> directKafkaStream, then yes: output operations like foreachRDD are
>> executed in each batch in the same order as they are defined.
>>
>> dstream1.foreachRDD { rdd => func1(rdd) }
>> dstream2.foreachRDD { rdd => func2(rdd) }
>>
>> In every batch interval, func1 will be executed before func2.
>>
>>
>>
>>
>>> Chen
>>>
>>> On Tue, Jul 14, 2015 at 6:43 PM, Tathagata Das 
>>> wrote:
>>>
 Relevant documentation -
 https://spark.apache.org/docs/latest/streaming-kafka-integration.html,
 towards the end.

 directKafkaStream.foreachRDD { rdd =>
 val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  // offsetRanges.length = # of Kafka partitions being consumed
  ...
  }


 On Tue, Jul 14, 2015 at 3:17 PM, Cody Koeninger 
 wrote:

> You have access to the offset ranges for a given rdd in the stream by
> typecasting to HasOffsetRanges.  You can then store the offsets wherever
> you need to.
>
> On Tue, Jul 14, 2015 at 5:00 PM, Chen Song 
> wrote:
>
>> A follow up question.
>>
>> When using createDirectStream approach, the offsets are checkpointed
>> to HDFS and it is understandable by Spark Streaming job. Is there a way 
>> to
>> expose the offsets via a REST api to end users. Or alternatively, is 
>> there
>> a way to have offsets committed to Kafka Offset Manager so users can 
>> query
>> from a consumer programmatically?
>>
>> Essentially, all I need to do is monitor the progress of data
>> consumption of the Kafka topic.
>>
>>
>> On Tue, Jun 30, 2015 at 9:39 AM, Cody Koeninger 
>> wrote:
>>
>>> You can't use different versions of spark in your application vs
>>> your cluster.
>>>
>>> For the direct stream, it's not 60 partitions per executor, it's 300
>>> partitions, and executors work on them as they are scheduled.  Yes, if 
>>> you
>>> have no messages you will get an empty partition.  It's up to you 
>>> whether
>>> it's worthwhile to call coalesce or not.
>>>
>>> On Tue, Jun 30, 2015 at 2:45 AM, Shushant Arora <
>>> shushantaror...@gmail.com> wrote:
>>>
 Is this 3 is no of parallel consumer threads per receiver , means
 in total we have 2*3=6 consumer in same consumer group consuming from 
 all
 300 partitions.
 3 is just parallelism on same receiver and recommendation is to use
 1 per receiver since consuming from kafka is not cpu bound rather
 NIC(network bound)  increasing consumer thread on one receiver won't 
 make
 it parallel in ideal sense ?

 In non receiver based consumer spark 1.3 If I use 5 execuots and
 kafka topic has 300 partions , does kafkaRDD created on 5 executors 
 will
 have 60 partitions per executor (total 300 one to one mapping) and if 
 some
 of kafka partitions are empty say offset of last checkpoint to current 
 is
 same for partitons P123, still it will create empty partition in 
 kafkaRDD ?
 So we should call coalesce on kafkaRDD ?


 And is 

Re: spark streaming with kafka reset offset

2015-07-14 Thread Chen Song
Thanks TD.

As for 1), if the timing is not guaranteed, how are exactly-once semantics
supported? It feels like exactly-once receiving is not necessarily
exactly-once processing.

Chen

On Tue, Jul 14, 2015 at 10:16 PM, Tathagata Das  wrote:

>
>
> On Tue, Jul 14, 2015 at 6:42 PM, Chen Song  wrote:
>
>> Thanks TD and Cody. I saw that.
>>
>> 1. By doing that (foreachRDD), does KafkaDStream checkpoints its offsets
>> on HDFS at the end of each batch interval?
>>
>
> The timing is not guaranteed.
>
>
>> 2. In the code, if I first apply transformations and actions on the
>> directKafkaStream and then use foreachRDD on the original KafkaDStream to
>> commit offsets myself, will offsets commits always happen after
>> transformation and action?
>>
>>
> What do you mean by "original KafkaDStream"? If you mean the
> directKafkaStream, then yes: output operations like foreachRDD are
> executed in each batch in the same order as they are defined.
>
> dstream1.foreachRDD { rdd => func1(rdd) }
> dstream2.foreachRDD { rdd => func2(rdd) }
>
> In every batch interval, func1 will be executed before func2.
>
>
>
>
>> Chen
>>
>> On Tue, Jul 14, 2015 at 6:43 PM, Tathagata Das 
>> wrote:
>>
>>> Relevant documentation -
>>> https://spark.apache.org/docs/latest/streaming-kafka-integration.html,
>>> towards the end.
>>>
>>> directKafkaStream.foreachRDD { rdd =>
>>>  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
>>>  // offsetRanges.length = # of Kafka partitions being consumed
>>>  ...
>>>  }
>>>
>>>
>>> On Tue, Jul 14, 2015 at 3:17 PM, Cody Koeninger 
>>> wrote:
>>>
 You have access to the offset ranges for a given rdd in the stream by
 typecasting to HasOffsetRanges.  You can then store the offsets wherever
 you need to.

 On Tue, Jul 14, 2015 at 5:00 PM, Chen Song 
 wrote:

> A follow up question.
>
> When using createDirectStream approach, the offsets are checkpointed
> to HDFS and it is understandable by Spark Streaming job. Is there a way to
> expose the offsets via a REST api to end users. Or alternatively, is there
> a way to have offsets committed to Kafka Offset Manager so users can query
> from a consumer programmatically?
>
> Essentially, all I need to do is monitor the progress of data
> consumption of the Kafka topic.
>
>
> On Tue, Jun 30, 2015 at 9:39 AM, Cody Koeninger 
> wrote:
>
>> You can't use different versions of spark in your application vs your
>> cluster.
>>
>> For the direct stream, it's not 60 partitions per executor, it's 300
>> partitions, and executors work on them as they are scheduled.  Yes, if 
>> you
>> have no messages you will get an empty partition.  It's up to you whether
>> it's worthwhile to call coalesce or not.
>>
>> On Tue, Jun 30, 2015 at 2:45 AM, Shushant Arora <
>> shushantaror...@gmail.com> wrote:
>>
>>> Is this 3 is no of parallel consumer threads per receiver , means in
>>> total we have 2*3=6 consumer in same consumer group consuming from all 
>>> 300
>>> partitions.
>>> 3 is just parallelism on same receiver and recommendation is to use
>>> 1 per receiver since consuming from kafka is not cpu bound rather
>>> NIC(network bound)  increasing consumer thread on one receiver won't 
>>> make
>>> it parallel in ideal sense ?
>>>
>>> In non receiver based consumer spark 1.3 If I use 5 execuots and
>>> kafka topic has 300 partions , does kafkaRDD created on 5 executors will
>>> have 60 partitions per executor (total 300 one to one mapping) and if 
>>> some
>>> of kafka partitions are empty say offset of last checkpoint to current 
>>> is
>>> same for partitons P123, still it will create empty partition in 
>>> kafkaRDD ?
>>> So we should call coalesce on kafkaRDD ?
>>>
>>>
>>> And is there any incompatibity issue when I include
>>> spark-streaming_2.10 (version 1.3) and spark-core_2.10(version 1.3) in 
>>> my
>>> application but my cluster has spark version 1.2 ?
>>>
>>>
>>>
>>>
>>>
>>>
>>> On Mon, Jun 29, 2015 at 7:56 PM, Shushant Arora <
>>> shushantaror...@gmail.com> wrote:
>>>
 1. Here you are basically creating 2 receivers and asking each of
 them to consume 3 kafka partitions each.

 - In 1.2 we have high level consumers so how can we restrict no of
 kafka partitions to consume from? Say I have 300 kafka partitions in 
 kafka
 topic and as in above I gave 2 receivers and 3 kafka partitions . Then 
 is
 it mean I will read from 6 out of 300 partitions only and for rest 294
 partitions data is lost?


 2.One more doubt in spark streaming how is it decided which part of
 main function of driver will run at each batch interval ? Since whole 
 code
 is written in one funct

Re: spark streaming with kafka reset offset

2015-07-14 Thread Tathagata Das
On Tue, Jul 14, 2015 at 6:42 PM, Chen Song  wrote:

> Thanks TD and Cody. I saw that.
>
> 1. By doing that (foreachRDD), does KafkaDStream checkpoints its offsets
> on HDFS at the end of each batch interval?
>

The timing is not guaranteed.


> 2. In the code, if I first apply transformations and actions on the
> directKafkaStream and then use foreachRDD on the original KafkaDStream to
> commit offsets myself, will offsets commits always happen after
> transformation and action?
>
>
What do you mean by "original KafkaDStream"? If you mean the
directKafkaStream, then yes: output operations like foreachRDD are executed
in each batch in the same order as they are defined.

dstream1.foreachRDD { rdd => func1(rdd) }
dstream2.foreachRDD { rdd => func2(rdd) }

In every batch interval, func1 will be executed before func2.
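
So for the original question, one way to order things is a sketch like the
following (Scala; processBatch and saveOffsets are hypothetical placeholders
for your own per-batch work and offset store):

import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.kafka.{HasOffsetRanges, OffsetRange}

// Hypothetical placeholders for your own logic.
def processBatch(rdd: RDD[(Array[Byte], Array[Byte])]): Unit = { /* transformations + output */ }
def saveOffsets(ranges: Array[OffsetRange]): Unit = { /* write to ZK, a DB, ... */ }

// Output operations run in the order they are defined, and with the default of
// one concurrent job per batch the second foreachRDD only starts after the
// first one's job has finished.
directKafkaStream.foreachRDD { rdd => processBatch(rdd) }

directKafkaStream.foreachRDD { rdd =>
  // The cast must be done on the KafkaRDD itself.
  saveOffsets(rdd.asInstanceOf[HasOffsetRanges].offsetRanges)
}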




> Chen
>
> On Tue, Jul 14, 2015 at 6:43 PM, Tathagata Das 
> wrote:
>
>> Relevant documentation -
>> https://spark.apache.org/docs/latest/streaming-kafka-integration.html,
>> towards the end.
>>
>> directKafkaStream.foreachRDD { rdd =>
>>  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
>>  // offsetRanges.length = # of Kafka partitions being consumed
>>  ...
>>  }
>>
>>
>> On Tue, Jul 14, 2015 at 3:17 PM, Cody Koeninger 
>> wrote:
>>
>>> You have access to the offset ranges for a given rdd in the stream by
>>> typecasting to HasOffsetRanges.  You can then store the offsets wherever
>>> you need to.
>>>
>>> On Tue, Jul 14, 2015 at 5:00 PM, Chen Song 
>>> wrote:
>>>
 A follow up question.

 When using createDirectStream approach, the offsets are checkpointed to
 HDFS and it is understandable by Spark Streaming job. Is there a way to
 expose the offsets via a REST api to end users. Or alternatively, is there
 a way to have offsets committed to Kafka Offset Manager so users can query
 from a consumer programmatically?

 Essentially, all I need to do is monitor the progress of data
 consumption of the Kafka topic.


 On Tue, Jun 30, 2015 at 9:39 AM, Cody Koeninger 
 wrote:

> You can't use different versions of spark in your application vs your
> cluster.
>
> For the direct stream, it's not 60 partitions per executor, it's 300
> partitions, and executors work on them as they are scheduled.  Yes, if you
> have no messages you will get an empty partition.  It's up to you whether
> it's worthwhile to call coalesce or not.
>
> On Tue, Jun 30, 2015 at 2:45 AM, Shushant Arora <
> shushantaror...@gmail.com> wrote:
>
>> Is this 3 is no of parallel consumer threads per receiver , means in
>> total we have 2*3=6 consumer in same consumer group consuming from all 
>> 300
>> partitions.
>> 3 is just parallelism on same receiver and recommendation is to use 1
>> per receiver since consuming from kafka is not cpu bound rather 
>> NIC(network
>> bound)  increasing consumer thread on one receiver won't make it parallel
>> in ideal sense ?
>>
>> In non receiver based consumer spark 1.3 If I use 5 execuots and
>> kafka topic has 300 partions , does kafkaRDD created on 5 executors will
>> have 60 partitions per executor (total 300 one to one mapping) and if 
>> some
>> of kafka partitions are empty say offset of last checkpoint to current is
>> same for partitons P123, still it will create empty partition in 
>> kafkaRDD ?
>> So we should call coalesce on kafkaRDD ?
>>
>>
>> And is there any incompatibity issue when I include
>> spark-streaming_2.10 (version 1.3) and spark-core_2.10(version 1.3) in my
>> application but my cluster has spark version 1.2 ?
>>
>>
>>
>>
>>
>>
>> On Mon, Jun 29, 2015 at 7:56 PM, Shushant Arora <
>> shushantaror...@gmail.com> wrote:
>>
>>> 1. Here you are basically creating 2 receivers and asking each of
>>> them to consume 3 kafka partitions each.
>>>
>>> - In 1.2 we have high level consumers so how can we restrict no of
>>> kafka partitions to consume from? Say I have 300 kafka partitions in 
>>> kafka
>>> topic and as in above I gave 2 receivers and 3 kafka partitions . Then 
>>> is
>>> it mean I will read from 6 out of 300 partitions only and for rest 294
>>> partitions data is lost?
>>>
>>>
>>> 2.One more doubt in spark streaming how is it decided which part of
>>> main function of driver will run at each batch interval ? Since whole 
>>> code
>>> is written in one function(main function in driver) so how it determined
>>> kafka streams receivers  not to be registered in each batch only 
>>> processing
>>> to be done .
>>>
>>>
>>>
>>>
>>>
>>>
>>> On Mon, Jun 29, 2015 at 7:35 PM, ayan guha 
>>> wrote:
>>>
 Hi

 Let me take ashot at your questions. (I am sure people like Cody
 and TD will co

Re: spark streaming with kafka reset offset

2015-07-14 Thread Chen Song
Thanks TD and Cody. I saw that.

1. By doing that (foreachRDD), does KafkaDStream checkpoint its offsets on
HDFS at the end of each batch interval?
2. In the code, if I first apply transformations and actions on the
directKafkaStream and then use foreachRDD on the original KafkaDStream to
commit offsets myself, will the offset commits always happen after the
transformations and actions?

Chen

On Tue, Jul 14, 2015 at 6:43 PM, Tathagata Das  wrote:

> Relevant documentation -
> https://spark.apache.org/docs/latest/streaming-kafka-integration.html,
> towards the end.
>
> directKafkaStream.foreachRDD { rdd =>
>  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
>  // offsetRanges.length = # of Kafka partitions being consumed
>  ...
>  }
>
>
> On Tue, Jul 14, 2015 at 3:17 PM, Cody Koeninger 
> wrote:
>
>> You have access to the offset ranges for a given rdd in the stream by
>> typecasting to HasOffsetRanges.  You can then store the offsets wherever
>> you need to.
>>
>> On Tue, Jul 14, 2015 at 5:00 PM, Chen Song 
>> wrote:
>>
>>> A follow up question.
>>>
>>> When using createDirectStream approach, the offsets are checkpointed to
>>> HDFS and it is understandable by Spark Streaming job. Is there a way to
>>> expose the offsets via a REST api to end users. Or alternatively, is there
>>> a way to have offsets committed to Kafka Offset Manager so users can query
>>> from a consumer programmatically?
>>>
>>> Essentially, all I need to do is monitor the progress of data
>>> consumption of the Kafka topic.
>>>
>>>
>>> On Tue, Jun 30, 2015 at 9:39 AM, Cody Koeninger 
>>> wrote:
>>>
 You can't use different versions of spark in your application vs your
 cluster.

 For the direct stream, it's not 60 partitions per executor, it's 300
 partitions, and executors work on them as they are scheduled.  Yes, if you
 have no messages you will get an empty partition.  It's up to you whether
 it's worthwhile to call coalesce or not.

 On Tue, Jun 30, 2015 at 2:45 AM, Shushant Arora <
 shushantaror...@gmail.com> wrote:

> Is this 3 is no of parallel consumer threads per receiver , means in
> total we have 2*3=6 consumer in same consumer group consuming from all 300
> partitions.
> 3 is just parallelism on same receiver and recommendation is to use 1
> per receiver since consuming from kafka is not cpu bound rather 
> NIC(network
> bound)  increasing consumer thread on one receiver won't make it parallel
> in ideal sense ?
>
> In non receiver based consumer spark 1.3 If I use 5 execuots and kafka
> topic has 300 partions , does kafkaRDD created on 5 executors will have 60
> partitions per executor (total 300 one to one mapping) and if some of 
> kafka
> partitions are empty say offset of last checkpoint to current is same for
> partitons P123, still it will create empty partition in kafkaRDD ? So we
> should call coalesce on kafkaRDD ?
>
>
> And is there any incompatibity issue when I include
> spark-streaming_2.10 (version 1.3) and spark-core_2.10(version 1.3) in my
> application but my cluster has spark version 1.2 ?
>
>
>
>
>
>
> On Mon, Jun 29, 2015 at 7:56 PM, Shushant Arora <
> shushantaror...@gmail.com> wrote:
>
>> 1. Here you are basically creating 2 receivers and asking each of
>> them to consume 3 kafka partitions each.
>>
>> - In 1.2 we have high level consumers so how can we restrict no of
>> kafka partitions to consume from? Say I have 300 kafka partitions in 
>> kafka
>> topic and as in above I gave 2 receivers and 3 kafka partitions . Then is
>> it mean I will read from 6 out of 300 partitions only and for rest 294
>> partitions data is lost?
>>
>>
>> 2.One more doubt in spark streaming how is it decided which part of
>> main function of driver will run at each batch interval ? Since whole 
>> code
>> is written in one function(main function in driver) so how it determined
>> kafka streams receivers  not to be registered in each batch only 
>> processing
>> to be done .
>>
>>
>>
>>
>>
>>
>> On Mon, Jun 29, 2015 at 7:35 PM, ayan guha 
>> wrote:
>>
>>> Hi
>>>
>>> Let me take ashot at your questions. (I am sure people like Cody and
>>> TD will correct if I am wrong)
>>>
>>> 0. This is exact copy from the similar question in mail thread from
>>> Akhil D:
>>> Since you set local[4] you will have 4 threads for your computation,
>>> and since you are having 2 receivers, you are left with 2 threads
>>> to process ((0 + 2) <-- This 2 is your 2 threads.) And the other /2
>>> means you are having 2 tasks in that stage (with id 0).
>>>
>>> 1. Here you are basically creating 2 receivers and asking each of
>>> them to consume 3 kafka partitions each.
>>> 2. How does that matter? It depends on how m

Re: spark streaming with kafka reset offset

2015-07-14 Thread Tathagata Das
Relevant documentation -
https://spark.apache.org/docs/latest/streaming-kafka-integration.html,
towards the end.

directKafkaStream.foreachRDD { rdd =>
 val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
 // offsetRanges.length = # of Kafka partitions being consumed
 ...
 }


On Tue, Jul 14, 2015 at 3:17 PM, Cody Koeninger  wrote:

> You have access to the offset ranges for a given rdd in the stream by
> typecasting to HasOffsetRanges.  You can then store the offsets wherever
> you need to.
>
> On Tue, Jul 14, 2015 at 5:00 PM, Chen Song  wrote:
>
>> A follow up question.
>>
>> When using createDirectStream approach, the offsets are checkpointed to
>> HDFS and it is understandable by Spark Streaming job. Is there a way to
>> expose the offsets via a REST api to end users. Or alternatively, is there
>> a way to have offsets committed to Kafka Offset Manager so users can query
>> from a consumer programmatically?
>>
>> Essentially, all I need to do is monitor the progress of data consumption
>> of the Kafka topic.
>>
>>
>> On Tue, Jun 30, 2015 at 9:39 AM, Cody Koeninger 
>> wrote:
>>
>>> You can't use different versions of spark in your application vs your
>>> cluster.
>>>
>>> For the direct stream, it's not 60 partitions per executor, it's 300
>>> partitions, and executors work on them as they are scheduled.  Yes, if you
>>> have no messages you will get an empty partition.  It's up to you whether
>>> it's worthwhile to call coalesce or not.
>>>
>>> On Tue, Jun 30, 2015 at 2:45 AM, Shushant Arora <
>>> shushantaror...@gmail.com> wrote:
>>>
 Is this 3 is no of parallel consumer threads per receiver , means in
 total we have 2*3=6 consumer in same consumer group consuming from all 300
 partitions.
 3 is just parallelism on same receiver and recommendation is to use 1
 per receiver since consuming from kafka is not cpu bound rather NIC(network
 bound)  increasing consumer thread on one receiver won't make it parallel
 in ideal sense ?

 In non receiver based consumer spark 1.3 If I use 5 execuots and kafka
 topic has 300 partions , does kafkaRDD created on 5 executors will have 60
 partitions per executor (total 300 one to one mapping) and if some of kafka
 partitions are empty say offset of last checkpoint to current is same for
 partitons P123, still it will create empty partition in kafkaRDD ? So we
 should call coalesce on kafkaRDD ?


 And is there any incompatibity issue when I include
 spark-streaming_2.10 (version 1.3) and spark-core_2.10(version 1.3) in my
 application but my cluster has spark version 1.2 ?






 On Mon, Jun 29, 2015 at 7:56 PM, Shushant Arora <
 shushantaror...@gmail.com> wrote:

> 1. Here you are basically creating 2 receivers and asking each of them
> to consume 3 kafka partitions each.
>
> - In 1.2 we have high level consumers so how can we restrict no of
> kafka partitions to consume from? Say I have 300 kafka partitions in kafka
> topic and as in above I gave 2 receivers and 3 kafka partitions . Then is
> it mean I will read from 6 out of 300 partitions only and for rest 294
> partitions data is lost?
>
>
> 2.One more doubt in spark streaming how is it decided which part of
> main function of driver will run at each batch interval ? Since whole code
> is written in one function(main function in driver) so how it determined
> kafka streams receivers  not to be registered in each batch only 
> processing
> to be done .
>
>
>
>
>
>
> On Mon, Jun 29, 2015 at 7:35 PM, ayan guha 
> wrote:
>
>> Hi
>>
>> Let me take ashot at your questions. (I am sure people like Cody and
>> TD will correct if I am wrong)
>>
>> 0. This is exact copy from the similar question in mail thread from
>> Akhil D:
>> Since you set local[4] you will have 4 threads for your computation,
>> and since you are having 2 receivers, you are left with 2 threads to
>> process ((0 + 2) <-- This 2 is your 2 threads.) And the other /2
>> means you are having 2 tasks in that stage (with id 0).
>>
>> 1. Here you are basically creating 2 receivers and asking each of
>> them to consume 3 kafka partitions each.
>> 2. How does that matter? It depends on how many receivers you have
>> created to consume that data and if you have repartitioned it. Remember,
>> spark is lazy and executors are relted to the context
>> 3. I think in java, factory method is fixed. You just pass around the
>> contextFactory object. (I love python :) see the signature isso much
>> cleaner :) )
>> 4. Yes, if you use spark checkpointing. You can use yourcustom check
>> pointing too.
>>
>> Best
>> Ayan
>>
>>
>>
>> On Mon, Jun 29, 2015 at 4:02 AM, Shushant Arora <
>> shushantaror...@gmail.com> wrote:
>>
>>> Few

Re: spark streaming with kafka reset offset

2015-07-14 Thread Cody Koeninger
You have access to the offset ranges for a given rdd in the stream by
typecasting to HasOffsetRanges.  You can then store the offsets wherever
you need to.
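
For reference, a sketch of that typecast and the fields each OffsetRange
exposes (topic, partition, fromOffset, untilOffset):

import org.apache.spark.streaming.kafka.{HasOffsetRanges, OffsetRange}

directKafkaStream.foreachRDD { rdd =>
  val ranges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  ranges.foreach { r =>
    // One OffsetRange per Kafka partition of this batch.
    println(s"topic=${r.topic} partition=${r.partition} " +
      s"from=${r.fromOffset} until=${r.untilOffset}")
  }
}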

On Tue, Jul 14, 2015 at 5:00 PM, Chen Song  wrote:

> A follow up question.
>
> When using createDirectStream approach, the offsets are checkpointed to
> HDFS and it is understandable by Spark Streaming job. Is there a way to
> expose the offsets via a REST api to end users. Or alternatively, is there
> a way to have offsets committed to Kafka Offset Manager so users can query
> from a consumer programmatically?
>
> Essentially, all I need to do is monitor the progress of data consumption
> of the Kafka topic.
>
>
> On Tue, Jun 30, 2015 at 9:39 AM, Cody Koeninger 
> wrote:
>
>> You can't use different versions of spark in your application vs your
>> cluster.
>>
>> For the direct stream, it's not 60 partitions per executor, it's 300
>> partitions, and executors work on them as they are scheduled.  Yes, if you
>> have no messages you will get an empty partition.  It's up to you whether
>> it's worthwhile to call coalesce or not.
>>
>> On Tue, Jun 30, 2015 at 2:45 AM, Shushant Arora <
>> shushantaror...@gmail.com> wrote:
>>
>>> Is this 3 is no of parallel consumer threads per receiver , means in
>>> total we have 2*3=6 consumer in same consumer group consuming from all 300
>>> partitions.
>>> 3 is just parallelism on same receiver and recommendation is to use 1
>>> per receiver since consuming from kafka is not cpu bound rather NIC(network
>>> bound)  increasing consumer thread on one receiver won't make it parallel
>>> in ideal sense ?
>>>
>>> In non receiver based consumer spark 1.3 If I use 5 execuots and kafka
>>> topic has 300 partions , does kafkaRDD created on 5 executors will have 60
>>> partitions per executor (total 300 one to one mapping) and if some of kafka
>>> partitions are empty say offset of last checkpoint to current is same for
>>> partitons P123, still it will create empty partition in kafkaRDD ? So we
>>> should call coalesce on kafkaRDD ?
>>>
>>>
>>> And is there any incompatibity issue when I include spark-streaming_2.10
>>> (version 1.3) and spark-core_2.10(version 1.3) in my application but my
>>> cluster has spark version 1.2 ?
>>>
>>>
>>>
>>>
>>>
>>>
>>> On Mon, Jun 29, 2015 at 7:56 PM, Shushant Arora <
>>> shushantaror...@gmail.com> wrote:
>>>
 1. Here you are basically creating 2 receivers and asking each of them
 to consume 3 kafka partitions each.

 - In 1.2 we have high level consumers so how can we restrict no of
 kafka partitions to consume from? Say I have 300 kafka partitions in kafka
 topic and as in above I gave 2 receivers and 3 kafka partitions . Then is
 it mean I will read from 6 out of 300 partitions only and for rest 294
 partitions data is lost?


 2.One more doubt in spark streaming how is it decided which part of
 main function of driver will run at each batch interval ? Since whole code
 is written in one function(main function in driver) so how it determined
 kafka streams receivers  not to be registered in each batch only processing
 to be done .






 On Mon, Jun 29, 2015 at 7:35 PM, ayan guha  wrote:

> Hi
>
> Let me take ashot at your questions. (I am sure people like Cody and
> TD will correct if I am wrong)
>
> 0. This is exact copy from the similar question in mail thread from
> Akhil D:
> Since you set local[4] you will have 4 threads for your computation,
> and since you are having 2 receivers, you are left with 2 threads to
> process ((0 + 2) <-- This 2 is your 2 threads.) And the other /2
> means you are having 2 tasks in that stage (with id 0).
>
> 1. Here you are basically creating 2 receivers and asking each of them
> to consume 3 kafka partitions each.
> 2. How does that matter? It depends on how many receivers you have
> created to consume that data and if you have repartitioned it. Remember,
> spark is lazy and executors are relted to the context
> 3. I think in java, factory method is fixed. You just pass around the
> contextFactory object. (I love python :) see the signature isso much
> cleaner :) )
> 4. Yes, if you use spark checkpointing. You can use yourcustom check
> pointing too.
>
> Best
> Ayan
>
>
>
> On Mon, Jun 29, 2015 at 4:02 AM, Shushant Arora <
> shushantaror...@gmail.com> wrote:
>
>> Few doubts :
>>
>> In 1.2 streaming when I use union of streams , my streaming
>> application getting hanged sometimes and nothing gets printed on driver.
>>
>>
>> [Stage 2:>
>>
>> (0 + 2) / 2]
>>  Whats is 0+2/2 here signifies.
>>
>>
>>
>> 1.Does no of streams in topicsMap.put("testSparkPartitioned", 3); be
>> same as numstreams=2 ? in unioned stream ?
>>
>> 2. I launched app

Re: spark streaming with kafka reset offset

2015-07-14 Thread Chen Song
A follow up question.

When using the createDirectStream approach, the offsets are checkpointed to
HDFS and can be read back by the Spark Streaming job. Is there a way to
expose the offsets via a REST API to end users? Or alternatively, is there
a way to have the offsets committed to the Kafka offset manager so users can
query them from a consumer programmatically?

Essentially, all I need to do is monitor the progress of data consumption
of the Kafka topic.
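
For the monitoring part, one sketch (Scala, assuming the 0.8-era zkclient
library that Kafka already depends on, and a hypothetical group name and ZK
address) is to write each range's untilOffset to the same ZooKeeper path
layout the high-level consumer uses, so existing Kafka offset-monitoring
tools can read it:

import org.I0Itec.zkclient.ZkClient
import org.I0Itec.zkclient.serialize.ZkSerializer
import org.apache.spark.streaming.kafka.HasOffsetRanges

// Plain UTF-8 string serializer so we don't rely on Kafka-internal classes.
object Utf8Serializer extends ZkSerializer {
  def serialize(data: Object): Array[Byte] = data.toString.getBytes("UTF-8")
  def deserialize(bytes: Array[Byte]): Object =
    if (bytes == null) null else new String(bytes, "UTF-8")
}

directKafkaStream.foreachRDD { rdd =>
  val ranges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  val zk = new ZkClient("zkhost:2181", 30000, 30000, Utf8Serializer) // hypothetical ZK address
  try {
    ranges.foreach { r =>
      // Same layout the 0.8 high-level consumer uses.
      val path = s"/consumers/mygroup/offsets/${r.topic}/${r.partition}"
      if (!zk.exists(path)) zk.createPersistent(path, true) // create parent nodes too
      zk.writeData(path, r.untilOffset.toString)
    }
  } finally {
    zk.close()
  }
}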


On Tue, Jun 30, 2015 at 9:39 AM, Cody Koeninger  wrote:

> You can't use different versions of spark in your application vs your
> cluster.
>
> For the direct stream, it's not 60 partitions per executor, it's 300
> partitions, and executors work on them as they are scheduled.  Yes, if you
> have no messages you will get an empty partition.  It's up to you whether
> it's worthwhile to call coalesce or not.
>
> On Tue, Jun 30, 2015 at 2:45 AM, Shushant Arora  > wrote:
>
>> Is this 3 is no of parallel consumer threads per receiver , means in
>> total we have 2*3=6 consumer in same consumer group consuming from all 300
>> partitions.
>> 3 is just parallelism on same receiver and recommendation is to use 1 per
>> receiver since consuming from kafka is not cpu bound rather NIC(network
>> bound)  increasing consumer thread on one receiver won't make it parallel
>> in ideal sense ?
>>
>> In non receiver based consumer spark 1.3 If I use 5 execuots and kafka
>> topic has 300 partions , does kafkaRDD created on 5 executors will have 60
>> partitions per executor (total 300 one to one mapping) and if some of kafka
>> partitions are empty say offset of last checkpoint to current is same for
>> partitons P123, still it will create empty partition in kafkaRDD ? So we
>> should call coalesce on kafkaRDD ?
>>
>>
>> And is there any incompatibity issue when I include spark-streaming_2.10
>> (version 1.3) and spark-core_2.10(version 1.3) in my application but my
>> cluster has spark version 1.2 ?
>>
>>
>>
>>
>>
>>
>> On Mon, Jun 29, 2015 at 7:56 PM, Shushant Arora <
>> shushantaror...@gmail.com> wrote:
>>
>>> 1. Here you are basically creating 2 receivers and asking each of them
>>> to consume 3 kafka partitions each.
>>>
>>> - In 1.2 we have high level consumers so how can we restrict no of kafka
>>> partitions to consume from? Say I have 300 kafka partitions in kafka topic
>>> and as in above I gave 2 receivers and 3 kafka partitions . Then is it mean
>>> I will read from 6 out of 300 partitions only and for rest 294 partitions
>>> data is lost?
>>>
>>>
>>> 2.One more doubt in spark streaming how is it decided which part of main
>>> function of driver will run at each batch interval ? Since whole code is
>>> written in one function(main function in driver) so how it determined kafka
>>> streams receivers  not to be registered in each batch only processing to be
>>> done .
>>>
>>>
>>>
>>>
>>>
>>>
>>> On Mon, Jun 29, 2015 at 7:35 PM, ayan guha  wrote:
>>>
 Hi

 Let me take ashot at your questions. (I am sure people like Cody and TD
 will correct if I am wrong)

 0. This is exact copy from the similar question in mail thread from
 Akhil D:
 Since you set local[4] you will have 4 threads for your computation,
 and since you are having 2 receivers, you are left with 2 threads to
 process ((0 + 2) <-- This 2 is your 2 threads.) And the other /2 means
 you are having 2 tasks in that stage (with id 0).

 1. Here you are basically creating 2 receivers and asking each of them
 to consume 3 kafka partitions each.
 2. How does that matter? It depends on how many receivers you have
 created to consume that data and if you have repartitioned it. Remember,
 spark is lazy and executors are relted to the context
 3. I think in java, factory method is fixed. You just pass around the
 contextFactory object. (I love python :) see the signature isso much
 cleaner :) )
 4. Yes, if you use spark checkpointing. You can use yourcustom check
 pointing too.

 Best
 Ayan



 On Mon, Jun 29, 2015 at 4:02 AM, Shushant Arora <
 shushantaror...@gmail.com> wrote:

> Few doubts :
>
> In 1.2 streaming when I use union of streams , my streaming
> application getting hanged sometimes and nothing gets printed on driver.
>
>
> [Stage 2:>
>
>   (0 + 2) / 2]
>  Whats is 0+2/2 here signifies.
>
>
>
> 1.Does no of streams in topicsMap.put("testSparkPartitioned", 3); be
> same as numstreams=2 ? in unioned stream ?
>
> 2. I launched app on yarnRM with num-executors as 5 . It created 2
> receivers and 5 execuots . As in stream receivers nodes get fixed at start
> of app throughout its lifetime . Does executors gets allicated at start of
> each job on 1s batch interval? If yes, how does its fast to allocate
> resources. I mean if i increase num-executors to 50 , it will negotiate 50
> 

Re: spark streaming with kafka reset offset

2015-06-30 Thread Cody Koeninger
You can't use different versions of spark in your application vs your
cluster.

For the direct stream, it's not 60 partitions per executor, it's 300
partitions, and executors work on them as they are scheduled.  Yes, if you
have no messages you will get an empty partition.  It's up to you whether
it's worthwhile to call coalesce or not.
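
If you do decide to compact empty partitions, a sketch (the HasOffsetRanges
cast only works on the KafkaRDD itself, so grab the ranges before coalescing;
30 is an arbitrary example value):

import org.apache.spark.streaming.kafka.HasOffsetRanges

directKafkaStream.foreachRDD { rdd =>
  // Must be done on the original KafkaRDD, before coalesce/repartition.
  val ranges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

  // Collapse the ~300 (possibly largely empty) partitions into fewer tasks.
  rdd.coalesce(30).foreachPartition { iter =>
    iter.foreach { record =>
      // process each (key, value) message
    }
  }
}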

On Tue, Jun 30, 2015 at 2:45 AM, Shushant Arora 
wrote:

> Is this 3 is no of parallel consumer threads per receiver , means in total
> we have 2*3=6 consumer in same consumer group consuming from all 300
> partitions.
> 3 is just parallelism on same receiver and recommendation is to use 1 per
> receiver since consuming from kafka is not cpu bound rather NIC(network
> bound)  increasing consumer thread on one receiver won't make it parallel
> in ideal sense ?
>
> In non receiver based consumer spark 1.3 If I use 5 execuots and kafka
> topic has 300 partions , does kafkaRDD created on 5 executors will have 60
> partitions per executor (total 300 one to one mapping) and if some of kafka
> partitions are empty say offset of last checkpoint to current is same for
> partitons P123, still it will create empty partition in kafkaRDD ? So we
> should call coalesce on kafkaRDD ?
>
>
> And is there any incompatibity issue when I include spark-streaming_2.10
> (version 1.3) and spark-core_2.10(version 1.3) in my application but my
> cluster has spark version 1.2 ?
>
>
>
>
>
>
> On Mon, Jun 29, 2015 at 7:56 PM, Shushant Arora  > wrote:
>
>> 1. Here you are basically creating 2 receivers and asking each of them to
>> consume 3 kafka partitions each.
>>
>> - In 1.2 we have high level consumers so how can we restrict no of kafka
>> partitions to consume from? Say I have 300 kafka partitions in kafka topic
>> and as in above I gave 2 receivers and 3 kafka partitions . Then is it mean
>> I will read from 6 out of 300 partitions only and for rest 294 partitions
>> data is lost?
>>
>>
>> 2.One more doubt in spark streaming how is it decided which part of main
>> function of driver will run at each batch interval ? Since whole code is
>> written in one function(main function in driver) so how it determined kafka
>> streams receivers  not to be registered in each batch only processing to be
>> done .
>>
>>
>>
>>
>>
>>
>> On Mon, Jun 29, 2015 at 7:35 PM, ayan guha  wrote:
>>
>>> Hi
>>>
>>> Let me take ashot at your questions. (I am sure people like Cody and TD
>>> will correct if I am wrong)
>>>
>>> 0. This is exact copy from the similar question in mail thread from
>>> Akhil D:
>>> Since you set local[4] you will have 4 threads for your computation, and
>>> since you are having 2 receivers, you are left with 2 threads to
>>> process ((0 + 2) <-- This 2 is your 2 threads.) And the other /2 means
>>> you are having 2 tasks in that stage (with id 0).
>>>
>>> 1. Here you are basically creating 2 receivers and asking each of them
>>> to consume 3 kafka partitions each.
>>> 2. How does that matter? It depends on how many receivers you have
>>> created to consume that data and if you have repartitioned it. Remember,
>>> spark is lazy and executors are relted to the context
>>> 3. I think in java, factory method is fixed. You just pass around the
>>> contextFactory object. (I love python :) see the signature isso much
>>> cleaner :) )
>>> 4. Yes, if you use spark checkpointing. You can use yourcustom check
>>> pointing too.
>>>
>>> Best
>>> Ayan
>>>
>>>
>>>
>>> On Mon, Jun 29, 2015 at 4:02 AM, Shushant Arora <
>>> shushantaror...@gmail.com> wrote:
>>>
 Few doubts :

 In 1.2 streaming when I use union of streams , my streaming application
 getting hanged sometimes and nothing gets printed on driver.


 [Stage 2:>

   (0 + 2) / 2]
  Whats is 0+2/2 here signifies.



 1.Does no of streams in topicsMap.put("testSparkPartitioned", 3); be
 same as numstreams=2 ? in unioned stream ?

 2. I launched app on yarnRM with num-executors as 5 . It created 2
 receivers and 5 execuots . As in stream receivers nodes get fixed at start
 of app throughout its lifetime . Does executors gets allicated at start of
 each job on 1s batch interval? If yes, how does its fast to allocate
 resources. I mean if i increase num-executors to 50 , it will negotiate 50
 executors from yarnRM at start of each job so does it takes more time in
 allocating executors than batch interval(here 1s , say if 500ms).? Can i
 fixed processing executors also throughout the app?




 SparkConf conf = new SparkConf().setAppName("SampleSparkStreamingApp");
 JavaStreamingContext jssc = new
 JavaStreamingContext(conf,Durations.milliseconds(1000));

 Map kafkaParams = new HashMap();
 kafkaParams.put("zookeeper.connect","ipadd:2181");
 kafkaParams.put("group.id", "testgroup");
 kafkaParams.put("zookeeper.session.timeout.ms", "1");
  Map topicsMap = new HashMap();
 t

Re: spark streaming with kafka reset offset

2015-06-30 Thread Shushant Arora
Is this 3 the number of parallel consumer threads per receiver, meaning in
total we have 2*3 = 6 consumers in the same consumer group consuming from all
300 partitions?
Or is 3 just parallelism on the same receiver, and the recommendation is to
use 1 per receiver, since consuming from Kafka is not CPU bound but NIC
(network) bound, so increasing consumer threads on one receiver won't make it
parallel in the ideal sense?

In the non-receiver-based consumer in Spark 1.3, if I use 5 executors and the
Kafka topic has 300 partitions, will the KafkaRDD created across the 5
executors have 60 partitions per executor (300 in total, one-to-one mapping)?
And if some Kafka partitions are empty, say the offset from the last
checkpoint to current is the same for partition P123, will it still create an
empty partition in the KafkaRDD? So should we call coalesce on the KafkaRDD?


And is there any incompatibility issue when I include spark-streaming_2.10
(version 1.3) and spark-core_2.10 (version 1.3) in my application but my
cluster has Spark version 1.2?






On Mon, Jun 29, 2015 at 7:56 PM, Shushant Arora 
wrote:

> 1. Here you are basically creating 2 receivers and asking each of them to
> consume 3 kafka partitions each.
>
> - In 1.2 we have high level consumers so how can we restrict no of kafka
> partitions to consume from? Say I have 300 kafka partitions in kafka topic
> and as in above I gave 2 receivers and 3 kafka partitions . Then is it mean
> I will read from 6 out of 300 partitions only and for rest 294 partitions
> data is lost?
>
>
> 2.One more doubt in spark streaming how is it decided which part of main
> function of driver will run at each batch interval ? Since whole code is
> written in one function(main function in driver) so how it determined kafka
> streams receivers  not to be registered in each batch only processing to be
> done .
>
>
>
>
>
>
> On Mon, Jun 29, 2015 at 7:35 PM, ayan guha  wrote:
>
>> Hi
>>
>> Let me take ashot at your questions. (I am sure people like Cody and TD
>> will correct if I am wrong)
>>
>> 0. This is exact copy from the similar question in mail thread from Akhil
>> D:
>> Since you set local[4] you will have 4 threads for your computation, and
>> since you are having 2 receivers, you are left with 2 threads to process
>> ((0 + 2) <-- This 2 is your 2 threads.) And the other /2 means you are
>> having 2 tasks in that stage (with id 0).
>>
>> 1. Here you are basically creating 2 receivers and asking each of them to
>> consume 3 kafka partitions each.
>> 2. How does that matter? It depends on how many receivers you have
>> created to consume that data and if you have repartitioned it. Remember,
>> spark is lazy and executors are relted to the context
>> 3. I think in java, factory method is fixed. You just pass around the
>> contextFactory object. (I love python :) see the signature isso much
>> cleaner :) )
>> 4. Yes, if you use spark checkpointing. You can use yourcustom check
>> pointing too.
>>
>> Best
>> Ayan
>>
>>
>>
>> On Mon, Jun 29, 2015 at 4:02 AM, Shushant Arora <
>> shushantaror...@gmail.com> wrote:
>>
>>> Few doubts :
>>>
>>> In 1.2 streaming when I use union of streams , my streaming application
>>> getting hanged sometimes and nothing gets printed on driver.
>>>
>>>
>>> [Stage 2:>
>>>
>>> (0 + 2) / 2]
>>>  Whats is 0+2/2 here signifies.
>>>
>>>
>>>
>>> 1.Does no of streams in topicsMap.put("testSparkPartitioned", 3); be
>>> same as numstreams=2 ? in unioned stream ?
>>>
>>> 2. I launched app on yarnRM with num-executors as 5 . It created 2
>>> receivers and 5 execuots . As in stream receivers nodes get fixed at start
>>> of app throughout its lifetime . Does executors gets allicated at start of
>>> each job on 1s batch interval? If yes, how does its fast to allocate
>>> resources. I mean if i increase num-executors to 50 , it will negotiate 50
>>> executors from yarnRM at start of each job so does it takes more time in
>>> allocating executors than batch interval(here 1s , say if 500ms).? Can i
>>> fixed processing executors also throughout the app?
>>>
>>>
>>>
>>>
>>> SparkConf conf = new SparkConf().setAppName("SampleSparkStreamingApp");
>>> JavaStreamingContext jssc = new
>>> JavaStreamingContext(conf,Durations.milliseconds(1000));
>>>
>>> Map kafkaParams = new HashMap();
>>> kafkaParams.put("zookeeper.connect","ipadd:2181");
>>> kafkaParams.put("group.id", "testgroup");
>>> kafkaParams.put("zookeeper.session.timeout.ms", "1");
>>>  Map topicsMap = new HashMap();
>>> topicsMap.put("testSparkPartitioned", 3);
>>> int numStreams = 2;
>>> List> kafkaStreams = new
>>> ArrayList>();
>>>   for(int i=0;i>>  kafkaStreams.add(KafkaUtils.createStream(jssc, byte[].class,
>>> byte[].class,kafka.serializer.DefaultDecoder.class ,
>>> kafka.serializer.DefaultDecoder.class,
>>> kafkaParams, topicsMap, StorageLevel.MEMORY_ONLY()));
>>> }
>>>  JavaPairDStream directKafkaStream =
>>> jssc.union(kafkaStreams.get(0),kafkaStreams.subList(1,
>>> kafkaStreams.size()));
>>>  JavaDStream lines = directKafkaStream.map(new

Re: spark streaming with kafka reset offset

2015-06-29 Thread Shushant Arora
1. Here you are basically creating 2 receivers and asking each of them to
consume 3 kafka partitions each.

- In 1.2 we have high-level consumers, so how can we restrict the number of
Kafka partitions to consume from? Say I have 300 Kafka partitions in the
topic and, as above, I gave 2 receivers and 3 Kafka partitions. Does that
mean I will read from only 6 out of 300 partitions, and the data for the
remaining 294 partitions is lost?


2. One more doubt in Spark Streaming: how is it decided which part of the
driver's main function will run at each batch interval? Since the whole code
is written in one function (the main function in the driver), how is it
determined that the Kafka stream receivers are not re-registered in each
batch and only the processing is done?






On Mon, Jun 29, 2015 at 7:35 PM, ayan guha  wrote:

> Hi
>
> Let me take ashot at your questions. (I am sure people like Cody and TD
> will correct if I am wrong)
>
> 0. This is exact copy from the similar question in mail thread from Akhil
> D:
> Since you set local[4] you will have 4 threads for your computation, and
> since you are having 2 receivers, you are left with 2 threads to process (
> (0 + 2) <-- This 2 is your 2 threads.) And the other /2 means you are
> having 2 tasks in that stage (with id 0).
>
> 1. Here you are basically creating 2 receivers and asking each of them to
> consume 3 kafka partitions each.
> 2. How does that matter? It depends on how many receivers you have created
> to consume that data and if you have repartitioned it. Remember, spark is
> lazy and executors are relted to the context
> 3. I think in java, factory method is fixed. You just pass around the
> contextFactory object. (I love python :) see the signature isso much
> cleaner :) )
> 4. Yes, if you use spark checkpointing. You can use yourcustom check
> pointing too.
>
> Best
> Ayan
>
>
>
> On Mon, Jun 29, 2015 at 4:02 AM, Shushant Arora  > wrote:
>
>> Few doubts :
>>
>> In 1.2 streaming when I use union of streams , my streaming application
>> getting hanged sometimes and nothing gets printed on driver.
>>
>>
>> [Stage 2:>
>>
>> (0 + 2) / 2]
>>  Whats is 0+2/2 here signifies.
>>
>>
>>
>> 1.Does no of streams in topicsMap.put("testSparkPartitioned", 3); be
>> same as numstreams=2 ? in unioned stream ?
>>
>> 2. I launched app on yarnRM with num-executors as 5 . It created 2
>> receivers and 5 execuots . As in stream receivers nodes get fixed at start
>> of app throughout its lifetime . Does executors gets allicated at start of
>> each job on 1s batch interval? If yes, how does its fast to allocate
>> resources. I mean if i increase num-executors to 50 , it will negotiate 50
>> executors from yarnRM at start of each job so does it takes more time in
>> allocating executors than batch interval(here 1s , say if 500ms).? Can i
>> fixed processing executors also throughout the app?
>>
>>
>>
>>
>> SparkConf conf = new SparkConf().setAppName("SampleSparkStreamingApp");
>> JavaStreamingContext jssc = new
>> JavaStreamingContext(conf,Durations.milliseconds(1000));
>>
>> Map kafkaParams = new HashMap();
>> kafkaParams.put("zookeeper.connect","ipadd:2181");
>> kafkaParams.put("group.id", "testgroup");
>> kafkaParams.put("zookeeper.session.timeout.ms", "1");
>>  Map topicsMap = new HashMap();
>> topicsMap.put("testSparkPartitioned", 3);
>> int numStreams = 2;
>> List> kafkaStreams = new
>> ArrayList>();
>>   for(int i=0;i>  kafkaStreams.add(KafkaUtils.createStream(jssc, byte[].class,
>> byte[].class,kafka.serializer.DefaultDecoder.class ,
>> kafka.serializer.DefaultDecoder.class,
>> kafkaParams, topicsMap, StorageLevel.MEMORY_ONLY()));
>> }
>>  JavaPairDStream directKafkaStream =
>> jssc.union(kafkaStreams.get(0),kafkaStreams.subList(1,
>> kafkaStreams.size()));
>>  JavaDStream lines = directKafkaStream.map(new
>> Function, String>() {
>>
>> public String call(Tuple2 arg0) throws Exception {
>> ...processing
>> ..return msg;
>> }
>> });
>> lines.print();
>> jssc.start();
>> jssc.awaitTermination();
>>
>>
>>
>>
>> ---
>> 3.For avoiding dataloss when we use checkpointing, and factory method to
>> create sparkConytext, is method name fixed
>> or we can use any name and how to set in app the method name to be used ?
>>
>> 4.In 1.3 non receiver based streaming, kafka offset is not stored in
>> zookeeper, is it because of zookeeper is not efficient for high writes and
>> read is not strictly consistent? So
>>
>>  we use simple Kafka API that does not use Zookeeper and offsets tracked
>> only by Spark Streaming within its checkpoints. This eliminates
>> inconsistencies between Spark Streaming and Zookeeper/Kafka, and so each
>> record is received by Spark Streaming effectively exactly once despite
>> failures.
>>
>> So we have to call context.checkpoint(hdfsdir)? Or is it implicit
>> checkoint location ? Means does hdfs be used for sm

Re: spark streaming with kafka reset offset

2015-06-29 Thread ayan guha
Hi

Let me take a shot at your questions. (I am sure people like Cody and TD
will correct me if I am wrong.)

0. This is an exact copy from the similar question in the mail thread from
Akhil D: since you set local[4] you will have 4 threads for your computation,
and since you have 2 receivers, you are left with 2 threads to process. ((0 +
2) <-- this 2 is your 2 threads.) And the other /2 means you have 2 tasks in
that stage (with id 0).

1. Here you are basically creating 2 receivers and asking each of them to
consume 3 Kafka partitions.
2. How does that matter? It depends on how many receivers you have created
to consume that data and whether you have repartitioned it. Remember, Spark
is lazy and executors are related to the context.
3. I think in Java the factory method is fixed. You just pass around the
contextFactory object. (I love Python :) see, the signature is so much
cleaner :) )
4. Yes, if you use Spark checkpointing. You can use your custom
checkpointing too.

Best
Ayan



On Mon, Jun 29, 2015 at 4:02 AM, Shushant Arora 
wrote:

> Few doubts :
>
> In 1.2 streaming when I use union of streams , my streaming application
> getting hanged sometimes and nothing gets printed on driver.
>
>
> [Stage 2:>
>
>   (0 + 2) / 2]
>  Whats is 0+2/2 here signifies.
>
>
>
> 1.Does no of streams in topicsMap.put("testSparkPartitioned", 3); be same
> as numstreams=2 ? in unioned stream ?
>
> 2. I launched app on yarnRM with num-executors as 5 . It created 2
> receivers and 5 execuots . As in stream receivers nodes get fixed at start
> of app throughout its lifetime . Does executors gets allicated at start of
> each job on 1s batch interval? If yes, how does its fast to allocate
> resources. I mean if i increase num-executors to 50 , it will negotiate 50
> executors from yarnRM at start of each job so does it takes more time in
> allocating executors than batch interval(here 1s , say if 500ms).? Can i
> fixed processing executors also throughout the app?
>
>
>
>
> SparkConf conf = new SparkConf().setAppName("SampleSparkStreamingApp");
> JavaStreamingContext jssc = new
> JavaStreamingContext(conf,Durations.milliseconds(1000));
>
> Map kafkaParams = new HashMap();
> kafkaParams.put("zookeeper.connect","ipadd:2181");
> kafkaParams.put("group.id", "testgroup");
> kafkaParams.put("zookeeper.session.timeout.ms", "1");
>  Map topicsMap = new HashMap();
> topicsMap.put("testSparkPartitioned", 3);
> int numStreams = 2;
> List> kafkaStreams = new
> ArrayList>();
>   for(int i=0;i  kafkaStreams.add(KafkaUtils.createStream(jssc, byte[].class,
> byte[].class,kafka.serializer.DefaultDecoder.class ,
> kafka.serializer.DefaultDecoder.class,
> kafkaParams, topicsMap, StorageLevel.MEMORY_ONLY()));
> }
>  JavaPairDStream directKafkaStream =
> jssc.union(kafkaStreams.get(0),kafkaStreams.subList(1,
> kafkaStreams.size()));
>  JavaDStream lines = directKafkaStream.map(new
> Function, String>() {
>
> public String call(Tuple2 arg0) throws Exception {
> ...processing
> ..return msg;
> }
> });
> lines.print();
> jssc.start();
> jssc.awaitTermination();
>
>
>
>
> ---
> 3.For avoiding dataloss when we use checkpointing, and factory method to
> create sparkConytext, is method name fixed
> or we can use any name and how to set in app the method name to be used ?
>
> 4.In 1.3 non receiver based streaming, kafka offset is not stored in
> zookeeper, is it because of zookeeper is not efficient for high writes and
> read is not strictly consistent? So
>
>  we use simple Kafka API that does not use Zookeeper and offsets tracked
> only by Spark Streaming within its checkpoints. This eliminates
> inconsistencies between Spark Streaming and Zookeeper/Kafka, and so each
> record is received by Spark Streaming effectively exactly once despite
> failures.
>
> So we have to call context.checkpoint(hdfsdir)? Or is it implicit
> checkoint location ? Means does hdfs be used for small data(just offset?)
>
>
>
>
>
>
>
>
>
>
> On Sat, Jun 27, 2015 at 7:37 PM, Dibyendu Bhattacharya <
> dibyendu.bhattach...@gmail.com> wrote:
>
>> Hi,
>>
>> There is another option to try for Receiver Based Low Level Kafka
>> Consumer which is part of Spark-Packages (
>> http://spark-packages.org/package/dibbhatt/kafka-spark-consumer) . This
>> can be used with WAL as well for end to end zero data loss.
>>
>> This is also Reliable Receiver and Commit offset to ZK.  Given the number
>> of Kafka Partitions you have ( > 100) , using High Level Kafka API for
>> Receiver based approach may leads to issues related Consumer Re-balancing
>>  which is a major issue of Kafka High Level API.
>>
>> Regards,
>> Dibyendu
>>
>>
>>
>> On Sat, Jun 27, 2015 at 3:04 PM, Tathagata Das 
>> wrote:
>>
>>> In the receiver based approach, If the receiver crashes for any reason
>>> (receiver crashed or executor crashed) the receiver sh

Re: spark streaming with kafka reset offset

2015-06-29 Thread Cody Koeninger
3. You need to use your own method, because you need to set up your job.
Read the checkpoint documentation.

4.  Yes, if you want to checkpoint, you need to specify a url to store the
checkpoint at (s3 or hdfs).  Yes, for the direct stream checkpoint it's
just offsets, not all the messages.
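
In Scala this is the standard getOrCreate pattern - a sketch; the function
name is up to you, the important part is that all stream setup happens inside
it and the checkpoint directory (an HDFS or S3 URL) is passed to getOrCreate:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val checkpointDir = "hdfs:///user/me/streaming-checkpoint"  // or an s3:// path

// Any method name works; it is only referenced when passed to getOrCreate.
def createContext(): StreamingContext = {
  val conf = new SparkConf().setAppName("SampleSparkStreamingApp")
  val ssc = new StreamingContext(conf, Seconds(1))
  ssc.checkpoint(checkpointDir)
  // ... set up the direct stream and all transformations/output here ...
  ssc
}

// On a clean start this calls createContext(); after a failure it rebuilds
// the context (including direct-stream offsets) from the checkpoint data.
val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
ssc.start()
ssc.awaitTermination()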

On Sun, Jun 28, 2015 at 1:02 PM, Shushant Arora 
wrote:

> Few doubts :
>
> In 1.2 streaming when I use union of streams , my streaming application
> getting hanged sometimes and nothing gets printed on driver.
>
>
> [Stage 2:>
>
>   (0 + 2) / 2]
>  Whats is 0+2/2 here signifies.
>
>
>
> 1.Does no of streams in topicsMap.put("testSparkPartitioned", 3); be same
> as numstreams=2 ? in unioned stream ?
>
> 2. I launched app on yarnRM with num-executors as 5 . It created 2
> receivers and 5 execuots . As in stream receivers nodes get fixed at start
> of app throughout its lifetime . Does executors gets allicated at start of
> each job on 1s batch interval? If yes, how does its fast to allocate
> resources. I mean if i increase num-executors to 50 , it will negotiate 50
> executors from yarnRM at start of each job so does it takes more time in
> allocating executors than batch interval(here 1s , say if 500ms).? Can i
> fixed processing executors also throughout the app?
>
>
>
>
> SparkConf conf = new SparkConf().setAppName("SampleSparkStreamingApp");
> JavaStreamingContext jssc = new
> JavaStreamingContext(conf,Durations.milliseconds(1000));
>
> Map kafkaParams = new HashMap();
> kafkaParams.put("zookeeper.connect","ipadd:2181");
> kafkaParams.put("group.id", "testgroup");
> kafkaParams.put("zookeeper.session.timeout.ms", "1");
>  Map topicsMap = new HashMap();
> topicsMap.put("testSparkPartitioned", 3);
> int numStreams = 2;
> List> kafkaStreams = new
> ArrayList>();
>   for(int i=0;i  kafkaStreams.add(KafkaUtils.createStream(jssc, byte[].class,
> byte[].class,kafka.serializer.DefaultDecoder.class ,
> kafka.serializer.DefaultDecoder.class,
> kafkaParams, topicsMap, StorageLevel.MEMORY_ONLY()));
> }
>  JavaPairDStream directKafkaStream =
> jssc.union(kafkaStreams.get(0),kafkaStreams.subList(1,
> kafkaStreams.size()));
>  JavaDStream lines = directKafkaStream.map(new
> Function, String>() {
>
> public String call(Tuple2 arg0) throws Exception {
> ...processing
> ..return msg;
> }
> });
> lines.print();
> jssc.start();
> jssc.awaitTermination();
>
>
>
>
> ---
> 3.For avoiding dataloss when we use checkpointing, and factory method to
> create sparkConytext, is method name fixed
> or we can use any name and how to set in app the method name to be used ?
>
> 4.In 1.3 non receiver based streaming, kafka offset is not stored in
> zookeeper, is it because of zookeeper is not efficient for high writes and
> read is not strictly consistent? So
>
>  we use simple Kafka API that does not use Zookeeper and offsets tracked
> only by Spark Streaming within its checkpoints. This eliminates
> inconsistencies between Spark Streaming and Zookeeper/Kafka, and so each
> record is received by Spark Streaming effectively exactly once despite
> failures.
>
> So we have to call context.checkpoint(hdfsdir)? Or is it implicit
> checkoint location ? Means does hdfs be used for small data(just offset?)
>
>
>
>
>
>
>
>
>
>
> On Sat, Jun 27, 2015 at 7:37 PM, Dibyendu Bhattacharya <
> dibyendu.bhattach...@gmail.com> wrote:
>
>> Hi,
>>
>> There is another option to try for Receiver Based Low Level Kafka
>> Consumer which is part of Spark-Packages (
>> http://spark-packages.org/package/dibbhatt/kafka-spark-consumer) . This
>> can be used with WAL as well for end to end zero data loss.
>>
>> This is also Reliable Receiver and Commit offset to ZK.  Given the number
>> of Kafka Partitions you have ( > 100) , using High Level Kafka API for
>> Receiver based approach may leads to issues related Consumer Re-balancing
>>  which is a major issue of Kafka High Level API.
>>
>> Regards,
>> Dibyendu
>>
>>
>>
>> On Sat, Jun 27, 2015 at 3:04 PM, Tathagata Das 
>> wrote:
>>
>>> In the receiver based approach, if the receiver crashes for any reason
>>> (receiver crashed or executor crashed) the receiver should get restarted on
>>> another executor and should start reading data from the offset present in
>>> the zookeeper. There is some chance of data loss, which can be alleviated using
>>> Write Ahead Logs (see streaming programming guide for more details, or see
>>> my talk [Slides PDF
>>> 
>>> , Video
>>> 
>>> ] from last Spark Summit 2015). But that approach can give duplicate
>>> records. The direct approach gives exactly-once guarantees, so you should
>>> try it out.
>>>
>>> TD
>>

Re: spark streaming with kafka reset offset

2015-06-28 Thread Shushant Arora
A few doubts:

In 1.2 streaming, when I use a union of streams, my streaming application
sometimes hangs and nothing gets printed on the driver.


[Stage 2:>                                                  (0 + 2) / 2]

What does the (0 + 2) / 2 here signify?



1. Does the number of streams in topicsMap.put("testSparkPartitioned", 3)
have to be the same as numStreams = 2 for the unioned stream?

2. I launched the app on YARN RM with num-executors set to 5. It created 2
receivers and 5 executors. Receiver nodes are fixed at the start of the app
for its whole lifetime; do executors get allocated at the start of each job
in the 1s batch interval? If yes, how can that allocation be fast enough? I
mean, if I increase num-executors to 50, it would have to negotiate 50
executors from the YARN RM at the start of each job, so wouldn't allocating
executors take longer than the batch interval (here 1s, or say 500ms)? Can I
also fix the processing executors for the whole lifetime of the app?




SparkConf conf = new SparkConf().setAppName("SampleSparkStreamingApp");
JavaStreamingContext jssc = new
JavaStreamingContext(conf, Durations.milliseconds(1000));

Map<String, String> kafkaParams = new HashMap<String, String>();
kafkaParams.put("zookeeper.connect", "ipadd:2181");
kafkaParams.put("group.id", "testgroup");
kafkaParams.put("zookeeper.session.timeout.ms", "1");

Map<String, Integer> topicsMap = new HashMap<String, Integer>();
topicsMap.put("testSparkPartitioned", 3);
int numStreams = 2;
List<JavaPairDStream<byte[], byte[]>> kafkaStreams = new
ArrayList<JavaPairDStream<byte[], byte[]>>();
for (int i = 0; i < numStreams; i++) {
  kafkaStreams.add(KafkaUtils.createStream(jssc, byte[].class,
    byte[].class, kafka.serializer.DefaultDecoder.class,
    kafka.serializer.DefaultDecoder.class,
    kafkaParams, topicsMap, StorageLevel.MEMORY_ONLY()));
}

JavaPairDStream<byte[], byte[]> directKafkaStream =
  jssc.union(kafkaStreams.get(0), kafkaStreams.subList(1,
    kafkaStreams.size()));

JavaDStream<String> lines = directKafkaStream.map(new
  Function<Tuple2<byte[], byte[]>, String>() {
    public String call(Tuple2<byte[], byte[]> arg0) throws Exception {
      // ... processing
      return msg;
    }
  });
lines.print();
jssc.start();
jssc.awaitTermination();



---
3. To avoid data loss when we use checkpointing and a factory method to
create the streaming context, is the factory method's name fixed, or can we
use any name, and how do we tell the app which method to use?
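
For reference, a minimal sketch of the getOrCreate pattern this question is
about, assuming the Spark 1.x Java API (JavaStreamingContextFactory) and a
placeholder checkpoint path. The factory method's name is not fixed; the app
selects it simply by passing the factory to JavaStreamingContext.getOrCreate:

import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.api.java.JavaStreamingContextFactory;

final String checkpointDir = "hdfs:///user/checkpoints/sample-app"; // placeholder path

JavaStreamingContextFactory factory = new JavaStreamingContextFactory() {
  public JavaStreamingContext create() {
    SparkConf conf = new SparkConf().setAppName("SampleSparkStreamingApp");
    JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.milliseconds(1000));
    // define the DStreams here, before returning the context
    jssc.checkpoint(checkpointDir);
    return jssc;
  }
};

// recreates the context from the checkpoint if one exists, otherwise calls create()
JavaStreamingContext jssc = JavaStreamingContext.getOrCreate(checkpointDir, factory);
jssc.start();
jssc.awaitTermination();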

4. In the 1.3 non-receiver-based streaming, the Kafka offsets are not stored
in ZooKeeper. Is that because ZooKeeper is not efficient for high write rates
and its reads are not strictly consistent? So

we use a simple Kafka API that does not use Zookeeper and offsets are tracked
only by Spark Streaming within its checkpoints. This eliminates
inconsistencies between Spark Streaming and Zookeeper/Kafka, and so each
record is received by Spark Streaming effectively exactly once despite
failures.

So do we have to call context.checkpoint(hdfsdir), or is there an implicit
checkpoint location? That is, is HDFS used only for small data (just the
offsets)?
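
On the checkpoint question, a minimal sketch of the 1.3 direct approach,
assuming the spark-streaming-kafka artifact is on the classpath; the broker
list and checkpoint directory are placeholders. The checkpoint location is
not implicit, you set it yourself on the context:

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

import kafka.serializer.StringDecoder;
import org.apache.spark.streaming.api.java.JavaPairInputDStream;
import org.apache.spark.streaming.kafka.KafkaUtils;

// jssc is the JavaStreamingContext created above
Map<String, String> directKafkaParams = new HashMap<String, String>();
directKafkaParams.put("metadata.broker.list", "broker1:9092"); // placeholder broker list

Set<String> topics = new HashSet<String>();
topics.add("testSparkPartitioned");

JavaPairInputDStream<String, String> direct = KafkaUtils.createDirectStream(
    jssc,
    String.class, String.class,
    StringDecoder.class, StringDecoder.class,
    directKafkaParams, topics);

// offsets survive a driver restart only if checkpointing is enabled explicitly
jssc.checkpoint("hdfs:///user/checkpoints/sample-app"); // placeholder directory

The offsets are then recovered from that checkpoint directory when the driver
restarts.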










On Sat, Jun 27, 2015 at 7:37 PM, Dibyendu Bhattacharya <
dibyendu.bhattach...@gmail.com> wrote:

> Hi,
>
> There is another option to try for Receiver Based Low Level Kafka Consumer
> which is part of Spark-Packages (
> http://spark-packages.org/package/dibbhatt/kafka-spark-consumer) . This
> can be used with WAL as well for end to end zero data loss.
>
> This is also a Reliable Receiver and commits offsets to ZK. Given the number
> of Kafka partitions you have (> 100), using the High Level Kafka API for the
> receiver-based approach may lead to issues related to Consumer re-balancing,
> which is a major issue of the Kafka High Level API.
>
> Regards,
> Dibyendu
>
>
>
> On Sat, Jun 27, 2015 at 3:04 PM, Tathagata Das 
> wrote:
>
>> In the receiver based approach, if the receiver crashes for any reason
>> (receiver crashed or executor crashed) the receiver should get restarted on
>> another executor and should start reading data from the offset present in
>> the zookeeper. There is some chance of data loss, which can be alleviated using
>> Write Ahead Logs (see streaming programming guide for more details, or see
>> my talk [Slides PDF
>> 
>> , Video
>> 
>> ] from last Spark Summit 2015). But that approach can give duplicate
>> records. The direct approach gives exactly-once guarantees, so you should
>> try it out.
>>
>> TD
>>
>> On Fri, Jun 26, 2015 at 5:46 PM, Cody Koeninger 
>> wrote:
>>
>>> Read the spark streaming guide and the kafka integration guide for a
>>> better understanding of how the receiver based stream works.
>>>
>>> Capacity planning is specific to your environment and what the job is
>>> actually doing; you'll need to determine it empirically.
>>>
>>>
>>> On Friday, June 26, 2015, Shushant Arora 
>>> wrote:
>>>
 In 1.2, how do I handle offset management after the stream application
 starts, in each job? Should I commit offsets manually after job completion?

 And what is the recommended number of consumer threads? Say I have 300
 partitions in the Kafka cluster. The load is ~1 million events per second,
 and each event is ~500 bytes. Having 5 receivers with 60 par

Re: spark streaming with kafka reset offset

2015-06-27 Thread Dibyendu Bhattacharya
Hi,

There is another option to try for Receiver Based Low Level Kafka Consumer
which is part of Spark-Packages (
http://spark-packages.org/package/dibbhatt/kafka-spark-consumer) . This can
be used with WAL as well for end to end zero data loss.

This is also a Reliable Receiver and commits offsets to ZK. Given the number
of Kafka partitions you have (> 100), using the High Level Kafka API for the
receiver-based approach may lead to issues related to Consumer re-balancing,
which is a major issue of the Kafka High Level API.

Regards,
Dibyendu



On Sat, Jun 27, 2015 at 3:04 PM, Tathagata Das  wrote:

> In the receiver based approach, if the receiver crashes for any reason
> (receiver crashed or executor crashed) the receiver should get restarted on
> another executor and should start reading data from the offset present in
> the zookeeper. There is some chance of data loss, which can be alleviated using
> Write Ahead Logs (see streaming programming guide for more details, or see
> my talk [Slides PDF
> 
> , Video
> 
> ] from last Spark Summit 2015). But that approach can give duplicate
> records. The direct approach gives exactly-once guarantees, so you should
> try it out.
>
> TD
>
> On Fri, Jun 26, 2015 at 5:46 PM, Cody Koeninger 
> wrote:
>
>> Read the spark streaming guide and the kafka integration guide for a
>> better understanding of how the receiver based stream works.
>>
>> Capacity planning is specific to your environment and what the job is
>> actually doing; you'll need to determine it empirically.
>>
>>
>> On Friday, June 26, 2015, Shushant Arora 
>> wrote:
>>
 In 1.2, how do I handle offset management after the stream application
 starts, in each job? Should I commit offsets manually after job completion?

 And what is the recommended number of consumer threads? Say I have 300
 partitions in the Kafka cluster. The load is ~1 million events per second,
 and each event is ~500 bytes. Are 5 receivers with 60 partitions per
 receiver sufficient for Spark Streaming to consume?
>>>
>>> On Fri, Jun 26, 2015 at 8:40 PM, Cody Koeninger 
>>> wrote:
>>>
 The receiver-based kafka createStream in spark 1.2 uses zookeeper to
 store offsets.  If you want finer-grained control over offsets, you can
 update the values in zookeeper yourself before starting the job.

 createDirectStream in spark 1.3 is still marked as experimental, and
 subject to change.  That being said, it works better for me in production
 than the receiver based api.

 On Fri, Jun 26, 2015 at 6:43 AM, Shushant Arora <
 shushantaror...@gmail.com> wrote:

> I am using spark streaming 1.2.
>
> If the processing executors crash, will the receiver reset the offset back
> to the last processed offset?
>
> If the receiver itself crashes, is there a way to reset the offset, other
> than to smallest or largest, without restarting the streaming application?
>
> Is Spark Streaming 1.3, which uses the low-level consumer API, stable? And
> which is recommended for handling data loss, 1.2 or 1.3?
>
>
>
>
>
>
>

>>>
>


Re: spark streaming with kafka reset offset

2015-06-27 Thread Tathagata Das
In the receiver based approach, if the receiver crashes for any reason
(receiver crashed or executor crashed) the receiver should get restarted on
another executor and should start reading data from the offset present in
ZooKeeper. There is some chance of data loss, which can be alleviated using
Write Ahead Logs (see streaming programming guide for more details, or see
my talk [Slides PDF

, Video

] from last Spark Summit 2015). But that approach can give duplicate
records. The direct approach gives exactly-once guarantees, so you should
try it out.

TD
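
A minimal sketch of enabling the Write Ahead Log for the receiver based
approach, assuming Spark 1.2+ and placeholder application/checkpoint names:

import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

SparkConf conf = new SparkConf()
    .setAppName("ReceiverWithWAL") // placeholder app name
    .set("spark.streaming.receiver.writeAheadLog.enable", "true");

JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.milliseconds(1000));
// the WAL files are written under the checkpoint directory on HDFS
jssc.checkpoint("hdfs:///user/checkpoints/wal-app"); // placeholder directory

// when creating receivers (e.g. KafkaUtils.createStream), a serialized,
// non-replicated level such as StorageLevel.MEMORY_AND_DISK_SER() is enough,
// since the WAL already persists the received data to HDFS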

On Fri, Jun 26, 2015 at 5:46 PM, Cody Koeninger  wrote:

> Read the spark streaming guide and the kafka integration guide for a better
> understanding of how the receiver based stream works.
>
> Capacity planning is specific to your environment and what the job is
> actually doing; you'll need to determine it empirically.
>
>
> On Friday, June 26, 2015, Shushant Arora 
> wrote:
>
>> In 1.2, how do I handle offset management after the stream application
>> starts, in each job? Should I commit offsets manually after job completion?
>>
>> And what is the recommended number of consumer threads? Say I have 300
>> partitions in the Kafka cluster. The load is ~1 million events per second,
>> and each event is ~500 bytes. Are 5 receivers with 60 partitions per
>> receiver sufficient for Spark Streaming to consume?
>>
>> On Fri, Jun 26, 2015 at 8:40 PM, Cody Koeninger 
>> wrote:
>>
>>> The receiver-based kafka createStream in spark 1.2 uses zookeeper to
>>> store offsets.  If you want finer-grained control over offsets, you can
>>> update the values in zookeeper yourself before starting the job.
>>>
>>> createDirectStream in spark 1.3 is still marked as experimental, and
>>> subject to change.  That being said, it works better for me in production
>>> than the receiver based api.
>>>
>>> On Fri, Jun 26, 2015 at 6:43 AM, Shushant Arora <
>>> shushantaror...@gmail.com> wrote:
>>>
 I am using spark streaming 1.2.

 If the processing executors crash, will the receiver reset the offset back
 to the last processed offset?

 If the receiver itself crashes, is there a way to reset the offset, other
 than to smallest or largest, without restarting the streaming application?

 Is Spark Streaming 1.3, which uses the low-level consumer API, stable? And
 which is recommended for handling data loss, 1.2 or 1.3?







>>>
>>


Re: spark streaming with kafka reset offset

2015-06-26 Thread Cody Koeninger
Read the spark streaming guide and the kafka integration guide for a better
understanding of how the receiver based stream works.

Capacity planning is specific to your environment and what the job is
actually doing; you'll need to determine it empirically.


On Friday, June 26, 2015, Shushant Arora  wrote:

> In 1.2, how do I handle offset management after the stream application
> starts, in each job? Should I commit offsets manually after job completion?
>
> And what is the recommended number of consumer threads? Say I have 300
> partitions in the Kafka cluster. The load is ~1 million events per second,
> and each event is ~500 bytes. Are 5 receivers with 60 partitions per
> receiver sufficient for Spark Streaming to consume?
>
> On Fri, Jun 26, 2015 at 8:40 PM, Cody Koeninger  > wrote:
>
>> The receiver-based kafka createStream in spark 1.2 uses zookeeper to
>> store offsets.  If you want finer-grained control over offsets, you can
>> update the values in zookeeper yourself before starting the job.
>>
>> createDirectStream in spark 1.3 is still marked as experimental, and
>> subject to change.  That being said, it works better for me in production
>> than the receiver based api.
>>
>> On Fri, Jun 26, 2015 at 6:43 AM, Shushant Arora <
>> shushantaror...@gmail.com
>> > wrote:
>>
>>> I am using spark streaming 1.2.
>>>
>>> If the processing executors crash, will the receiver reset the offset back
>>> to the last processed offset?
>>>
>>> If the receiver itself crashes, is there a way to reset the offset, other
>>> than to smallest or largest, without restarting the streaming application?
>>>
>>> Is Spark Streaming 1.3, which uses the low-level consumer API, stable? And
>>> which is recommended for handling data loss, 1.2 or 1.3?
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>
>


Re: spark streaming with kafka reset offset

2015-06-26 Thread Shushant Arora
In 1.2, how do I handle offset management after the stream application
starts, in each job? Should I commit offsets manually after job completion?

And what is the recommended number of consumer threads? Say I have 300
partitions in the Kafka cluster. The load is ~1 million events per second,
and each event is ~500 bytes. Are 5 receivers with 60 partitions per
receiver sufficient for Spark Streaming to consume?

On Fri, Jun 26, 2015 at 8:40 PM, Cody Koeninger  wrote:

> The receiver-based kafka createStream in spark 1.2 uses zookeeper to store
> offsets.  If you want finer-grained control over offsets, you can update
> the values in zookeeper yourself before starting the job.
>
> createDirectStream in spark 1.3 is still marked as experimental, and
> subject to change.  That being said, it works better for me in production
> than the receiver based api.
>
> On Fri, Jun 26, 2015 at 6:43 AM, Shushant Arora  > wrote:
>
>> I am using spark streaming 1.2.
>>
>> If the processing executors crash, will the receiver reset the offset back
>> to the last processed offset?
>>
>> If the receiver itself crashes, is there a way to reset the offset, other
>> than to smallest or largest, without restarting the streaming application?
>>
>> Is Spark Streaming 1.3, which uses the low-level consumer API, stable? And
>> which is recommended for handling data loss, 1.2 or 1.3?
>>
>>
>>
>>
>>
>>
>>
>


Re: spark streaming with kafka reset offset

2015-06-26 Thread Cody Koeninger
The receiver-based kafka createStream in spark 1.2 uses zookeeper to store
offsets.  If you want finer-grained control over offsets, you can update
the values in zookeeper yourself before starting the job.

createDirectStream in spark 1.3 is still marked as experimental, and
subject to change.  That being said, it works better for me in production
than the receiver based api.
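
A minimal sketch of seeding the offsets in zookeeper before starting a
receiver based job, assuming the Kafka 0.8 znode layout
(/consumers/<group>/offsets/<topic>/<partition>) and the plain ZooKeeper
client; the group, topic, host and offset values are placeholders:

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;

public class SeedKafkaOffsets {
  public static void main(String[] args) throws Exception {
    // Kafka 0.8 high-level consumer layout: /consumers/<group>/offsets/<topic>/<partition>
    String path = "/consumers/testgroup/offsets/testSparkPartitioned/0"; // partition 0
    byte[] offset = "12345".getBytes("UTF-8");                           // placeholder offset

    ZooKeeper zk = new ZooKeeper("ipadd:2181", 10000, new Watcher() {
      public void process(WatchedEvent event) { /* no-op */ }
    });
    if (zk.exists(path, false) == null) {
      // a real tool would also create missing parent znodes; this sketch assumes they exist
      zk.create(path, offset, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
    } else {
      zk.setData(path, offset, -1); // -1 skips the znode version check
    }
    zk.close();
  }
}

Run it once before submitting the streaming job, with the same group.id and
topic that the receivers will use.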

On Fri, Jun 26, 2015 at 6:43 AM, Shushant Arora 
wrote:

> I am using spark streaming 1.2.
>
> If the processing executors crash, will the receiver reset the offset back
> to the last processed offset?
>
> If the receiver itself crashes, is there a way to reset the offset, other
> than to smallest or largest, without restarting the streaming application?
>
> Is Spark Streaming 1.3, which uses the low-level consumer API, stable? And
> which is recommended for handling data loss, 1.2 or 1.3?
>
>
>
>
>
>
>


spark streaming with kafka reset offset

2015-06-26 Thread Shushant Arora
I am using spark streaming 1.2.

If the processing executors crash, will the receiver reset the offset back to
the last processed offset?

If the receiver itself crashes, is there a way to reset the offset, other
than to smallest or largest, without restarting the streaming application?

Is Spark Streaming 1.3, which uses the low-level consumer API, stable? And
which is recommended for handling data loss, 1.2 or 1.3?