Re: FW: Kafka Direct Stream - dynamic topic subscription

2017-10-29 Thread Cody Koeninger
As it says in SPARK-10320 and in the docs at
http://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html#consumerstrategies
, you can use SubscribePattern
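A minimal Scala sketch of the SubscribePattern approach with the 0-10
integration, assuming an existing StreamingContext `ssc`; the broker address,
group id, and topic pattern are placeholders:

import java.util.regex.Pattern
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010._

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "localhost:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "example-group")

// SubscribePattern takes a java.util.regex.Pattern; the underlying consumer
// periodically re-checks topic metadata, so new topics and partitions that
// match can be picked up -- this is the "dynamic topic subscription".
val stream = KafkaUtils.createDirectStream[String, String](
  ssc,
  LocationStrategies.PreferConsistent,
  ConsumerStrategies.SubscribePattern[String, String](
    Pattern.compile("mytopic.*"), kafkaParams))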

On Sun, Oct 29, 2017 at 3:56 PM, Ramanan, Buvana (Nokia - US/Murray
Hill) <buvana.rama...@nokia-bell-labs.com> wrote:
> Hello Cody,
>
>
>
> As the stakeholders of the JIRA SPARK-10320 issue, can you please explain
> the purpose of dynamic topic subscription? Does it mean adapting the
> consumer to read from new partitions that might get created after the
> Spark Streaming job begins? Is there a succinct writeup on the dynamic
> topic subscription feature that you can share?
>
>
>
> Also, is there a way I can subscribe to topics whose names match a regular
> expression (some Kafka consumers, such as the kafka-python library, support
> that)?
>
>
>
> I am forwarding the email I sent to the Spark users group, which contains a
> little more background on my question.
>
>
>
> Thank you,
>
> Regards,
>
> Buvana
>
>
>
> From: Ramanan, Buvana (Nokia - US/Murray Hill)
> [mailto:buvana.rama...@nokia-bell-labs.com]
> Sent: Friday, October 27, 2017 10:46 PM
> To: user@spark.apache.org
> Subject: Kafka Direct Stream - dynamic topic subscription
>
>
>
> Hello,
>
>
>
> Using Spark 2.2.0. Interested in seeing dynamic topic subscription in
> action.
>
>
>
> Tried this example: streaming.DirectKafkaWordCount (which uses
> org.apache.spark.streaming.kafka010)
>
>
>
> I started with 8 Kafka partitions in my topic and found that Spark Streaming
> executes 8 tasks (one per partition), which is what is expected. While this
> example process was running, I increased the Kafka partitions to 16 and
> started producing data to the new partitions as well.
>
>
>
> I expected that the Kafka consumer that Spark uses would detect this change
> and spawn new tasks for the new partitions. But I found that it only reads
> from the old partitions and does not read from the new partitions. When I do
> a restart, it reads from all 16 partitions.
>
>
>
> Is this expected?
>
>
>
> What is meant by dynamic topic subscription?
>
>
>
> Does it apply only to topics whose names match a regular expression, and not
> to dynamically growing partitions?
>
>
>
> Thanks,
>
> Buvana
>
>




Kafka Direct Stream - dynamic topic subscription

2017-10-27 Thread Ramanan, Buvana (Nokia - US/Murray Hill)
Hello,

Using Spark 2.2.0. Interested in seeing dynamic topic subscription in
action.

Tried this example: streaming.DirectKafkaWordCount (which uses 
org.apache.spark.streaming.kafka010)

I started with 8 Kafka partitions in my topic and found that Spark Streaming
executes 8 tasks (one per partition), which is what is expected. While this
example process was running, I increased the Kafka partitions to 16 and
started producing data to the new partitions as well.

I expected that the Kafka consumer that Spark uses would detect this change
and spawn new tasks for the new partitions. But I found that it only reads from
the old partitions and does not read from the new partitions. When I do a
restart, it reads from all 16 partitions.

Is this expected?

What is meant by dynamic topic subscription?

Does it apply only to topics whose names match a regular expression, and not
to dynamically growing partitions?

Thanks,
Buvana



Re: ConcurrentModificationException using Kafka Direct Stream

2017-09-19 Thread HARSH TAKKAR
Thanks Cody,

It worked for me by keeping the number of executors, each having 1 core,
equal to the number of Kafka partitions (as sketched below).
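A sketch of that configuration, assuming a topic with 8 partitions
(`spark.executor.instances` applies on YARN; the app name is a placeholder):

import org.apache.spark.SparkConf

// One single-core executor per Kafka partition, so each cached
// KafkaConsumer is only ever used by one task at a time.
val conf = new SparkConf()
  .setAppName("kafka-direct-example")
  .set("spark.executor.instances", "8") // = number of Kafka partitions
  .set("spark.executor.cores", "1")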



On Mon, Sep 18, 2017 at 8:47 PM Cody Koeninger  wrote:

> Have you searched in jira, e.g.
>
> https://issues.apache.org/jira/browse/SPARK-19185
>
> On Mon, Sep 18, 2017 at 1:56 AM, HARSH TAKKAR 
> wrote:
> > Hi
> >
> > Changing the Spark version is my last resort; is there any other
> > workaround for this problem?
> >
> >
> > On Mon, Sep 18, 2017 at 11:43 AM pandees waran 
> wrote:
> >>
> >> All, May I know what exactly changed in 2.1.1 which solved this problem?
> >>
> >> Sent from my iPhone
> >>
> >> On Sep 17, 2017, at 11:08 PM, Anastasios Zouzias 
> >> wrote:
> >>
> >> Hi,
> >>
> >> I had a similar issue using 2.1.0 but not with Kafka. Updating to 2.1.1
> >> solved my issue. Can you try with 2.1.1 as well and report back?
> >>
> >> Best,
> >> Anastasios
> >>
> >> Am 17.09.2017 16:48 schrieb "HARSH TAKKAR" :
> >>
> >>
> >> Hi
> >>
> >> I am using Spark 2.1.0 with Scala 2.11.8, and while iterating over the
> >> partitions of each RDD in a DStream formed using KafkaUtils, I am
> >> getting the exception below. Please suggest a fix.
> >>
> >> I have the following config:
> >>
> >> kafka :
> >> enable.auto.commit:"true",
> >> auto.commit.interval.ms:"1000",
> >> session.timeout.ms:"3",
> >>
> >> Spark:
> >>
> >> spark.streaming.backpressure.enabled=true
> >>
> >> spark.streaming.kafka.maxRatePerPartition=200
> >>
> >>
> >> Exception in task 0.2 in stage 3236.0 (TID 77795)
> >> java.util.ConcurrentModificationException: KafkaConsumer is not safe for
> >> multi-threaded access
> >>
> >> --
> >> Kind Regards
> >> Harsh
> >>
> >>
> >
>


Re: ConcurrentModificationException using Kafka Direct Stream

2017-09-18 Thread Cody Koeninger
Have you searched in jira, e.g.

https://issues.apache.org/jira/browse/SPARK-19185

On Mon, Sep 18, 2017 at 1:56 AM, HARSH TAKKAR  wrote:
> Hi
>
> Changing the Spark version is my last resort; is there any other workaround
> for this problem?
>
>
> On Mon, Sep 18, 2017 at 11:43 AM pandees waran  wrote:
>>
>> All, May I know what exactly changed in 2.1.1 which solved this problem?
>>
>> Sent from my iPhone
>>
>> On Sep 17, 2017, at 11:08 PM, Anastasios Zouzias 
>> wrote:
>>
>> Hi,
>>
>> I had a similar issue using 2.1.0 but not with Kafka. Updating to 2.1.1
>> solved my issue. Can you try with 2.1.1 as well and report back?
>>
>> Best,
>> Anastasios
>>
>> Am 17.09.2017 16:48 schrieb "HARSH TAKKAR" :
>>
>>
>> Hi
>>
> >> I am using Spark 2.1.0 with Scala 2.11.8, and while iterating over the
> >> partitions of each RDD in a DStream formed using KafkaUtils, I am getting
> >> the exception below. Please suggest a fix.
> >>
> >> I have the following config:
>>
>> kafka :
>> enable.auto.commit:"true",
>> auto.commit.interval.ms:"1000",
>> session.timeout.ms:"3",
>>
>> Spark:
>>
>> spark.streaming.backpressure.enabled=true
>>
>> spark.streaming.kafka.maxRatePerPartition=200
>>
>>
>> Exception in task 0.2 in stage 3236.0 (TID 77795)
>> java.util.ConcurrentModificationException: KafkaConsumer is not safe for
>> multi-threaded access
>>
>> --
>> Kind Regards
>> Harsh
>>
>>
>




Re: ConcurrentModificationException using Kafka Direct Stream

2017-09-18 Thread HARSH TAKKAR
Hi

Changing the Spark version is my last resort; is there any other workaround
for this problem?


On Mon, Sep 18, 2017 at 11:43 AM pandees waran  wrote:

> All, May I know what exactly changed in 2.1.1 which solved this problem?
>
> Sent from my iPhone
>
> On Sep 17, 2017, at 11:08 PM, Anastasios Zouzias 
> wrote:
>
> Hi,
>
> I had a similar issue using 2.1.0 but not with Kafka. Updating to 2.1.1
> solved my issue. Can you try with 2.1.1 as well and report back?
>
> Best,
> Anastasios
>
> Am 17.09.2017 16:48 schrieb "HARSH TAKKAR" :
>
>
> Hi
>
> I am using Spark 2.1.0 with Scala 2.11.8, and while iterating over the
> partitions of each RDD in a DStream formed using KafkaUtils, I am getting
> the exception below. Please suggest a fix.
>
> I have the following config:
>
> kafka :
> enable.auto.commit:"true",
> auto.commit.interval.ms:"1000",
> session.timeout.ms:"3",
>
> Spark:
>
> spark.streaming.backpressure.enabled=true
>
> spark.streaming.kafka.maxRatePerPartition=200
>
>
> Exception in task 0.2 in stage 3236.0 (TID 77795)
> java.util.ConcurrentModificationException: KafkaConsumer is not safe for
> multi-threaded access
>
> --
> Kind Regards
> Harsh
>
>
>


Re: ConcurrentModificationException using Kafka Direct Stream

2017-09-18 Thread pandees waran
All, May I know what exactly changed in 2.1.1 which solved this problem?

Sent from my iPhone

> On Sep 17, 2017, at 11:08 PM, Anastasios Zouzias  wrote:
> 
> Hi,
> 
> I had a similar issue using 2.1.0 but not with Kafka. Updating to 2.1.1 
> solved my issue. Can you try with 2.1.1 as well and report back?
> 
> Best,
> Anastasios
> 
> Am 17.09.2017 16:48 schrieb "HARSH TAKKAR" :
> 
> Hi 
> 
> I am using Spark 2.1.0 with Scala 2.11.8, and while iterating over the
> partitions of each RDD in a DStream formed using KafkaUtils, I am getting
> the exception below. Please suggest a fix.
>
> I have the following config:
> 
> kafka : 
> enable.auto.commit:"true",
> auto.commit.interval.ms:"1000",
> session.timeout.ms:"3",
> 
> Spark: 
> spark.streaming.backpressure.enabled=true
> 
> spark.streaming.kafka.maxRatePerPartition=200
> 
> 
> 
> Exception in task 0.2 in stage 3236.0 (TID 77795)
> java.util.ConcurrentModificationException: KafkaConsumer is not safe for 
> multi-threaded access
> 
> --
> Kind Regards
> Harsh 
> 


Re: ConcurrentModificationException using Kafka Direct Stream

2017-09-18 Thread Anastasios Zouzias
Hi,

I had a similar issue using 2.1.0 but not with Kafka. Updating to 2.1.1
solved my issue. Can you try with 2.1.1 as well and report back?

Best,
Anastasios

Am 17.09.2017 16:48 schrieb "HARSH TAKKAR" :


Hi

I am using Spark 2.1.0 with Scala 2.11.8, and while iterating over the
partitions of each RDD in a DStream formed using KafkaUtils, I am getting
the exception below. Please suggest a fix.

I have the following config:

kafka :
enable.auto.commit:"true",
auto.commit.interval.ms:"1000",
session.timeout.ms:"3",

Spark:

spark.streaming.backpressure.enabled=true

spark.streaming.kafka.maxRatePerPartition=200


Exception in task 0.2 in stage 3236.0 (TID 77795)
java.util.ConcurrentModificationException: KafkaConsumer is not safe for
multi-threaded access

--
Kind Regards
Harsh


Re: ConcurrentModificationException using Kafka Direct Stream

2017-09-18 Thread kant kodali
You should paste some code. A ConcurrentModificationException normally
happens when you modify a list or any non-thread-safe data structure while
you are iterating over it.

On Sun, Sep 17, 2017 at 10:25 PM, HARSH TAKKAR 
wrote:

> Hi,
>
> No, we are not creating any threads for the Kafka DStream.
> However, we have a single thread for refreshing a resource cache on the
> driver, but that is totally separate from this connection.
>
> On Mon, Sep 18, 2017 at 12:29 AM kant kodali  wrote:
>
>> Are you creating threads in your application?
>>
>> On Sun, Sep 17, 2017 at 7:48 AM, HARSH TAKKAR 
>> wrote:
>>
>>>
>>> Hi
>>>
> >>> I am using Spark 2.1.0 with Scala 2.11.8, and while iterating over the
> >>> partitions of each RDD in a DStream formed using KafkaUtils, I am getting
> >>> the exception below. Please suggest a fix.
> >>>
> >>> I have the following config:
>>>
>>> kafka :
>>> enable.auto.commit:"true",
>>> auto.commit.interval.ms:"1000",
>>> session.timeout.ms:"3",
>>>
>>> Spark:
>>>
>>> spark.streaming.backpressure.enabled=true
>>>
>>> spark.streaming.kafka.maxRatePerPartition=200
>>>
>>>
>>> Exception in task 0.2 in stage 3236.0 (TID 77795)
>>> java.util.ConcurrentModificationException: KafkaConsumer is not safe
>>> for multi-threaded access
>>>
>>> --
>>> Kind Regards
>>> Harsh
>>>
>>
>>


Re: ConcurrentModificationException using Kafka Direct Stream

2017-09-17 Thread HARSH TAKKAR
Hi,

No, we are not creating any threads for the Kafka DStream.
However, we have a single thread for refreshing a resource cache on the
driver, but that is totally separate from this connection.

On Mon, Sep 18, 2017 at 12:29 AM kant kodali  wrote:

> Are you creating threads in your application?
>
> On Sun, Sep 17, 2017 at 7:48 AM, HARSH TAKKAR 
> wrote:
>
>>
>> Hi
>>
>> I am using Spark 2.1.0 with Scala 2.11.8, and while iterating over the
>> partitions of each RDD in a DStream formed using KafkaUtils, I am getting
>> the exception below. Please suggest a fix.
>>
>> I have the following config:
>>
>> kafka :
>> enable.auto.commit:"true",
>> auto.commit.interval.ms:"1000",
>> session.timeout.ms:"3",
>>
>> Spark:
>>
>> spark.streaming.backpressure.enabled=true
>>
>> spark.streaming.kafka.maxRatePerPartition=200
>>
>>
>> Exception in task 0.2 in stage 3236.0 (TID 77795)
>> java.util.ConcurrentModificationException: KafkaConsumer is not safe for
>> multi-threaded access
>>
>> --
>> Kind Regards
>> Harsh
>>
>
>


Re: ConcurrentModificationException using Kafka Direct Stream

2017-09-17 Thread kant kodali
Are you creating threads in your application?

On Sun, Sep 17, 2017 at 7:48 AM, HARSH TAKKAR  wrote:

>
> Hi
>
> I am using Spark 2.1.0 with Scala 2.11.8, and while iterating over the
> partitions of each RDD in a DStream formed using KafkaUtils, I am getting
> the exception below. Please suggest a fix.
>
> I have the following config:
>
> kafka :
> enable.auto.commit:"true",
> auto.commit.interval.ms:"1000",
> session.timeout.ms:"3",
>
> Spark:
>
> spark.streaming.backpressure.enabled=true
>
> spark.streaming.kafka.maxRatePerPartition=200
>
>
> Exception in task 0.2 in stage 3236.0 (TID 77795)
> java.util.ConcurrentModificationException: KafkaConsumer is not safe for
> multi-threaded access
>
> --
> Kind Regards
> Harsh
>


ConcurrentModificationException using Kafka Direct Stream

2017-09-17 Thread HARSH TAKKAR
Hi

I am using Spark 2.1.0 with Scala 2.11.8, and while iterating over the
partitions of each RDD in a DStream formed using KafkaUtils, I am getting
the exception below. Please suggest a fix.

I have the following config:

kafka :
enable.auto.commit:"true",
auto.commit.interval.ms:"1000",
session.timeout.ms:"3",

Spark:

spark.streaming.backpressure.enabled=true

spark.streaming.kafka.maxRatePerPartition=200


Exception in task 0.2 in stage 3236.0 (TID 77795)
java.util.ConcurrentModificationException: KafkaConsumer is not safe for
multi-threaded access

--
Kind Regards
Harsh


Re: Spark Streaming Checkpoint and Exactly Once Guarantee on Kafka Direct Stream

2017-06-06 Thread Tathagata Das
In either case, an end-to-end exactly-once guarantee can be ensured only if
the output sink is updated transactionally. The engine has to re-execute data
on failure. An exactly-once guarantee means that the external storage is
updated as if each data record was computed exactly once. That's why you need
to update it transactionally to handle possible recomputations.

This is true for both Spark Streaming and Structured Streaming. Hope this
helps.
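A sketch of the transactional-sink pattern described above, using the direct
stream's HasOffsetRanges (shown here with 0-10 imports); `withTransaction`,
`saveResults`, and `saveOffsets` are hypothetical helpers around your own
database:

import org.apache.spark.streaming.kafka010.{HasOffsetRanges, OffsetRange}

stream.foreachRDD { rdd =>
  val offsetRanges: Array[OffsetRange] =
    rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  // collect() is only reasonable for small batches; it keeps the sketch
  // to a single driver-side transaction.
  val results = rdd.map(record => (record.key, record.value)).collect()

  withTransaction { tx =>        // hypothetical transactional helper
    tx.saveResults(results)      // write the batch output ...
    tx.saveOffsets(offsetRanges) // ... and the offsets in the same transaction
  }
}

A re-executed batch then either replays atomically or is rejected by the
already-stored offsets.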

On Jun 6, 2017 5:56 AM, "ALunar Beach" <alunarbe...@gmail.com> wrote:

> Thanks TD.
> In pre-Structured-Streaming Spark Streaming, the exactly-once guarantee on
> input is not provided, is it?
>
> On Tue, Jun 6, 2017 at 4:30 AM, Tathagata Das <tathagata.das1...@gmail.com
> > wrote:
>
>> This is the expected behavior. There are some confusing corner cases.
> >> If you are starting to play with Spark Streaming, I highly recommend
>> learning Structured Streaming
>> <http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html>
>> instead.
>>
>> On Mon, Jun 5, 2017 at 11:16 AM, anbucheeralan <alunarbe...@gmail.com>
>> wrote:
>>
>>> I am using Spark Streaming Checkpoint and Kafka Direct Stream.
>>> It uses a 30 sec batch duration and normally the job is successful in
>>> 15-20 sec.
>>>
>>> If the spark application fails after the successful completion
>>> (149668428ms in the log below) and restarts, it's duplicating the last
>>> batch again.
>>>
>>> Is this the expected behavior? I was expecting this to start a new batch
>>> window.
>>>
>>>
>>> Here are some logs:
>>>
>>> Last successful run:
>>> 17/06/05 13:38:00 INFO JobScheduler: Total delay: 0.040 s for time
>>> 149668428 ms (execution: 0.029 s)
>>> 17/06/05 13:38:00 INFO KafkaRDD: Removing RDD 0 from persistence list
>>> 17/06/05 13:38:00 INFO BlockManager: Removing RDD 0
>>> 17/06/05 13:38:00 INFO JobGenerator: Checkpointing graph for time
>>> 149668428 ms
>>> 17/06/05 13:38:00 INFO DStreamGraph: Updating checkpoint data for time
>>> 149668428 ms
>>> 17/06/05 13:38:00 INFO DStreamGraph: Updated checkpoint data for time
>>> 149668428 ms
>>> 17/06/05 13:38:00 INFO CheckpointWriter: Submitted checkpoint of time
>>> 149668428 ms to writer queue
>>> 17/06/05 13:38:00 INFO CheckpointWriter: Saving checkpoint for time
>>> 149668428 ms to file 'file:/Users/anbucheeralan/Ide
>>> aProjects/Spark2Example/ckpt/checkpoint-149668428'
>>> 17/06/05 13:38:00 INFO CheckpointWriter: *Checkpoint for time
>>> 149668428 ms saved to file
>>> 'file:/Users/anbucheeralan/IdeaProjects/Spark2Example/ckpt/checkpoint-149668428',
>>> took 4032 bytes and 9 ms*
>>> 17/06/05 13:38:00 INFO DStreamGraph: Clearing checkpoint data for time
>>> 149668428 ms
>>> 17/06/05 13:38:00 INFO DStreamGraph: Cleared checkpoint data for time
>>> 149668428 ms
>>>
>>> After the restart,
>>>
>>> 17/06/05 13:42:31 INFO DirectKafkaInputDStream$Direct
>>> KafkaInputDStreamCheckpointData: Restoring KafkaRDD for time
>>> 149668428 ms [(my_test,0,2000,2000)]
>>> 17/06/05 13:42:31 INFO DirectKafkaInputDStream: Restored checkpoint data
>>> *17/06/05 13:42:31 INFO JobGenerator: Batches during down time (10
>>> batches): 149668428 ms, 149668431 ms, 149668434 ms,
>>> 149668437 ms, 149668440 ms, 149668443 ms, 149668446 ms,
>>> 149668449 ms, 149668452 ms, 149668455 ms*
>>> *17/06/05 13:42:31 INFO JobGenerator: Batches pending processing (0
>>> batches): *
>>> *17/06/05 13:42:31 INFO JobGenerator: Batches to reschedule (10
>>> batches): *149668428 ms, 149668431 ms, 149668434 ms,
>>> 149668437 ms, 149668440 ms, 149668443 ms, 149668446 ms,
>>> 149668449 ms, 149668452 ms, 149668455 ms
>>> 17/06/05 13:42:31 INFO JobScheduler: Added jobs for time 149668428 ms
>>> 17/06/05 13:42:31 INFO JobScheduler: Starting job streaming job
>>> 149668428 ms.0 from job set of time 149668428 ms
>>>
>>>
>>>
>>> --
>>> View this message in context: Fwd: Spark Streaming Checkpoint and
>>> Exactly Once Guarantee on Kafka Direct Stream
>>> <http://apache-spark-user-list.1001560.n3.nabble.com/Fwd-Spark-Streaming-Checkpoint-and-Exactly-Once-Guarantee-on-Kafka-Direct-Stream-tp28743.html>
>>> Sent from the Apache Spark User List mailing list archive
>>> <http://apache-spark-user-list.1001560.n3.nabble.com/> at Nabble.com.
>>>
>>
>>
>


Re: Spark Streaming Checkpoint and Exactly Once Guarantee on Kafka Direct Stream

2017-06-06 Thread ALunar Beach
Thanks TD.
In pre-Structured-Streaming Spark Streaming, the exactly-once guarantee on
input is not provided, is it?

On Tue, Jun 6, 2017 at 4:30 AM, Tathagata Das <tathagata.das1...@gmail.com>
wrote:

> This is the expected behavior. There are some confusing corner cases.
> If you are starting to play with Spark Streaming, I highly recommend
> learning Structured Streaming
> <http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html>
> instead.
>
> On Mon, Jun 5, 2017 at 11:16 AM, anbucheeralan <alunarbe...@gmail.com>
> wrote:
>
>> I am using Spark Streaming Checkpoint and Kafka Direct Stream.
>> It uses a 30 sec batch duration and normally the job is successful in
>> 15-20 sec.
>>
>> If the spark application fails after the successful completion
>> (149668428ms in the log below) and restarts, it's duplicating the last
>> batch again.
>>
>> Is this the expected behavior? I was expecting this to start a new batch
>> window.
>>
>>
>> Here are some logs:
>>
>> Last successful run:
>> 17/06/05 13:38:00 INFO JobScheduler: Total delay: 0.040 s for time
>> 149668428 ms (execution: 0.029 s)
>> 17/06/05 13:38:00 INFO KafkaRDD: Removing RDD 0 from persistence list
>> 17/06/05 13:38:00 INFO BlockManager: Removing RDD 0
>> 17/06/05 13:38:00 INFO JobGenerator: Checkpointing graph for time
>> 149668428 ms
>> 17/06/05 13:38:00 INFO DStreamGraph: Updating checkpoint data for time
>> 149668428 ms
>> 17/06/05 13:38:00 INFO DStreamGraph: Updated checkpoint data for time
>> 149668428 ms
>> 17/06/05 13:38:00 INFO CheckpointWriter: Submitted checkpoint of time
>> 149668428 ms to writer queue
>> 17/06/05 13:38:00 INFO CheckpointWriter: Saving checkpoint for time
>> 149668428 ms to file 'file:/Users/anbucheeralan/Ide
>> aProjects/Spark2Example/ckpt/checkpoint-149668428'
>> 17/06/05 13:38:00 INFO CheckpointWriter: *Checkpoint for time
>> 149668428 ms saved to file
>> 'file:/Users/anbucheeralan/IdeaProjects/Spark2Example/ckpt/checkpoint-149668428',
>> took 4032 bytes and 9 ms*
>> 17/06/05 13:38:00 INFO DStreamGraph: Clearing checkpoint data for time
>> 149668428 ms
>> 17/06/05 13:38:00 INFO DStreamGraph: Cleared checkpoint data for time
>> 149668428 ms
>>
>> After the restart,
>>
>> 17/06/05 13:42:31 INFO DirectKafkaInputDStream$Direct
>> KafkaInputDStreamCheckpointData: Restoring KafkaRDD for time
>> 149668428 ms [(my_test,0,2000,2000)]
>> 17/06/05 13:42:31 INFO DirectKafkaInputDStream: Restored checkpoint data
>> *17/06/05 13:42:31 INFO JobGenerator: Batches during down time (10
>> batches): 149668428 ms, 149668431 ms, 149668434 ms,
>> 149668437 ms, 149668440 ms, 149668443 ms, 149668446 ms,
>> 149668449 ms, 149668452 ms, 149668455 ms*
>> *17/06/05 13:42:31 INFO JobGenerator: Batches pending processing (0
>> batches): *
>> *17/06/05 13:42:31 INFO JobGenerator: Batches to reschedule (10 batches):
>> *149668428 ms, 149668431 ms, 149668434 ms, 149668437 ms,
>> 149668440 ms, 149668443 ms, 149668446 ms, 149668449 ms,
>> 1496684520000 ms, 149668455 ms
>> 17/06/05 13:42:31 INFO JobScheduler: Added jobs for time 149668428 ms
>> 17/06/05 13:42:31 INFO JobScheduler: Starting job streaming job
>> 149668428 ms.0 from job set of time 149668428 ms
>>
>>
>>
>> --
>> View this message in context: Fwd: Spark Streaming Checkpoint and
>> Exactly Once Guarantee on Kafka Direct Stream
>> <http://apache-spark-user-list.1001560.n3.nabble.com/Fwd-Spark-Streaming-Checkpoint-and-Exactly-Once-Guarantee-on-Kafka-Direct-Stream-tp28743.html>
>> Sent from the Apache Spark User List mailing list archive
>> <http://apache-spark-user-list.1001560.n3.nabble.com/> at Nabble.com.
>>
>
>


Re: Spark Streaming Checkpoint and Exactly Once Guarantee on Kafka Direct Stream

2017-06-06 Thread Tathagata Das
This is the expected behavior. There are some confusing corner cases.
If you are starting to play with Spark Streaming, I highly recommend
learning Structured Streaming
<http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html>
instead.

On Mon, Jun 5, 2017 at 11:16 AM, anbucheeralan <alunarbe...@gmail.com>
wrote:

> I am using Spark Streaming Checkpoint and Kafka Direct Stream.
> It uses a 30 sec batch duration and normally the job is successful in
> 15-20 sec.
>
> If the spark application fails after the successful completion
> (149668428ms in the log below) and restarts, it's duplicating the last
> batch again.
>
> Is this the expected behavior? I was expecting this to start a new batch
> window.
>
>
> Here are some logs:
>
> Last successful run:
> 17/06/05 13:38:00 INFO JobScheduler: Total delay: 0.040 s for time
> 149668428 ms (execution: 0.029 s)
> 17/06/05 13:38:00 INFO KafkaRDD: Removing RDD 0 from persistence list
> 17/06/05 13:38:00 INFO BlockManager: Removing RDD 0
> 17/06/05 13:38:00 INFO JobGenerator: Checkpointing graph for time
> 149668428 ms
> 17/06/05 13:38:00 INFO DStreamGraph: Updating checkpoint data for time
> 149668428 ms
> 17/06/05 13:38:00 INFO DStreamGraph: Updated checkpoint data for time
> 149668428 ms
> 17/06/05 13:38:00 INFO CheckpointWriter: Submitted checkpoint of time
> 149668428 ms to writer queue
> 17/06/05 13:38:00 INFO CheckpointWriter: Saving checkpoint for time
> 149668428 ms to file 'file:/Users/anbucheeralan/Ide
> aProjects/Spark2Example/ckpt/checkpoint-149668428'
> 17/06/05 13:38:00 INFO CheckpointWriter: *Checkpoint for time
> 149668428 ms saved to file
> 'file:/Users/anbucheeralan/IdeaProjects/Spark2Example/ckpt/checkpoint-149668428',
> took 4032 bytes and 9 ms*
> 17/06/05 13:38:00 INFO DStreamGraph: Clearing checkpoint data for time
> 149668428 ms
> 17/06/05 13:38:00 INFO DStreamGraph: Cleared checkpoint data for time
> 149668428 ms
>
> After the restart,
>
> 17/06/05 13:42:31 INFO DirectKafkaInputDStream$Direct
> KafkaInputDStreamCheckpointData: Restoring KafkaRDD for time
> 149668428 ms [(my_test,0,2000,2000)]
> 17/06/05 13:42:31 INFO DirectKafkaInputDStream: Restored checkpoint data
> *17/06/05 13:42:31 INFO JobGenerator: Batches during down time (10
> batches): 149668428 ms, 149668431 ms, 149668434 ms,
> 149668437 ms, 149668440 ms, 149668443 ms, 149668446 ms,
> 149668449 ms, 149668452 ms, 149668455 ms*
> *17/06/05 13:42:31 INFO JobGenerator: Batches pending processing (0
> batches): *
> *17/06/05 13:42:31 INFO JobGenerator: Batches to reschedule (10 batches): 
> *149668428
> ms, 149668431 ms, 149668434 ms, 149668437 ms, 149668440 ms,
> 149668443 ms, 149668446 ms, 149668449 ms, 149668452 ms,
> 149668455 ms
> 17/06/05 13:42:31 INFO JobScheduler: Added jobs for time 149668428 ms
> 17/06/05 13:42:31 INFO JobScheduler: Starting job streaming job
> 149668428 ms.0 from job set of time 149668428 ms
>
>
>
> --
> View this message in context: Fwd: Spark Streaming Checkpoint and Exactly
> Once Guarantee on Kafka Direct Stream
> <http://apache-spark-user-list.1001560.n3.nabble.com/Fwd-Spark-Streaming-Checkpoint-and-Exactly-Once-Guarantee-on-Kafka-Direct-Stream-tp28743.html>
> Sent from the Apache Spark User List mailing list archive
> <http://apache-spark-user-list.1001560.n3.nabble.com/> at Nabble.com.
>


Fwd: Spark Streaming Checkpoint and Exactly Once Guarantee on Kafka Direct Stream

2017-06-05 Thread anbucheeralan
I am using Spark Streaming Checkpoint and Kafka Direct Stream.
It uses a 30 sec batch duration and normally the job is successful in 15-20
sec.

If the spark application fails after the successful completion
(149668428ms in the log below) and restarts, it's duplicating the last
batch again.

Is this the expected behavior? I was expecting this to start a new batch
window.


Here are some logs:

Last successful run:
17/06/05 13:38:00 INFO JobScheduler: Total delay: 0.040 s for time
149668428 ms (execution: 0.029 s)
17/06/05 13:38:00 INFO KafkaRDD: Removing RDD 0 from persistence list
17/06/05 13:38:00 INFO BlockManager: Removing RDD 0
17/06/05 13:38:00 INFO JobGenerator: Checkpointing graph for time
149668428 ms
17/06/05 13:38:00 INFO DStreamGraph: Updating checkpoint data for time
149668428 ms
17/06/05 13:38:00 INFO DStreamGraph: Updated checkpoint data for time
149668428 ms
17/06/05 13:38:00 INFO CheckpointWriter: Submitted checkpoint of time
149668428 ms to writer queue
17/06/05 13:38:00 INFO CheckpointWriter: Saving checkpoint for time
149668428 ms to file 'file:/Users/anbucheeralan/
IdeaProjects/Spark2Example/ckpt/checkpoint-149668428'
17/06/05 13:38:00 INFO CheckpointWriter: *Checkpoint for time 149668428
ms saved to file
'file:/Users/anbucheeralan/IdeaProjects/Spark2Example/ckpt/checkpoint-149668428',
took 4032 bytes and 9 ms*
17/06/05 13:38:00 INFO DStreamGraph: Clearing checkpoint data for time
149668428 ms
17/06/05 13:38:00 INFO DStreamGraph: Cleared checkpoint data for time
149668428 ms

After the restart,

17/06/05 13:42:31 INFO DirectKafkaInputDStream$
DirectKafkaInputDStreamCheckpointData: Restoring KafkaRDD for time
149668428 ms [(my_test,0,2000,2000)]
17/06/05 13:42:31 INFO DirectKafkaInputDStream: Restored checkpoint data
*17/06/05 13:42:31 INFO JobGenerator: Batches during down time (10
batches): 149668428 ms, 149668431 ms, 149668434 ms,
149668437 ms, 149668440 ms, 149668443 ms, 149668446 ms,
149668449 ms, 149668452 ms, 149668455 ms*
*17/06/05 13:42:31 INFO JobGenerator: Batches pending processing (0
batches): *
*17/06/05 13:42:31 INFO JobGenerator: Batches to reschedule (10
batches): *149668428
ms, 149668431 ms, 149668434 ms, 149668437 ms, 149668440 ms,
149668443 ms, 149668446 ms, 149668449 ms, 149668452 ms,
149668455 ms
17/06/05 13:42:31 INFO JobScheduler: Added jobs for time 149668428 ms
17/06/05 13:42:31 INFO JobScheduler: Starting job streaming job
149668428 ms.0 from job set of time 149668428 ms




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Fwd-Spark-Streaming-Checkpoint-and-Exactly-Once-Guarantee-on-Kafka-Direct-Stream-tp28743.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

Spark Streaming Checkpoint and Exactly Once Guarantee on Kafka Direct Stream

2017-06-05 Thread ALunar Beach
I am using Spark Streaming Checkpoint and Kafka Direct Stream.
It uses a 30 sec batch duration and normally the job is successful in 15-20
sec.

If the spark application fails after the successful completion
(149668428ms in the log below) and restarts, it's duplicating the last
batch again.

Is this the expected behavior? I was expecting this to start a new batch
window.


Here are some logs:

Last successful run:
17/06/05 13:38:00 INFO JobScheduler: Total delay: 0.040 s for time
149668428 ms (execution: 0.029 s)
17/06/05 13:38:00 INFO KafkaRDD: Removing RDD 0 from persistence list
17/06/05 13:38:00 INFO BlockManager: Removing RDD 0
17/06/05 13:38:00 INFO JobGenerator: Checkpointing graph for time
149668428 ms
17/06/05 13:38:00 INFO DStreamGraph: Updating checkpoint data for time
149668428 ms
17/06/05 13:38:00 INFO DStreamGraph: Updated checkpoint data for time
149668428 ms
17/06/05 13:38:00 INFO CheckpointWriter: Submitted checkpoint of time
149668428 ms to writer queue
17/06/05 13:38:00 INFO CheckpointWriter: Saving checkpoint for time
149668428 ms to file
'file:/Users/anbucheeralan/IdeaProjects/Spark2Example/ckpt/checkpoint-149668428'
17/06/05 13:38:00 INFO CheckpointWriter: *Checkpoint for time 149668428
ms saved to file
'file:/Users/anbucheeralan/IdeaProjects/Spark2Example/ckpt/checkpoint-149668428',
took 4032 bytes and 9 ms*
17/06/05 13:38:00 INFO DStreamGraph: Clearing checkpoint data for time
149668428 ms
17/06/05 13:38:00 INFO DStreamGraph: Cleared checkpoint data for time
149668428 ms

After the restart,

17/06/05 13:42:31 INFO
DirectKafkaInputDStream$DirectKafkaInputDStreamCheckpointData: Restoring
KafkaRDD for time 149668428 ms [(my_test,0,2000,2000)]
17/06/05 13:42:31 INFO DirectKafkaInputDStream: Restored checkpoint data
*17/06/05 13:42:31 INFO JobGenerator: Batches during down time (10
batches): 149668428 ms, 149668431 ms, 149668434 ms,
149668437 ms, 149668440 ms, 149668443 ms, 149668446 ms,
149668449 ms, 149668452 ms, 149668455 ms*
*17/06/05 13:42:31 INFO JobGenerator: Batches pending processing (0
batches): *
*17/06/05 13:42:31 INFO JobGenerator: Batches to reschedule (10
batches): *149668428
ms, 149668431 ms, 149668434 ms, 149668437 ms, 149668440 ms,
149668443 ms, 149668446 ms, 149668449 ms, 149668452 ms,
149668455 ms
17/06/05 13:42:31 INFO JobScheduler: Added jobs for time 149668428 ms
17/06/05 13:42:31 INFO JobScheduler: Starting job streaming job
149668428 ms.0 from job set of time 149668428 ms


Re: Spark 2 Kafka Direct Stream Consumer Issue

2017-05-24 Thread Jayadeep J
Could any of the experts kindly advise?

On Fri, May 19, 2017 at 6:00 PM, Jayadeep J <jayade...@gmail.com> wrote:

> Hi ,
>
> I would appreciate some advice regarding an issue we are facing in
> Streaming Kafka Direct Consumer.
>
> We have recently upgraded our application with Kafka Direct Stream to
> Spark 2 (spark-streaming-kafka-0-10 - 2.1.0) with Kafka version 0.10.0.0.
> We find abnormal delays after the application has run for a couple of
> hours & completed consumption of ~10 million records. There is a sudden
> jump in the processing time from ~15 seconds (usual for our app) to ~3
> minutes, & from then on the processing time keeps degrading throughout,
> though without any failure.
>
> We have seen that the delay is due to certain tasks taking the exact time
> duration of the configured 'request.timeout.ms' for the Kafka consumer.
> We have tested this by varying the timeout property to different values.
> Looks like the get(offset: Long, timeout: Long): ConsumerRecord[K, V] &
> subsequent poll(timeout) method in CachedKafkaConsumer.scala is actually
> timing out on some of the partitions without reading the data. But the
> executor logs it as successfully completed after the exact timeout
> duration. Note that most other tasks are completing successfully with
> millisecond durations. We found the DEBUG logs to contain
> "org.apache.kafka.common.errors.DisconnectException" without any actual
> failure. The Kafka issue logged as 'KafkaConsumer susceptible to
> FetchResponse starvation' [KAFKA-4753] seems to be the underlying cause.
>
> Could anyone kindly suggest whether this is normal behaviour for
> Spark? Shouldn't Spark throw a timeout error or maybe fail the tasks in
> such cases? Currently the tasks seem to be successful & the job appears to
> progress really slowly. Thanks for your help.
>
> Thanks
> Jay
>
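For reference, a sketch of where the consumer timeout being discussed lives,
as part of the kafkaParams passed to the 0-10 createDirectStream; the values
are illustrative, not a recommendation:

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "broker1:9092", // placeholder
  "request.timeout.ms" -> "120000",      // the timeout the stalled tasks hit
  "session.timeout.ms" -> "30000",
  "heartbeat.interval.ms" -> "10000")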


Re: Kafka Direct Stream: Offset Managed Manually (Exactly Once)

2016-10-21 Thread Cody Koeninger
DStream checkpoints have all kinds of other difficulties, the biggest one
being that you can't use a checkpoint if your app code has been updated.
If you can avoid checkpoints in general, I would.
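A sketch of restarting from database offsets with the 0-8 direct API of that
era; `loadOffsetsFromDb` is a hypothetical helper, and `ssc`/`kafkaParams` are
assumed to exist:

import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils

// Resume exactly where the last committed batch ended, with no checkpoint.
val fromOffsets: Map[TopicAndPartition, Long] = loadOffsetsFromDb()

val stream = KafkaUtils.createDirectStream[
    String, String, StringDecoder, StringDecoder, (String, String)](
  ssc, kafkaParams, fromOffsets,
  (mmd: MessageAndMetadata[String, String]) => (mmd.key, mmd.message))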

On Fri, Oct 21, 2016 at 11:17 AM, Erwan ALLAIN  wrote:
> Thanks for the fast answer !
>
> I just feel annoyed and frustrated not to be able to use Spark checkpointing
> because I believe that mechanism has been correctly tested.
> I'm afraid that reinventing the wheel can lead to side effects that I don't
> see now ...
>
> Anyway thanks again, I know what I have to do :)
>
> On Fri, Oct 21, 2016 at 5:05 PM, Cody Koeninger  wrote:
>>
>> 0.  If your processing time is regularly greater than your batch
>> interval you're going to have problems anyway.  Investigate this more,
>> set maxRatePerPartition, something.
>> 1. That's personally what I tend to do.
>> 2. Why are you relying on checkpoints if you're storing offset state
>> in the database?  Just restart from the offsets in the database.  I
>> think your solution of map of batchtime to offset ranges would work
>> fine in that case, no?  (make sure to expire items from the map)
>>
>>
>>
>> On Fri, Oct 21, 2016 at 3:32 AM, Erwan ALLAIN 
>> wrote:
>> > Hi,
>> >
>> > I'm currently implementing an exactly once mechanism based on the
>> > following
>> > example:
>> >
>> >
>> > https://github.com/koeninger/kafka-exactly-once/blob/master/src/main/scala/example/TransactionalPerBatch.scala
>> >
>> > the pseudocode is as follows:
>> >
>> > dstream.transform (store offset in a variable on driver side )
>> > dstream.map
>> > dstream.foreachRdd( action + save offset in db)
>> >
>> > this code doesn't work if the processing time is greater than batch
>> > interval
>> > (same problem as windowed
>> >
>> > (https://github.com/koeninger/kafka-exactly-once/blob/master/src/main/scala/example/Windowed.scala)
>> >
>> > Indeed, at each batch interval a new rdd is created and stacked; thus the
>> > transform method is called several times and updates the global variable,
>> > and in the end, when we save, the offset range does not correspond to the
>> > one processed.
>> >
>> > 1) Do I need to work at the RDD level (inside a big forEachRDD like in
>> > the
>> > first example) instead of dstream ?
>> >
>> > 2) I can use a Map[BatchTime, OffsetRange] as a global variable, but in
>> > case of a crash this map will no longer reflect the generated RDDs
>> > (restored from the checkpoint, RDDs prepared but not executed).
>> >   2.1) Do I need to store this map elsewhere (Cassandra)?
>> >   2.2) Is there a way to retrieve the restored offset ranges? (The
>> > transform method is not called anymore for the checkpointed RDD.)
>> >   2.3) Is it possible to store some context along with the RDD to be
>> > serialized?
>> >
>> > Lots of questions; let me know if it's not clear!
>> >
>
>




Re: Kafka Direct Stream: Offset Managed Manually (Exactly Once)

2016-10-21 Thread Erwan ALLAIN
Thanks for the fast answer !

I just feel annoyed and frustrated not to be able to use Spark
checkpointing, because I believe that mechanism has been correctly
tested.
I'm afraid that reinventing the wheel can lead to side effects that I don't
see now ...

Anyway thanks again, I know what I have to do :)

On Fri, Oct 21, 2016 at 5:05 PM, Cody Koeninger  wrote:

> 0.  If your processing time is regularly greater than your batch
> interval you're going to have problems anyway.  Investigate this more,
> set maxRatePerPartition, something.
> 1. That's personally what I tend to do.
> 2. Why are you relying on checkpoints if you're storing offset state
> in the database?  Just restart from the offsets in the database.  I
> think your solution of map of batchtime to offset ranges would work
> fine in that case, no?  (make sure to expire items from the map)
>
>
>
> On Fri, Oct 21, 2016 at 3:32 AM, Erwan ALLAIN 
> wrote:
> > Hi,
> >
> > I'm currently implementing an exactly once mechanism based on the
> following
> > example:
> >
> > https://github.com/koeninger/kafka-exactly-once/blob/
> master/src/main/scala/example/TransactionalPerBatch.scala
> >
> > the pseudocode is as follows:
> >
> > dstream.transform (store offset in a variable on driver side )
> > dstream.map
> > dstream.foreachRdd( action + save offset in db)
> >
> > this code doesn't work if the processing time is greater than batch
> interval
> > (same problem as windowed
> > (https://github.com/koeninger/kafka-exactly-once/blob/
> master/src/main/scala/example/Windowed.scala)
> >
> > Indeed, at each batch interval a new rdd is created and stacked; thus the
> > transform method is called several times and updates the global variable,
> > and in the end, when we save, the offset range does not correspond to the
> > one processed.
> >
> > 1) Do I need to work at the RDD level (inside a big forEachRDD like in
> the
> > first example) instead of dstream ?
> >
> > 2) I can use a Map[BatchTime, OffsetRange] as a global variable, but in
> > case of a crash this map will no longer reflect the generated RDDs
> > (restored from the checkpoint, RDDs prepared but not executed).
> >   2.1) Do I need to store this map elsewhere (Cassandra)?
> >   2.2) Is there a way to retrieve the restored offset ranges? (The
> > transform method is not called anymore for the checkpointed RDD.)
> >   2.3) Is it possible to store some context along with the RDD to be
> > serialized?
> >
> > Lots of questions; let me know if it's not clear!
> >
>


Re: Kafka Direct Stream: Offset Managed Manually (Exactly Once)

2016-10-21 Thread Cody Koeninger
0.  If your processing time is regularly greater than your batch
interval you're going to have problems anyway.  Investigate this more,
set maxRatePerPartition, something.
1. That's personally what I tend to do.
2. Why are you relying on checkpoints if you're storing offset state
in the database?  Just restart from the offsets in the database.  I
think your solution of map of batchtime to offset ranges would work
fine in that case, no?  (make sure to expire items from the map)



On Fri, Oct 21, 2016 at 3:32 AM, Erwan ALLAIN  wrote:
> Hi,
>
> I'm currently implementing an exactly once mechanism based on the following
> example:
>
> https://github.com/koeninger/kafka-exactly-once/blob/master/src/main/scala/example/TransactionalPerBatch.scala
>
> the pseudocode is as follows:
>
> dstream.transform (store offset in a variable on driver side )
> dstream.map
> dstream.foreachRdd( action + save offset in db)
>
> this code doesn't work if the processing time is greater than batch interval
> (same problem as windowed
> (https://github.com/koeninger/kafka-exactly-once/blob/master/src/main/scala/example/Windowed.scala)
>
> Indeed, at each batch interval a new rdd is created and stacked; thus the
> transform method is called several times and updates the global variable,
> and in the end, when we save, the offset range does not correspond to the
> one processed.
>
> 1) Do I need to work at the RDD level (inside a big forEachRDD like in the
> first example) instead of dstream ?
>
> 2) I can use a Map[BatchTime, OffsetRange] as a global variable, but in case
> of a crash this map will no longer reflect the generated RDDs (restored from
> the checkpoint, RDDs prepared but not executed).
>   2.1) Do I need to store this map elsewhere (Cassandra)?
>   2.2) Is there a way to retrieve the restored offset ranges? (The transform
> method is not called anymore for the checkpointed RDD.)
>   2.3) Is it possible to store some context along with the RDD to be
> serialized?
>
> Lots of questions; let me know if it's not clear!
>




Kafka Direct Stream: Offset Managed Manually (Exactly Once)

2016-10-21 Thread Erwan ALLAIN
Hi,

I'm currently implementing an exactly once mechanism based on the following
example:

https://github.com/koeninger/kafka-exactly-once/blob/master/src/main/scala/example/TransactionalPerBatch.scala

the pseudocode is as follows:

dstream.transform (store offset in a variable on driver side )
dstream.map
dstream.foreachRdd( action + save offset in db)

this code doesn't work if the processing time is greater than batch
interval (same problem as windowed (
https://github.com/koeninger/kafka-exactly-once/blob/master/src/main/scala/example/Windowed.scala
)

Indeed, at each batch interval a new RDD is created and stacked; thus the
transform method is called several times and updates the global variable,
and in the end, when we save, the offset range does not correspond to the
one processed.

1) Do I need to work at the RDD level (inside a big forEachRDD like in the
first example) instead of dstream ?

2) I can use a Map[BatchTime, OffsetRange] as a global variable, but in case
of a crash this map will no longer reflect the generated RDDs (restored from
the checkpoint, RDDs prepared but not executed).
  2.1) Do I need to store this map elsewhere (Cassandra)?
  2.2) Is there a way to retrieve the restored offset ranges? (The transform
method is not called anymore for the checkpointed RDD.)
  2.3) Is it possible to store some context along with the RDD to be serialized?

Lots of questions; let me know if it's not clear!


Re: Use cases for kafka direct stream messageHandler

2016-03-09 Thread Cody Koeninger
Yeah, to be clear, I'm talking about having only one constructor for a
direct stream that will give you a stream of ConsumerRecord objects.

Different needs for topic subscription, starting offsets, etc could be
handled by calling appropriate methods after construction but before
starting the stream.
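As a sketch of that shape, assuming a `stream` of ConsumerRecord built
elsewhere with the 0-10 createDirectStream, extraction becomes an ordinary
map instead of a messageHandler argument:

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.spark.streaming.dstream.DStream

// Assumed to be created elsewhere with the single-constructor direct stream.
val stream: DStream[ConsumerRecord[String, String]] = ???

// key/value extraction, formerly done by messageHandler:
val keyed = stream.map(r => (r.key, r.value))
// per-record metadata is still available on ConsumerRecord:
val withMeta = stream.map(r => (r.topic, r.partition, r.offset, r.value))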


On Wed, Mar 9, 2016 at 1:19 PM, Alan Braithwaite  wrote:
> I'd probably prefer to keep it the way it is, unless it's becoming more like
> the function without the messageHandler argument.
>
> Right now I have code like this, but I wish it were more similar looking:
>
> if (parsed.partitions.isEmpty()) {
>   JavaPairInputDStream<String, MessageWrapper> kvstream = KafkaUtils
>       .createDirectStream(jssc, String.class, MessageWrapper.class,
>           StringDecoder.class, MessageDecoder.class, kafkaArgs(parsed),
>           topicSet);
>   requests = kvstream.map(
>       (Function<Tuple2<String, MessageWrapper>, MessageWrapper>) Tuple2::_2);
> } else {
>   requests = KafkaUtils.createDirectStream(jssc, String.class,
>       MessageWrapper.class, StringDecoder.class, MessageDecoder.class,
>       MessageWrapper.class, kafkaArgs(parsed), parsed.partitions,
>       (Function<MessageAndMetadata<String, MessageWrapper>,
>           MessageWrapper>) MessageAndMetadata::message);
> }
>
> Of course, this is in the Java API so it may not have relevance to what
> you're talking about.
>
> Perhaps if both functions (the one with partitions arg and the one without)
> returned just ConsumerRecord, I would like that more.
>
> - Alan
>
> On Tue, Mar 8, 2016 at 6:49 AM, Cody Koeninger  wrote:
>>
>> No, it looks like you'd have to catch them in the serializer and have the
>> serializer return an Option or something. The new consumer builds a buffer
>> full of records, not one at a time.
>>
>> On Mar 8, 2016 4:43 AM, "Marius Soutier"  wrote:
>>>
>>>
>>> > On 04.03.2016, at 22:39, Cody Koeninger  wrote:
>>> >
>>> > The only other valid use of messageHandler that I can think of is
>>> > catching serialization problems on a per-message basis.  But with the
>>> > new Kafka consumer library, that doesn't seem feasible anyway, and
>>> > could be handled with a custom (de)serializer.
>>>
>>> What do you mean, that doesn't seem feasible? You mean when using a
>>> custom deserializer? Right now I'm catching serialization problems in the
>>> message handler, after your proposed change I'd catch them in `map()`.
>>>
>




Re: Use cases for kafka direct stream messageHandler

2016-03-09 Thread Alan Braithwaite
I'd probably prefer to keep it the way it is, unless it's becoming more
like the function without the messageHandler argument.

Right now I have code like this, but I wish it were more similar looking:

if (parsed.partitions.isEmpty()) {
  JavaPairInputDStream<String, MessageWrapper> kvstream = KafkaUtils
      .createDirectStream(jssc, String.class, MessageWrapper.class,
          StringDecoder.class, MessageDecoder.class, kafkaArgs(parsed),
          topicSet);
  requests = kvstream.map(
      (Function<Tuple2<String, MessageWrapper>, MessageWrapper>) Tuple2::_2);
} else {
  requests = KafkaUtils.createDirectStream(jssc, String.class,
      MessageWrapper.class, StringDecoder.class, MessageDecoder.class,
      MessageWrapper.class, kafkaArgs(parsed), parsed.partitions,
      (Function<MessageAndMetadata<String, MessageWrapper>,
          MessageWrapper>) MessageAndMetadata::message);
}

Of course, this is in the Java API so it may not have relevance to what
you're talking about.

Perhaps if both functions (the one with partitions arg and the one without)
returned just ConsumerRecord, I would like that more.

- Alan

On Tue, Mar 8, 2016 at 6:49 AM, Cody Koeninger  wrote:

> No, it looks like you'd have to catch them in the serializer and have the
> serializer return an Option or something. The new consumer builds a buffer
> full of records, not one at a time.
> On Mar 8, 2016 4:43 AM, "Marius Soutier"  wrote:
>
>>
>> > On 04.03.2016, at 22:39, Cody Koeninger  wrote:
>> >
>> > The only other valid use of messageHandler that I can think of is
>> > catching serialization problems on a per-message basis.  But with the
>> > new Kafka consumer library, that doesn't seem feasible anyway, and
>> > could be handled with a custom (de)serializer.
>>
>> What do you mean, that doesn't seem feasible? You mean when using a
>> custom deserializer? Right now I'm catching serialization problems in the
>> message handler, after your proposed change I'd catch them in `map()`.
>>
>>


Re: Use cases for kafka direct stream messageHandler

2016-03-08 Thread Cody Koeninger
No, it looks like you'd have to catch them in the serializer and have the
serializer return an Option or something. The new consumer builds a buffer
full of records, not one at a time.
On Mar 8, 2016 4:43 AM, "Marius Soutier"  wrote:

>
> > On 04.03.2016, at 22:39, Cody Koeninger  wrote:
> >
> > The only other valid use of messageHandler that I can think of is
> > catching serialization problems on a per-message basis.  But with the
> > new Kafka consumer library, that doesn't seem feasible anyway, and
> > could be handled with a custom (de)serializer.
>
> What do you mean, that doesn't seem feasible? You mean when using a custom
> deserializer? Right now I'm catching serialization problems in the message
> handler, after your proposed change I'd catch them in `map()`.
>
>


Re: Use cases for kafka direct stream messageHandler

2016-03-08 Thread Marius Soutier

> On 04.03.2016, at 22:39, Cody Koeninger  wrote:
> 
> The only other valid use of messageHandler that I can think of is
> catching serialization problems on a per-message basis.  But with the
> new Kafka consumer library, that doesn't seem feasible anyway, and
> could be handled with a custom (de)serializer.

What do you mean, that doesn't seem feasible? You mean when using a custom 
deserializer? Right now I'm catching serialization problems in the message 
handler, after your proposed change I'd catch them in `map()`.





Use cases for kafka direct stream messageHandler

2016-03-04 Thread Cody Koeninger
Wanted to survey what people are using the direct stream
messageHandler for, besides just extracting key / value / offset.

Would your use case still work if that argument was removed, and the
stream just contained ConsumerRecord objects
(http://kafka.apache.org/090/javadoc/org/apache/kafka/clients/consumer/ConsumerRecord.html)
which you could then use normal map transformations to access?

The only other valid use of messageHandler that I can think of is
catching serialization problems on a per-message basis.  But with the
new Kafka consumer library, that doesn't seem feasible anyway, and
could be handled with a custom (de)serializer.
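A sketch of the Option-returning deserializer idea from this thread;
`Payload` is a stand-in for a real message type, and the parsing line is
illustrative:

import java.util
import org.apache.kafka.common.serialization.Deserializer

case class Payload(text: String)

// Swallows per-record deserialization failures instead of failing the batch;
// the resulting stream carries Option[Payload] and bad records map to None,
// which can then be filtered out with a normal map/filter.
class SafeDeserializer extends Deserializer[Option[Payload]] {
  override def configure(configs: util.Map[String, _], isKey: Boolean): Unit = ()

  override def deserialize(topic: String, data: Array[Byte]): Option[Payload] =
    try Some(Payload(new String(data, "UTF-8"))) // real parsing would go here
    catch { case _: Exception => None }

  override def close(): Unit = ()
}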




Re: How does Spark streaming's Kafka direct stream survive from worker node failure?

2016-02-26 Thread yuhang.chenn
Thanks a lot.

Sent from WPS Mail client

On Feb 27, 2016, 1:02 AM, Cody Koeninger wrote:
> Yes.
>
> On Thu, Feb 25, 2016 at 9:45 PM, yuhang.chenn wrote:
>> Thanks a lot.
>> And I got another question: What would happen if I didn't set
>> "spark.streaming.kafka.maxRatePerPartition"? Will Spark Streaming try to
>> consume all the messages in Kafka?
>>
>> Sent from WPS Mail client
>>
>> On Feb 25, 2016, 11:58 AM, Cody Koeninger wrote:
>>> The per-partition offsets are part of the RDD as defined on the driver.
>>> Have you read
>>> https://github.com/koeninger/kafka-exactly-once/blob/master/blogpost.md
>>> and/or watched
>>> https://www.youtube.com/watch?v=fXnNEq1v3VA
>>>
>>> On Wed, Feb 24, 2016 at 9:05 PM, Yuhang Chen wrote:
>>>> Hi, as far as I know, there is a 1:1 mapping between Spark partitions
>>>> and Kafka partitions, and in Spark's fault-tolerance mechanism, if a
>>>> partition fails, another partition will be used to recompute that data.
>>>> And my questions are below:
>>>>
>>>> When a partition (worker node) fails in Spark Streaming,
>>>> 1. Is its computation passed to another partition, or does it just wait
>>>> for the failed partition to restart?
>>>> 2. How does the restarted partition know the offset range it should
>>>> consume from Kafka? It should consume the same data as the before-failed
>>>> one, right?




Re: How does Spark streaming's Kafka direct stream survive from worker node failure?

2016-02-26 Thread Cody Koeninger
Yes.
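A sketch of capping the intake, since without that setting the direct stream
will try to consume all available messages in one batch (e.g. on restart
after downtime); both properties are real Spark settings and the rate value
is illustrative:

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.streaming.kafka.maxRatePerPartition", "1000") // records/sec/partition
  .set("spark.streaming.backpressure.enabled", "true")      // adapts later batches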

On Thu, Feb 25, 2016 at 9:45 PM, yuhang.chenn 
wrote:

> Thanks a lot.
> And I got another question: What would happen if I didn't set
> "spark.streaming.kafka.maxRatePerPartition"? Will Spark Streamning try to
> consume all the messages in Kafka?
>
> Sent from WPS Mail client
> On Feb 25, 2016, 11:58 AM, Cody Koeninger wrote:
>
> The per-partition offsets are part of the RDD as defined on the driver.
> Have you read
>
> https://github.com/koeninger/kafka-exactly-once/blob/master/blogpost.md
>
> and/or watched
>
> https://www.youtube.com/watch?v=fXnNEq1v3VA
>
> On Wed, Feb 24, 2016 at 9:05 PM, Yuhang Chen 
> wrote:
>
>> Hi, as far as I know, there is a 1:1 mapping between Spark partitions and
>> Kafka partitions, and in Spark's fault-tolerance mechanism, if a partition
>> fails, another partition will be used to recompute that data. And my
>> questions are below:
>>
>> When a partition (worker node) fails in Spark Streaming,
>> 1. Is its computation passed to another partition, or does it just wait
>> for the failed partition to restart?
>> 2. How does the restarted partition know the offset range it should
>> consume from Kafka? It should consume the same data as the before-failed
>> one, right?
>>
>
>


Re: How does Spark streaming's Kafka direct stream survive from worker node failure?

2016-02-26 Thread yuhang.chenn
Thanks a lot.
And I got another question: What would happen if I didn't set
"spark.streaming.kafka.maxRatePerPartition"? Will Spark Streaming try to
consume all the messages in Kafka?

Sent from WPS Mail client

On Feb 25, 2016, 11:58 AM, Cody Koeninger wrote:
> The per-partition offsets are part of the RDD as defined on the driver.
> Have you read
> https://github.com/koeninger/kafka-exactly-once/blob/master/blogpost.md
> and/or watched
> https://www.youtube.com/watch?v=fXnNEq1v3VA
>
> On Wed, Feb 24, 2016 at 9:05 PM, Yuhang Chen wrote:
>> Hi, as far as I know, there is a 1:1 mapping between Spark partitions and
>> Kafka partitions, and in Spark's fault-tolerance mechanism, if a partition
>> fails, another partition will be used to recompute that data. And my
>> questions are below:
>>
>> When a partition (worker node) fails in Spark Streaming,
>> 1. Is its computation passed to another partition, or does it just wait
>> for the failed partition to restart?
>> 2. How does the restarted partition know the offset range it should
>> consume from Kafka? It should consume the same data as the before-failed
>> one, right?





Re: How does Spark streaming's Kafka direct stream survive from worker node failure?

2016-02-24 Thread Cody Koeninger
The per-partition offsets are part of the RDD as defined on the driver.
Have you read

https://github.com/koeninger/kafka-exactly-once/blob/master/blogpost.md

and/or watched

https://www.youtube.com/watch?v=fXnNEq1v3VA
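A sketch illustrating the point, using the 0-8 integration's HasOffsetRanges
and assuming a direct `stream` created beforehand:

import org.apache.spark.streaming.kafka.{HasOffsetRanges, OffsetRange}

// Each batch's RDD carries a fixed offset range per partition, decided on
// the driver; a failed task is re-run against the same range, so it
// re-reads exactly the same records.
stream.foreachRDD { rdd =>
  val ranges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  ranges.foreach { r =>
    println(s"${r.topic} partition ${r.partition}: ${r.fromOffset} -> ${r.untilOffset}")
  }
}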

On Wed, Feb 24, 2016 at 9:05 PM, Yuhang Chen  wrote:

> Hi, as far as I know, there is a 1:1 mapping between Spark partitions and
> Kafka partitions, and in Spark's fault-tolerance mechanism, if a partition
> fails, another partition will be used to recompute that data. And my
> questions are below:
>
> When a partition (worker node) fails in Spark Streaming,
> 1. Is its computation passed to another partition, or does it just wait for
> the failed partition to restart?
> 2. How does the restarted partition know the offset range it should
> consume from Kafka? It should consume the same data as the before-failed
> one, right?
>


How does Spark streaming's Kafka direct stream survive from worker node failure?

2016-02-24 Thread Yuhang Chen
Hi, as far as I know, there is a 1:1 mapping between Spark partitions and
Kafka partitions, and in Spark's fault-tolerance mechanism, if a partition
fails, another partition will be used to recompute that data. And my
questions are below:

When a partition (worker node) fails in Spark Streaming,
1. Is its computation passed to another partition, or does it just wait for
the failed partition to restart?
2. How does the restarted partition know the offset range it should consume
from Kafka? It should consume the same data as the before-failed one, right?


Re: Sporadic error after moving from kafka receiver to kafka direct stream

2015-10-22 Thread Cody Koeninger
That sounds like a networking issue to me. Stuff to try:
- make sure every executor node can talk to every Kafka broker on the
relevant ports
- look at firewalls / network config. Even if you can make the initial
connection, something may be happening after a while (we've seen ...
"interesting" ... issues with AWS networking, for instance)
- look at Kafka error logs
- look at lsof or even tcpdump to see what's happening with the relevant
ports when this occurs



On Thu, Oct 22, 2015 at 9:00 AM, Conor Fennell <conorapa...@gmail.com>
wrote:

> Hi,
>
> Firstly want to say a big thanks to Cody for contributing the kafka
> direct stream.
>
> I have been using the receiver based approach for months but the
> direct stream is a much better solution for my use case.
>
> The job in question is now ported over to the direct stream doing
> idempotent outputs to Cassandra and outputting to kafka.
> I am also saving the offsets to Cassandra.
>
> But unfortunately I am sporadically getting the error below.
> It recovers and continues but gives a large spike in the processing
> delay. And it can happen in every 3 or 4 batches.
> I still have other receiver jobs running and they never throw these
> exceptions.
>
> I would be very appreciative for any direction and I can happily
> provide more detail.
>
> Thanks,
> Conor
>
> 15/10/22 13:30:00 INFO spark.CacheManager: Partition rdd_1528_0 not
> found, computing it
> 15/10/22 13:30:00 INFO kafka.KafkaRDD: Computing topic events,
> partition 0 offsets 13630747 -> 13633001
> 15/10/22 13:30:00 INFO utils.VerifiableProperties: Verifying properties
> 15/10/22 13:30:00 INFO utils.VerifiableProperties: Property group.id
> is overridden to
> 15/10/22 13:30:00 INFO utils.VerifiableProperties: Property
> zookeeper.connect is overridden to
> 15/10/22 13:30:30 INFO consumer.SimpleConsumer: Reconnect due to
> socket error: java.nio.channels.ClosedChannelException
> 15/10/22 13:31:00 ERROR executor.Executor: Exception in task 0.0 in
> stage 654.0 (TID 5242)
> java.nio.channels.ClosedChannelException
> at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
> at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:78)
> at
> kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:68)
> at
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:112)
> at
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:112)
> at
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:112)
> at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> at
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:111)
> at
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:111)
> at
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:111)
> at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:110)
> at
> org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.fetchBatch(KafkaRDD.scala:192)
> at
> org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.getNext(KafkaRDD.scala:208)
> at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
> at org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:277)
> at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:171)
> at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:78)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:262)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
> at
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
> at
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
> at org.apache.spark.scheduler.Task.run(Task.scala:88)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> at java.lang.Thread.run(Thread.java:745)
> 15/10/22 13:31:00 INFO executor.CoarseGrainedExec

Sporadic error after moving from kafka receiver to kafka direct stream

2015-10-22 Thread Conor Fennell
Hi,

Firstly, I want to say a big thanks to Cody for contributing the kafka
direct stream.

I have been using the receiver based approach for months but the
direct stream is a much better solution for my use case.

The job in question is now ported over to the direct stream doing
idempotent outputs to Cassandra and outputting to kafka.
I am also saving the offsets to Cassandra.

But unfortunately I am sporadically getting the error below.
It recovers and continues but gives a large spike in the processing
delay. And it can happen every 3 or 4 batches.
I still have other receiver jobs running and they never throw these exceptions.

I would be very appreciative of any direction and I can happily
provide more detail.

Thanks,
Conor

15/10/22 13:30:00 INFO spark.CacheManager: Partition rdd_1528_0 not
found, computing it
15/10/22 13:30:00 INFO kafka.KafkaRDD: Computing topic events,
partition 0 offsets 13630747 -> 13633001
15/10/22 13:30:00 INFO utils.VerifiableProperties: Verifying properties
15/10/22 13:30:00 INFO utils.VerifiableProperties: Property group.id
is overridden to
15/10/22 13:30:00 INFO utils.VerifiableProperties: Property
zookeeper.connect is overridden to
15/10/22 13:30:30 INFO consumer.SimpleConsumer: Reconnect due to
socket error: java.nio.channels.ClosedChannelException
15/10/22 13:31:00 ERROR executor.Executor: Exception in task 0.0 in
stage 654.0 (TID 5242)
java.nio.channels.ClosedChannelException
at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:78)
at 
kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:68)
at 
kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:112)
at 
kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:112)
at 
kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:112)
at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
at 
kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:111)
at 
kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:111)
at 
kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:111)
at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:110)
at 
org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.fetchBatch(KafkaRDD.scala:192)
at 
org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.getNext(KafkaRDD.scala:208)
at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:277)
at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:171)
at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:78)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:262)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:88)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
15/10/22 13:31:00 INFO executor.CoarseGrainedExecutorBackend: Got
assigned task 5243
15/10/22 13:31:00 INFO executor.Executor: Running task 1.0 in stage
654.0 (TID 5243)
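
For reference, the "saving the offsets" step described above usually follows
the pattern below. This is a minimal sketch, assuming a direct stream named
directStream and a hypothetical saveOffsets helper that writes to a Cassandra
table keyed by (topic, partition):

import org.apache.spark.streaming.kafka.HasOffsetRanges

directStream.foreachRDD { rdd =>
  // Capture the exact offset ranges of this batch before any transformation.
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  // ... idempotent writes to Cassandra and output to kafka happen here ...
  offsetRanges.foreach { o =>
    // saveOffsets is a hypothetical helper recording how far we have read.
    saveOffsets(o.topic, o.partition, o.untilOffset)
  }
}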

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Sporadic error after moving from kafka receiver to kafka direct stream

2015-10-21 Thread Conor Fennell
Hi,

Firstly, I want to say a big thanks to Cody for contributing the kafka direct
stream.

I have been using the receiver based approach for months but the direct
stream is a much better solution for my use case.

The job in question is now ported over to the direct stream doing
idempotent outputs to Cassandra and outputting to kafka.
I am also saving the offsets to Cassandra.

But unfortunately I am sporadically getting the error below.
It recovers and continues but gives a large spike in the processing delay.
And it can happen every 3 or 4 batches.
I still have other receiver jobs running and they never throw these
exceptions.

I would be very appreciative of any direction and I can happily provide
more detail.

Thanks,
Conor

15/10/21 23:30:31 INFO consumer.SimpleConsumer: Reconnect due to
socket error: java.nio.channels.ClosedChannelException
15/10/21 23:31:01 ERROR executor.Executor: Exception in task 6.0 in
stage 66.0 (TID 406)
java.nio.channels.ClosedChannelException
at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:78)
at 
kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:68)
at 
kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:112)
at 
kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:112)
at 
kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:112)
at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
at 
kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:111)
at 
kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:111)
at 
kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:111)
at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:110)
at 
org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.fetchBatch(KafkaRDD.scala:192)
at 
org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.getNext(KafkaRDD.scala:208)
at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at 
org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:209)
at 
org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:73)
at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:88)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
15/10/21 23:31:01 INFO executor.CoarseGrainedExecutorBackend: Got
assigned task 407


RE: Node affinity for Kafka-Direct Stream

2015-10-14 Thread prajod.vettiyattil
Hi,

Another point is that in the receiver-based approach, all the data from kafka
first goes to the worker where the receiver runs:
https://github.com/koeninger/kafka-exactly-once/blob/master/blogpost.md

Also, if you create one stream (which is the normal case) and you have many
worker instances, only one worker does all the reading. Once that worker reads,
the data can be “repartitioned” to distribute the load. This repartitioning is
a data movement overhead in the receiver-based approach.
http://spark.apache.org/docs/latest/streaming-kafka-integration.html
{
In Receiver approach:
Multiple Kafka input DStreams can be created with different groups and topics 
for parallel receiving of data using multiple receivers.


In Direct approach:
Simplified Parallelism: No need to create multiple input Kafka streams and 
union them.
}
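
The contrast reads roughly like the sketch below; a minimal illustration,
assuming ssc, zkQuorum, groupId, and kafkaParams are defined elsewhere, with
a placeholder topic name:

import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils

// Receiver-based: several receivers, unioned, to spread the reading.
val receivers = (1 to 4).map { _ =>
  KafkaUtils.createStream(ssc, zkQuorum, groupId, Map("events" -> 1))
}
val unioned = ssc.union(receivers)

// Direct: a single stream; parallelism follows the Kafka partition count,
// with no receiver node or repartitioning step in between.
val direct = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, Set("events"))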

Prajod
From: Gerard Maas [mailto:gerard.m...@gmail.com]
Sent: 14 October 2015 18:53
To: Saisai Shao <sai.sai.s...@gmail.com>
Cc: Rishitesh Mishra <rmis...@snappydata.io>; spark users 
<user@spark.apache.org>
Subject: Re: Node affinity for Kafka-Direct Stream

Thanks Saisai, Mishra,

Indeed, that hint will only work on a case where the Spark executor is 
co-located with the Kafka broker.
I think the answer to my question as stated is that there's no guarantee of
where the task will execute, as it will depend on the scheduler and cluster
resources available (Mesos in our case).
Therefore, any assumptions made about data locality using the consumer-based 
approach need to be reconsidered when migrating to the direct stream.

((In our case, we were using local caches to decide when a given secondary 
index for a record should be produced and written.))

-kr, Gerard.




On Wed, Oct 14, 2015 at 2:58 PM, Saisai Shao 
<sai.sai.s...@gmail.com<mailto:sai.sai.s...@gmail.com>> wrote:
This preferred locality is a hint to Spark to schedule Kafka tasks on the
preferred nodes. If Kafka and Spark are two separate clusters, obviously this
locality hint takes no effect, and Spark will schedule tasks following the
node-local -> rack-local -> any pattern, like any other Spark tasks.

On Wed, Oct 14, 2015 at 8:10 PM, Rishitesh Mishra 
<rmis...@snappydata.io<mailto:rmis...@snappydata.io>> wrote:
Hi Gerard,
I am also trying to understand the same issue. From whatever code I have seen, it
looks like once the Kafka RDD is constructed, the execution of that RDD is up to the
task scheduler, and it can schedule the partitions based on the load on nodes.
There is a preferred node specified in the Kafka RDD. But AFAIK it maps to the Kafka
partition's host. So if Kafka and Spark are co-hosted, this will probably work.
If not, I am not sure how to get data locality for a partition.
Others,
correct me if there is a way.

On Wed, Oct 14, 2015 at 3:08 PM, Gerard Maas 
<gerard.m...@gmail.com<mailto:gerard.m...@gmail.com>> wrote:
In the receiver-based kafka streaming model, given that each receiver starts as 
a long-running task, one can rely on a certain degree of data locality based on
the kafka partitioning:  Data published on a given topic/partition will land on 
the same spark streaming receiving node until the receiver dies and needs to be 
restarted somewhere else.

As I understand, the direct-kafka streaming model just computes offsets and 
relays the work to a KafkaRDD. How is the execution locality compared to the 
receiver-based approach?

thanks, Gerard.



--

Regards,
Rishitesh Mishra,
SnappyData . (http://www.snappydata.io/)

https://in.linkedin.com/in/rishiteshmishra




Re: Node affinity for Kafka-Direct Stream

2015-10-14 Thread Cody Koeninger
Assumptions about locality in spark are not very reliable, regardless of
what consumer you use.  Even if you have locality preferences, and locality
wait turned up really high, you still have to account for losing executors.

On Wed, Oct 14, 2015 at 8:23 AM, Gerard Maas  wrote:

> Thanks Saisai, Mishra,
>
> Indeed, that hint will only work on a case where the Spark executor is
> co-located with the Kafka broker.
> I think the answer to my question as stated is that there's no guarantee
> of where the task will execute, as it will depend on the scheduler and
> cluster resources available (Mesos in our case).
> Therefore, any assumptions made about data locality using the
> consumer-based approach need to be reconsidered when migrating to the
> direct stream.
>
> ((In our case, we were using local caches to decide when a given secondary
> index for a record should be produced and written.))
>
> -kr, Gerard.
>
>
>
>
> On Wed, Oct 14, 2015 at 2:58 PM, Saisai Shao 
> wrote:
>
>> This preferred locality is a hint to Spark to schedule Kafka tasks on the
>> preferred nodes. If Kafka and Spark are two separate clusters, obviously
>> this locality hint takes no effect, and Spark will schedule tasks following
>> the node-local -> rack-local -> any pattern, like any other Spark tasks.
>>
>> On Wed, Oct 14, 2015 at 8:10 PM, Rishitesh Mishra 
>> wrote:
>>
>>> Hi Gerard,
>>> I am also trying to understand the same issue. From whatever code I have seen,
>>> it looks like once the Kafka RDD is constructed, the execution of that RDD is
>>> up to the task scheduler, and it can schedule the partitions based on the
>>> load on nodes. There is a preferred node specified in the Kafka RDD. But AFAIK it
>>> maps to the Kafka partition's host. So if Kafka and Spark are co-hosted,
>>> this will probably work. If not, I am not sure how to get data locality for
>>> a partition.
>>> Others,
>>> correct me if there is a way.
>>>
>>> On Wed, Oct 14, 2015 at 3:08 PM, Gerard Maas 
>>> wrote:
>>>
 In the receiver-based kafka streaming model, given that each receiver
 starts as a long-running task, one can rely on a certain degree of data
 locality based on the kafka partitioning:  Data published on a given
 topic/partition will land on the same spark streaming receiving node until
 the receiver dies and needs to be restarted somewhere else.

 As I understand, the direct-kafka streaming model just computes offsets
 and relays the work to a KafkaRDD. How is the execution locality compared
 to the receiver-based approach?

 thanks, Gerard.

>>>
>>>
>>>
>>> --
>>>
>>> Regards,
>>> Rishitesh Mishra,
>>> SnappyData . (http://www.snappydata.io/)
>>>
>>> https://in.linkedin.com/in/rishiteshmishra
>>>
>>
>>
>


Node affinity for Kafka-Direct Stream

2015-10-14 Thread Gerard Maas
In the receiver-based kafka streaming model, given that each receiver
starts as a long-running task, one can rely on a certain degree of data
locality based on the kafka partitioning:  Data published on a given
topic/partition will land on the same spark streaming receiving node until
the receiver dies and needs to be restarted somewhere else.

As I understand, the direct-kafka streaming model just computes offsets and
relays the work to a KafkaRDD. How is the execution locality compared to
the receiver-based approach?

thanks, Gerard.


Re: Node affinity for Kafka-Direct Stream

2015-10-14 Thread Gerard Maas
Hi Cody,

I think that I misused the term 'data locality'. I should rather
call it "node affinity" instead, as this is what I would like to have:
For as long as an executor is available, I would like to have the same
kafka partition processed by the same node in order to take advantage of
local in-memory structures.

In the receiver-based mode this was a given. Any ideas how to achieve that
with the direct stream approach?

-greetz, Gerard.


On Wed, Oct 14, 2015 at 4:31 PM, Cody Koeninger  wrote:

> Assumptions about locality in spark are not very reliable, regardless of
> what consumer you use.  Even if you have locality preferences, and locality
> wait turned up really high, you still have to account for losing executors.
>
> On Wed, Oct 14, 2015 at 8:23 AM, Gerard Maas 
> wrote:
>
>> Thanks Saisai, Mishra,
>>
>> Indeed, that hint will only work on a case where the Spark executor is
>> co-located with the Kafka broker.
>> I think the answer to my question as stated is that there's no guarantee
>> of where the task will execute, as it will depend on the scheduler and
>> cluster resources available (Mesos in our case).
>> Therefore, any assumptions made about data locality using the
>> consumer-based approach need to be reconsidered when migrating to the
>> direct stream.
>>
>> ((In our case, we were using local caches to decide when a given
>> secondary index for a record should be produced and written.))
>>
>> -kr, Gerard.
>>
>>
>>
>>
>> On Wed, Oct 14, 2015 at 2:58 PM, Saisai Shao 
>> wrote:
>>
>>> This preferred locality is a hint to Spark to schedule Kafka tasks on
>>> the preferred nodes. If Kafka and Spark are two separate clusters, obviously
>>> this locality hint takes no effect, and Spark will schedule tasks following
>>> the node-local -> rack-local -> any pattern, like any other Spark tasks.
>>>
>>> On Wed, Oct 14, 2015 at 8:10 PM, Rishitesh Mishra wrote:
>>>
 Hi Gerard,
 I am also trying to understand the same issue. From whatever code I have
 seen, it looks like once the Kafka RDD is constructed, the execution of that RDD
 is up to the task scheduler, and it can schedule the partitions based on the
 load on nodes. There is a preferred node specified in the Kafka RDD. But AFAIK it
 maps to the Kafka partition's host. So if Kafka and Spark are co-hosted,
 this will probably work. If not, I am not sure how to get data locality for
 a partition.
 Others,
 correct me if there is a way.

 On Wed, Oct 14, 2015 at 3:08 PM, Gerard Maas 
 wrote:

> In the receiver-based kafka streaming model, given that each receiver
> starts as a long-running task, one can rely on a certain degree of data
> locality based on the kafka partitioning:  Data published on a given
> topic/partition will land on the same spark streaming receiving node until
> the receiver dies and needs to be restarted somewhere else.
>
> As I understand, the direct-kafka streaming model just computes
> offsets and relays the work to a KafkaRDD. How is the execution locality
> compared to the receiver-based approach?
>
> thanks, Gerard.
>



 --

 Regards,
 Rishitesh Mishra,
 SnappyData . (http://www.snappydata.io/)

 https://in.linkedin.com/in/rishiteshmishra

>>>
>>>
>>
>


Re: Node affinity for Kafka-Direct Stream

2015-10-14 Thread Cody Koeninger
What I'm saying is that it's not a given with spark, even in receiver-based
mode, because as soon as you lose an executor you'll have a rebalance.

Spark's model in general isn't a good fit for pinning work to specific
nodes.

If you really want to try and fake this, you can override
getPreferredLocations and set spark.locality.wait to a high value.
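
A minimal sketch of that idea, assuming you control the RDD and have a
hypothetical hostFor function mapping a partition index to a preferred
executor host:

import scala.reflect.ClassTag
import org.apache.spark.{Partition, SparkConf, TaskContext}
import org.apache.spark.rdd.RDD

// Pass-through RDD that only adds a placement preference per partition.
class PinnedRDD[T: ClassTag](prev: RDD[T], hostFor: Int => String)
    extends RDD[T](prev) {
  override def getPartitions: Array[Partition] = prev.partitions
  override def compute(split: Partition, ctx: TaskContext): Iterator[T] =
    prev.iterator(split, ctx)
  // A preference, not a guarantee: honored only while spark.locality.wait
  // has not expired and the target executor is still alive.
  override def getPreferredLocations(split: Partition): Seq[String] =
    Seq(hostFor(split.index))
}

// Make the scheduler wait longer before giving up on the preferred host.
val conf = new SparkConf().set("spark.locality.wait", "30s")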



On Wed, Oct 14, 2015 at 2:45 PM, Gerard Maas  wrote:

> Hi Cody,
>
> I think that I misused the term 'data locality'. I should rather
> call it "node affinity" instead, as this is what I would like to have:
> For as long as an executor is available, I would like to have the same
> kafka partition processed by the same node in order to take advantage of
> local in-memory structures.
>
> In the receiver-based mode this was a given. Any ideas how to achieve that
> with the direct stream approach?
>
> -greetz, Gerard.
>
>
> On Wed, Oct 14, 2015 at 4:31 PM, Cody Koeninger 
> wrote:
>
>> Assumptions about locality in spark are not very reliable, regardless of
>> what consumer you use.  Even if you have locality preferences, and locality
>> wait turned up really high, you still have to account for losing executors.
>>
>> On Wed, Oct 14, 2015 at 8:23 AM, Gerard Maas 
>> wrote:
>>
>>> Thanks Saisai, Mishra,
>>>
>>> Indeed, that hint will only work on a case where the Spark executor is
>>> co-located with the Kafka broker.
>>> I think the answer to my question as stated is that there's no guarantee
>>> of where the task will execute, as it will depend on the scheduler and
>>> cluster resources available (Mesos in our case).
>>> Therefore, any assumptions made about data locality using the
>>> consumer-based approach need to be reconsidered when migrating to the
>>> direct stream.
>>>
>>> ((In our case, we were using local caches to decide when a given
>>> secondary index for a record should be produced and written.))
>>>
>>> -kr, Gerard.
>>>
>>>
>>>
>>>
>>> On Wed, Oct 14, 2015 at 2:58 PM, Saisai Shao 
>>> wrote:
>>>
 This preferred locality is a hint to Spark to schedule Kafka tasks on
 the preferred nodes. If Kafka and Spark are two separate clusters, obviously
 this locality hint takes no effect, and Spark will schedule tasks following
 the node-local -> rack-local -> any pattern, like any other Spark tasks.

 On Wed, Oct 14, 2015 at 8:10 PM, Rishitesh Mishra <
 rmis...@snappydata.io> wrote:

> Hi Gerard,
> I am also trying to understand the same issue. From whatever code I have
> seen, it looks like once the Kafka RDD is constructed, the execution of that RDD
> is up to the task scheduler, and it can schedule the partitions based on the
> load on nodes. There is a preferred node specified in the Kafka RDD. But AFAIK
> it maps to the Kafka partition's host. So if Kafka and Spark are co-hosted,
> this will probably work. If not, I am not sure how to get data locality
> for a partition.
> Others,
> correct me if there is a way.
>
> On Wed, Oct 14, 2015 at 3:08 PM, Gerard Maas 
> wrote:
>
>> In the receiver-based kafka streaming model, given that each receiver
>> starts as a long-running task, one can rely on a certain degree of data
>> locality based on the kafka partitioning:  Data published on a given
>> topic/partition will land on the same spark streaming receiving node 
>> until
>> the receiver dies and needs to be restarted somewhere else.
>>
>> As I understand, the direct-kafka streaming model just computes
>> offsets and relays the work to a KafkaRDD. How is the execution locality
>> compared to the receiver-based approach?
>>
>> thanks, Gerard.
>>
>
>
>
> --
>
> Regards,
> Rishitesh Mishra,
> SnappyData . (http://www.snappydata.io/)
>
> https://in.linkedin.com/in/rishiteshmishra
>


>>>
>>
>


Re: Node affinity for Kafka-Direct Stream

2015-10-14 Thread Saisai Shao
You could check the code of KafkaRDD: the locality (host) is taken from
Kafka's partition and set in KafkaRDD. This is a hint for Spark to
schedule the task on the preferred location.

override def getPreferredLocations(thePart: Partition): Seq[String] = {
  val part = thePart.asInstanceOf[KafkaRDDPartition]
  // TODO is additional hostname resolution necessary here
  Seq(part.host)
}


On Wed, Oct 14, 2015 at 5:38 PM, Gerard Maas  wrote:

> In the receiver-based kafka streaming model, given that each receiver
> starts as a long-running task, one can rely on a certain degree of data
> locality based on the kafka partitioning:  Data published on a given
> topic/partition will land on the same spark streaming receiving node until
> the receiver dies and needs to be restarted somewhere else.
>
> As I understand, the direct-kafka streaming model just computes offsets
> and relays the work to a KafkaRDD. How is the execution locality compared
> to the receiver-based approach?
>
> thanks, Gerard.
>


Re: Node affinity for Kafka-Direct Stream

2015-10-14 Thread Rishitesh Mishra
Hi Gerard,
I am also trying to understand the same issue. From whatever code I have seen, it
looks like once the Kafka RDD is constructed, the execution of that RDD is up to
the task scheduler, and it can schedule the partitions based on the load on
nodes. There is a preferred node specified in the Kafka RDD. But AFAIK it maps to
the Kafka partition's host. So if Kafka and Spark are co-hosted, this will
probably work. If not, I am not sure how to get data locality for a
partition.
Others,
correct me if there is a way.

On Wed, Oct 14, 2015 at 3:08 PM, Gerard Maas  wrote:

> In the receiver-based kafka streaming model, given that each receiver
> starts as a long-running task, one can rely on a certain degree of data
> locality based on the kafka partitioning:  Data published on a given
> topic/partition will land on the same spark streaming receiving node until
> the receiver dies and needs to be restarted somewhere else.
>
> As I understand, the direct-kafka streaming model just computes offsets
> and relays the work to a KafkaRDD. How is the execution locality compared
> to the receiver-based approach?
>
> thanks, Gerard.
>



-- 

Regards,
Rishitesh Mishra,
SnappyData . (http://www.snappydata.io/)

https://in.linkedin.com/in/rishiteshmishra


Re: Node affinity for Kafka-Direct Stream

2015-10-14 Thread Saisai Shao
This preferred locality is a hint to Spark to schedule Kafka tasks on the
preferred nodes. If Kafka and Spark are two separate clusters, obviously
this locality hint takes no effect, and Spark will schedule tasks following the
node-local -> rack-local -> any pattern, like any other Spark tasks.

On Wed, Oct 14, 2015 at 8:10 PM, Rishitesh Mishra 
wrote:

> Hi Gerard,
> I am also trying to understand the same issue. From whatever code I have seen,
> it looks like once the Kafka RDD is constructed, the execution of that RDD is
> up to the task scheduler, and it can schedule the partitions based on the
> load on nodes. There is a preferred node specified in the Kafka RDD. But AFAIK it
> maps to the Kafka partition's host. So if Kafka and Spark are co-hosted,
> this will probably work. If not, I am not sure how to get data locality for
> a partition.
> Others,
> correct me if there is a way.
>
> On Wed, Oct 14, 2015 at 3:08 PM, Gerard Maas 
> wrote:
>
>> In the receiver-based kafka streaming model, given that each receiver
>> starts as a long-running task, one can rely on a certain degree of data
>> locality based on the kafka partitioning:  Data published on a given
>> topic/partition will land on the same spark streaming receiving node until
>> the receiver dies and needs to be restarted somewhere else.
>>
>> As I understand, the direct-kafka streaming model just computes offsets
>> and relays the work to a KafkaRDD. How is the execution locality compared
>> to the receiver-based approach?
>>
>> thanks, Gerard.
>>
>
>
>
> --
>
> Regards,
> Rishitesh Mishra,
> SnappyData . (http://www.snappydata.io/)
>
> https://in.linkedin.com/in/rishiteshmishra
>


Re: Node affinity for Kafka-Direct Stream

2015-10-14 Thread Gerard Maas
Thanks Saisai, Mishra,

Indeed, that hint will only work on a case where the Spark executor is
co-located with the Kafka broker.
I think the answer to my question as stated is that there's no guarantee of
where the task will execute, as it will depend on the scheduler and cluster
resources available (Mesos in our case).
Therefore, any assumptions made about data locality using the
consumer-based approach need to be reconsidered when migrating to the
direct stream.

((In our case, we were using local caches to decide when a given secondary
index for a record should be produced and written.))

-kr, Gerard.




On Wed, Oct 14, 2015 at 2:58 PM, Saisai Shao  wrote:

> This preferred locality is a hint to Spark to schedule Kafka tasks on the
> preferred nodes. If Kafka and Spark are two separate clusters, obviously
> this locality hint takes no effect, and Spark will schedule tasks following
> the node-local -> rack-local -> any pattern, like any other Spark tasks.
>
> On Wed, Oct 14, 2015 at 8:10 PM, Rishitesh Mishra 
> wrote:
>
>> Hi Gerard,
>> I am also trying to understand the same issue. From whatever code I have seen,
>> it looks like once the Kafka RDD is constructed, the execution of that RDD is
>> up to the task scheduler, and it can schedule the partitions based on the
>> load on nodes. There is a preferred node specified in the Kafka RDD. But AFAIK it
>> maps to the Kafka partition's host. So if Kafka and Spark are co-hosted,
>> this will probably work. If not, I am not sure how to get data locality for
>> a partition.
>> Others,
>> correct me if there is a way.
>>
>> On Wed, Oct 14, 2015 at 3:08 PM, Gerard Maas 
>> wrote:
>>
>>> In the receiver-based kafka streaming model, given that each receiver
>>> starts as a long-running task, one can rely on a certain degree of data
>>> locality based on the kafka partitioning:  Data published on a given
>>> topic/partition will land on the same spark streaming receiving node until
>>> the receiver dies and needs to be restarted somewhere else.
>>>
>>> As I understand, the direct-kafka streaming model just computes offsets
>>> and relays the work to a KafkaRDD. How is the execution locality compared
>>> to the receiver-based approach?
>>>
>>> thanks, Gerard.
>>>
>>
>>
>>
>> --
>>
>> Regards,
>> Rishitesh Mishra,
>> SnappyData . (http://www.snappydata.io/)
>>
>> https://in.linkedin.com/in/rishiteshmishra
>>
>
>


Re: Kafka Direct Stream

2015-10-04 Thread varun sharma
I went through the story, and as I understood it, it is for saving data to
multiple keyspaces at once.
How will it work for saving data to multiple tables in the same keyspace?
I think tableName: String should also be tableName: T => String.
Let me know if I understood incorrectly.


On Sat, Oct 3, 2015 at 9:55 PM, Gerard Maas <gerard.m...@gmail.com> wrote:

> Hi,
>
> collect(partialFunction) is equivalent to filter(x=>
> partialFunction.isDefinedAt(x)).map(partialFunction), so it's functionally
> equivalent to your expression. I favor collect for its more compact form
> but that's a personal preference. Use what you feel reads best.
>
> Regarding performance, there will be some overhead of submitting a
> task for every filtered RDD that gets materialized to Cassandra. That's the
> reason I proposed the ticket linked above. Have a look whether that would
> improve your particular use case and vote for it if so :-)
>
> -kr, Gerard.
>
> On Sat, Oct 3, 2015 at 3:53 PM, varun sharma <varunsharman...@gmail.com>
> wrote:
>
>> Thanks Gerard, the code snippet you shared worked, but can you please
>> explain/point me to the usage of *collect* here. How is it
>> different (performance/readability) from *filter*?
>>
>>> *val filteredRdd = rdd.filter(x=> x._1 == topic).map(_._2)*
>>
>>
>> I am doing something like this. Please tell me if I can improve the *Processing
>> time* of this particular code:
>>
>> kafkaStringStream.foreachRDD{rdd =>
>>   val topics = rdd.map(_._1).distinct().collect()
>>   if (topics.length > 0) {
>> val rdd_value = rdd.take(10).mkString("\n.\n")
>> Log.slogger(Log.FILE.DEFAULT, INFO, BaseSLog(s"Printing all 
>> feeds\n$rdd_value"))
>>
>> topics.foreach { topic =>
>>   //rdd.filter(x=> x._1 == topic).map(_._2)
>>   val filteredRdd = rdd.collect { case (t, data) if t == topic => data }
>>   CassandraHelper.saveDataToCassandra(topic, filteredRdd)
>> }
>> updateOffsetsinZk(rdd)
>>   }
>>
>> }
>>
>> On Fri, Oct 2, 2015 at 11:58 PM, Gerard Maas <gerard.m...@gmail.com>
>> wrote:
>>
>>> Something like this?
>>>
>>> I'm making the assumption that your topic name equals your keyspace for
>>> this filtering example.
>>>
>>> dstream.foreachRDD{rdd =>
>>>   val topics = rdd.map(_._1).distinct.collect
>>>   topics.foreach{topic =>
>>> val filteredRdd = rdd.collect{case (t, data) if t == topic => data}
>>> filteredRdd.saveToCassandra(topic, "table")  // do not confuse this
>>> collect with rdd.collect() that brings data to the driver
>>>   }
>>> }
>>>
>>>
>>> I'm wondering: would something like this (
>>> https://datastax-oss.atlassian.net/browse/SPARKC-257) better fit your
>>> purposes?
>>>
>>> -kr, Gerard.
>>>
>>> On Fri, Oct 2, 2015 at 8:12 PM, varun sharma <varunsharman...@gmail.com>
>>> wrote:
>>>
>>>> Hi Adrian,
>>>>
>>>> Can you please give an example of how to achieve this:
>>>>
>>>>> *I would also look at filtering by topic and saving as different
>>>>> Dstreams in your code*
>>>>
>>>> I have managed to get DStream[(String, String)] which is (
>>>> *topic,my_data)* tuple. Let's call it kafkaStringStream.
>>>> Now if I do kafkaStringStream.groupByKey() then I would get a
>>>> DStream[(String,Iterable[String])].
>>>> But I want a DStream instead of Iterable in order to apply
>>>> saveToCassandra for storing it.
>>>>
>>>> Please help with how to transform the Iterable into a DStream, or suggest
>>>> any other workaround to achieve the same.
>>>>
>>>>
>>>> On Thu, Oct 1, 2015 at 8:17 PM, Adrian Tanase <atan...@adobe.com>
>>>> wrote:
>>>>
>>>>> On top of that you could make the topic part of the key (e.g. keyBy in
>>>>> .transform or manually emitting a tuple) and use one of the .xxxByKey
>>>>> operators for the processing.
>>>>>
>>>>> If you have a stable, domain specific list of topics (e.g. 3-5 named
>>>>> topics) and the processing is *really* different, I would also look
>>>>> at filtering by topic and saving as different Dstreams in your code.
>>>>>
>>>>> Either way you need to start with Cody’s tip in order to extract the
>>>>> topic name.

Re: Kafka Direct Stream

2015-10-03 Thread varun sharma
Thanks Gerard, the code snippet you shared worked, but can you please
explain/point me to the usage of *collect* here. How is it
different (performance/readability) from *filter*?

> *val filteredRdd = rdd.filter(x=> x._1 == topic).map(_._2)*


I am doing something like this. Please tell me if I can improve the *Processing
time* of this particular code:

kafkaStringStream.foreachRDD{rdd =>
  val topics = rdd.map(_._1).distinct().collect()
  if (topics.length > 0) {
val rdd_value = rdd.take(10).mkString("\n.\n")
Log.slogger(Log.FILE.DEFAULT, INFO, BaseSLog(s"Printing all
feeds\n$rdd_value"))

topics.foreach { topic =>
  //rdd.filter(x=> x._1 == topic).map(_._2)
  val filteredRdd = rdd.collect { case (t, data) if t == topic => data }
  CassandraHelper.saveDataToCassandra(topic, filteredRdd)
}
updateOffsetsinZk(rdd)
  }

}

On Fri, Oct 2, 2015 at 11:58 PM, Gerard Maas <gerard.m...@gmail.com> wrote:

> Something like this?
>
> I'm making the assumption that your topic name equals your keyspace for
> this filtering example.
>
> dstream.foreachRDD{rdd =>
>   val topics = rdd.map(_._1).distinct.collect
>   topics.foreach{topic =>
>> val filteredRdd = rdd.collect{case (t, data) if t == topic => data}
> filteredRdd.saveToCassandra(topic, "table")  // do not confuse this
> collect with rdd.collect() that brings data to the driver
>   }
> }
>
>
> I'm wondering: would something like this (
> https://datastax-oss.atlassian.net/browse/SPARKC-257) better fit your
> purposes?
>
> -kr, Gerard.
>
> On Fri, Oct 2, 2015 at 8:12 PM, varun sharma <varunsharman...@gmail.com>
> wrote:
>
>> Hi Adrian,
>>
>> Can you please give an example of how to achieve this:
>>
>>> *I would also look at filtering by topic and saving as different
>>> Dstreams in your code*
>>
>> I have managed to get DStream[(String, String)] which is (
>> *topic,my_data)* tuple. Let's call it kafkaStringStream.
>> Now if I do kafkaStringStream.groupByKey() then I would get a
>> DStream[(String,Iterable[String])].
>> But I want a DStream instead of Iterable in order to apply
>> saveToCassandra for storing it.
>>
>> Please help with how to transform the Iterable into a DStream, or suggest
>> any other workaround to achieve the same.
>>
>>
>> On Thu, Oct 1, 2015 at 8:17 PM, Adrian Tanase <atan...@adobe.com> wrote:
>>
>>> On top of that you could make the topic part of the key (e.g. keyBy in
>>> .transform or manually emitting a tuple) and use one of the .xxxByKey
>>> operators for the processing.
>>>
>>> If you have a stable, domain specific list of topics (e.g. 3-5 named
>>> topics) and the processing is *really* different, I would also look at
>>> filtering by topic and saving as different Dstreams in your code.
>>>
>>> Either way you need to start with Cody’s tip in order to extract the
>>> topic name.
>>>
>>> -adrian
>>>
>>> From: Cody Koeninger
>>> Date: Thursday, October 1, 2015 at 5:06 PM
>>> To: Udit Mehta
>>> Cc: user
>>> Subject: Re: Kafka Direct Stream
>>>
>>> You can get the topic for a given partition from the offset range.  You
>>> can either filter using that; or just have a single rdd and match on topic
>>> when doing mapPartitions or foreachPartition (which I think is a better
>>> idea)
>>>
>>>
>>> http://spark.apache.org/docs/latest/streaming-kafka-integration.html#approach-2-direct-approach-no-receivers
>>>
>>> On Wed, Sep 30, 2015 at 5:02 PM, Udit Mehta <ume...@groupon.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> I am using spark direct stream to consume from multiple topics in
>>>> Kafka. I am able to consume fine but I am stuck at how to separate the data
>>>> for each topic since I need to process data differently depending on the
>>>> topic.
>>>> I basically want to split the RDD consisting on N topics into N RDD's
>>>> each having 1 topic.
>>>>
>>>> Any help would be appreciated.
>>>>
>>>> Thanks in advance,
>>>> Udit
>>>>
>>>
>>>
>>
>>
>> --
>> *VARUN SHARMA*
>> *Flipkart*
>> *Bangalore*
>>
>
>


-- 
*VARUN SHARMA*
*Flipkart*
*Bangalore*


Re: Kafka Direct Stream

2015-10-03 Thread Gerard Maas
Hi,

collect(partialFunction) is equivalent to filter(x =>
partialFunction.isDefinedAt(x)).map(partialFunction), so it's functionally
equivalent to your expression. I favor collect for its more compact form,
but that's a personal preference. Use what you feel reads best.
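
A tiny demonstration of that equivalence, assuming sc is the SparkContext:

val pairs = sc.parallelize(Seq(("topicA", "a1"), ("topicB", "b1"), ("topicA", "a2")))

// Both yield the values for topicA; collect takes the partial function directly.
val viaCollect   = pairs.collect { case (t, data) if t == "topicA" => data }
val viaFilterMap = pairs.filter(x => x._1 == "topicA").map(_._2)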

Regarding performance, there will be some overhead of submitting a
task for every filtered RDD that gets materialized to Cassandra. That's the
reason I proposed the ticket linked above. Have a look whether that would
improve your particular use case and vote for it if so :-)

-kr, Gerard.

On Sat, Oct 3, 2015 at 3:53 PM, varun sharma <varunsharman...@gmail.com>
wrote:

> Thanks Gerard, the code snippet you shared worked, but can you please
> explain/point me to the usage of *collect* here. How is it
> different (performance/readability) from *filter*?
>
>> *val filteredRdd = rdd.filter(x=> x._1 == topic).map(_._2)*
>
>
> I am doing something like this. Please tell me if I can improve the *Processing
> time* of this particular code:
>
> kafkaStringStream.foreachRDD{rdd =>
>   val topics = rdd.map(_._1).distinct().collect()
>   if (topics.length > 0) {
> val rdd_value = rdd.take(10).mkString("\n.\n")
> Log.slogger(Log.FILE.DEFAULT, INFO, BaseSLog(s"Printing all 
> feeds\n$rdd_value"))
>
> topics.foreach { topic =>
>   //rdd.filter(x=> x._1 == topic).map(_._2)
>   val filteredRdd = rdd.collect { case (t, data) if t == topic => data }
>   CassandraHelper.saveDataToCassandra(topic, filteredRdd)
> }
> updateOffsetsinZk(rdd)
>   }
>
> }
>
> On Fri, Oct 2, 2015 at 11:58 PM, Gerard Maas <gerard.m...@gmail.com>
> wrote:
>
>> Something like this?
>>
>> I'm making the assumption that your topic name equals your keyspace for
>> this filtering example.
>>
>> dstream.foreachRDD{rdd =>
>>   val topics = rdd.map(_._1).distinct.collect
>>   topics.foreach{topic =>
>> val filteredRdd = rdd.collect{case (t, data) if t == topic => data}
>> filteredRdd.saveToCassandra(topic, "table")  // do not confuse this
>> collect with rdd.collect() that brings data to the driver
>>   }
>> }
>>
>>
>> I'm wondering: would something like this (
>> https://datastax-oss.atlassian.net/browse/SPARKC-257) better fit your
>> purposes?
>>
>> -kr, Gerard.
>>
>> On Fri, Oct 2, 2015 at 8:12 PM, varun sharma <varunsharman...@gmail.com>
>> wrote:
>>
>>> Hi Adrian,
>>>
>>> Can you please give an example of how to achieve this:
>>>
>>>> *I would also look at filtering by topic and saving as different
>>>> Dstreams in your code*
>>>
>>> I have managed to get DStream[(String, String)] which is (
>>> *topic,my_data)* tuple. Let's call it kafkaStringStream.
>>> Now if I do kafkaStringStream.groupByKey() then I would get a
>>> DStream[(String,Iterable[String])].
>>> But I want a DStream instead of Iterable in order to apply
>>> saveToCassandra for storing it.
>>>
>>> Please help with how to transform the Iterable into a DStream, or suggest
>>> any other workaround to achieve the same.
>>>
>>>
>>> On Thu, Oct 1, 2015 at 8:17 PM, Adrian Tanase <atan...@adobe.com> wrote:
>>>
>>>> On top of that you could make the topic part of the key (e.g. keyBy in
>>>> .transform or manually emitting a tuple) and use one of the .xxxByKey
>>>> operators for the processing.
>>>>
>>>> If you have a stable, domain specific list of topics (e.g. 3-5 named
>>>> topics) and the processing is *really* different, I would also look at
>>>> filtering by topic and saving as different Dstreams in your code.
>>>>
>>>> Either way you need to start with Cody’s tip in order to extract the
>>>> topic name.
>>>>
>>>> -adrian
>>>>
>>>> From: Cody Koeninger
>>>> Date: Thursday, October 1, 2015 at 5:06 PM
>>>> To: Udit Mehta
>>>> Cc: user
>>>> Subject: Re: Kafka Direct Stream
>>>>
>>>> You can get the topic for a given partition from the offset range.  You
>>>> can either filter using that; or just have a single rdd and match on topic
>>>> when doing mapPartitions or foreachPartition (which I think is a better
>>>> idea)
>>>>
>>>>
>>>> http://spark.apache.org/docs/latest/streaming-kafka-integration.html#approach-2-direct-approach-no-receivers
>>>>
>>>> On Wed, Sep 30, 2015 at 5:02 PM, Udit Mehta <ume...@groupon.com> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> I am using spark direct stream to consume from multiple topics in
>>>>> Kafka. I am able to consume fine but I am stuck at how to separate the 
>>>>> data
>>>>> for each topic since I need to process data differently depending on the
>>>>> topic.
>>>>> I basically want to split the RDD consisting on N topics into N RDD's
>>>>> each having 1 topic.
>>>>>
>>>>> Any help would be appreciated.
>>>>>
>>>>> Thanks in advance,
>>>>> Udit
>>>>>
>>>>
>>>>
>>>
>>>
>>> --
>>> *VARUN SHARMA*
>>> *Flipkart*
>>> *Bangalore*
>>>
>>
>>
>
>
> --
> *VARUN SHARMA*
> *Flipkart*
> *Bangalore*
>


Re: Kafka Direct Stream

2015-10-02 Thread Gerard Maas
Something like this?

I'm making the assumption that your topic name equals your keyspace for
this filtering example.

dstream.foreachRDD{rdd =>
  val topics = rdd.map(_._1).distinct.collect
  topics.foreach{topic =>
val filteredRdd = rdd.collect{case (t, data) if t == topic => data}
filteredRdd.saveToCassandra(topic, "table")  // do not confuse this
collect with rdd.collect() that brings data to the driver
  }
}


I'm wondering: would something like this (
https://datastax-oss.atlassian.net/browse/SPARKC-257) better fit your
purposes?

-kr, Gerard.

On Fri, Oct 2, 2015 at 8:12 PM, varun sharma <varunsharman...@gmail.com>
wrote:

> Hi Adrian,
>
> Can you please give an example of how to achieve this:
>
>> *I would also look at filtering by topic and saving as different Dstreams
>> in your code*
>
> I have managed to get DStream[(String, String)] which is (*topic,my_data)*
> tuple. Let's call it kafkaStringStream.
> Now if I do kafkaStringStream.groupByKey() then I would get a
> DStream[(String,Iterable[String])].
> But I want a DStream instead of Iterable in order to apply saveToCassandra
> for storing it.
>
> Please help with how to transform the Iterable into a DStream, or suggest
> any other workaround to achieve the same.
>
>
> On Thu, Oct 1, 2015 at 8:17 PM, Adrian Tanase <atan...@adobe.com> wrote:
>
>> On top of that you could make the topic part of the key (e.g. keyBy in
>> .transform or manually emitting a tuple) and use one of the .xxxByKey
>> operators for the processing.
>>
>> If you have a stable, domain specific list of topics (e.g. 3-5 named
>> topics) and the processing is *really* different, I would also look at
>> filtering by topic and saving as different Dstreams in your code.
>>
>> Either way you need to start with Cody’s tip in order to extract the
>> topic name.
>>
>> -adrian
>>
>> From: Cody Koeninger
>> Date: Thursday, October 1, 2015 at 5:06 PM
>> To: Udit Mehta
>> Cc: user
>> Subject: Re: Kafka Direct Stream
>>
>> You can get the topic for a given partition from the offset range.  You
>> can either filter using that; or just have a single rdd and match on topic
>> when doing mapPartitions or foreachPartition (which I think is a better
>> idea)
>>
>>
>> http://spark.apache.org/docs/latest/streaming-kafka-integration.html#approach-2-direct-approach-no-receivers
>>
>> On Wed, Sep 30, 2015 at 5:02 PM, Udit Mehta <ume...@groupon.com> wrote:
>>
>>> Hi,
>>>
>>> I am using spark direct stream to consume from multiple topics in Kafka.
>>> I am able to consume fine but I am stuck at how to separate the data for
>>> each topic since I need to process data differently depending on the topic.
>>> I basically want to split the RDD consisting on N topics into N RDD's
>>> each having 1 topic.
>>>
>>> Any help would be appreciated.
>>>
>>> Thanks in advance,
>>> Udit
>>>
>>
>>
>
>
> --
> *VARUN SHARMA*
> *Flipkart*
> *Bangalore*
>


Re: Kafka Direct Stream

2015-10-02 Thread varun sharma
Hi Adrian,

Can you please give an example of how to achieve this:

> *I would also look at filtering by topic and saving as different Dstreams
> in your code*

I have managed to get DStream[(String, String)] which is (*topic,my_data)*
tuple. Let's call it kafkaStringStream.
Now if I do kafkaStringStream.groupByKey() then I would get a
DStream[(String,Iterable[String])].
But I want a DStream instead of Iterable in order to apply saveToCassandra
for storing it.

Please help with how to transform the Iterable into a DStream, or suggest
any other workaround to achieve the same.


On Thu, Oct 1, 2015 at 8:17 PM, Adrian Tanase <atan...@adobe.com> wrote:

> On top of that you could make the topic part of the key (e.g. keyBy in
> .transform or manually emitting a tuple) and use one of the .xxxByKey
> operators for the processing.
>
> If you have a stable, domain specific list of topics (e.g. 3-5 named
> topics) and the processing is *really* different, I would also look at
> filtering by topic and saving as different Dstreams in your code.
>
> Either way you need to start with Cody’s tip in order to extract the topic
> name.
>
> -adrian
>
> From: Cody Koeninger
> Date: Thursday, October 1, 2015 at 5:06 PM
> To: Udit Mehta
> Cc: user
> Subject: Re: Kafka Direct Stream
>
> You can get the topic for a given partition from the offset range.  You
> can either filter using that; or just have a single rdd and match on topic
> when doing mapPartitions or foreachPartition (which I think is a better
> idea)
>
>
> http://spark.apache.org/docs/latest/streaming-kafka-integration.html#approach-2-direct-approach-no-receivers
>
> On Wed, Sep 30, 2015 at 5:02 PM, Udit Mehta <ume...@groupon.com> wrote:
>
>> Hi,
>>
>> I am using spark direct stream to consume from multiple topics in Kafka.
>> I am able to consume fine but I am stuck at how to separate the data for
>> each topic since I need to process data differently depending on the topic.
>> I basically want to split the RDD consisting on N topics into N RDD's
>> each having 1 topic.
>>
>> Any help would be appreciated.
>>
>> Thanks in advance,
>> Udit
>>
>
>


-- 
*VARUN SHARMA*
*Flipkart*
*Bangalore*


Re: Kafka Direct Stream

2015-10-02 Thread varun sharma
Hi Nicolae,

Won't creating N KafkaDirectStreams be an overhead for my streaming job
compared to a single DirectStream?

On Fri, Oct 2, 2015 at 1:13 AM, Nicolae Marasoiu <
nicolae.maras...@adswizz.com> wrote:

> Hi,
>
>
> If you just need processing per topic, why not generate N different kafka
> direct streams? When creating a kafka direct stream you have a list of
> topics - just give one.
>
>
> Then the reusable part of your computations should be extractable as
> transformations/functions and reused between the streams.
>
>
> Nicu
>
>
>
> --
> *From:* Adrian Tanase <atan...@adobe.com>
> *Sent:* Thursday, October 1, 2015 5:47 PM
> *To:* Cody Koeninger; Udit Mehta
> *Cc:* user
> *Subject:* Re: Kafka Direct Stream
>
> On top of that you could make the topic part of the key (e.g. keyBy in
> .transform or manually emitting a tuple) and use one of the .xxxByKey
> operators for the processing.
>
> If you have a stable, domain specific list of topics (e.g. 3-5 named
> topics) and the processing is *really* different, I would also look at
> filtering by topic and saving as different Dstreams in your code.
>
> Either way you need to start with Cody’s tip in order to extract the topic
> name.
>
> -adrian
>
> From: Cody Koeninger
> Date: Thursday, October 1, 2015 at 5:06 PM
> To: Udit Mehta
> Cc: user
> Subject: Re: Kafka Direct Stream
>
> You can get the topic for a given partition from the offset range.  You
> can either filter using that; or just have a single rdd and match on topic
> when doing mapPartitions or foreachPartition (which I think is a better
> idea)
>
>
> http://spark.apache.org/docs/latest/streaming-kafka-integration.html#approach-2-direct-approach-no-receivers
>
>
>
>
> On Wed, Sep 30, 2015 at 5:02 PM, Udit Mehta <ume...@groupon.com> wrote:
>
>> Hi,
>>
>> I am using spark direct stream to consume from multiple topics in Kafka.
>> I am able to consume fine but I am stuck at how to separate the data for
>> each topic since I need to process data differently depending on the topic.
>> I basically want to split the RDD consisting on N topics into N RDD's
>> each having 1 topic.
>>
>> Any help would be appreciated.
>>
>> Thanks in advance,
>> Udit
>>
>
>


-- 
*VARUN SHARMA*
*Flipkart*
*Bangalore*


Re: Kafka Direct Stream

2015-10-01 Thread Adrian Tanase
On top of that you could make the topic part of the key (e.g. keyBy in 
.transform or manually emitting a tuple) and use one of the .xxxByKey operators 
for the processing.

If you have a stable, domain specific list of topics (e.g. 3-5 named topics) 
and the processing is really different, I would also look at filtering by topic 
and saving as different Dstreams in your code.

Either way you need to start with Cody’s tip in order to extract the topic name.
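
A minimal sketch of the keyBy variant, assuming stream is a DStream of
records and topicOf is a hypothetical function extracting the topic from a
record:

// Key every record by its topic inside .transform, then aggregate per topic.
val byTopic = stream.transform(rdd => rdd.keyBy(topicOf))
val countsPerTopic = byTopic.map { case (topic, _) => (topic, 1L) }.reduceByKey(_ + _)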

-adrian
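
A minimal sketch of the keyBy idea above (Spark 1.x Scala API). `stream` is
assumed to be the direct stream itself (the offset-range cast only works on the
first transformation applied to it), and the count is a hypothetical stand-in
for real per-topic processing:

import org.apache.spark.TaskContext
import org.apache.spark.streaming.kafka.HasOffsetRanges

// Key every record by the topic its partition was read from.
val byTopic = stream.transform { rdd =>
  val ranges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  rdd.mapPartitions { iter =>
    val topic = ranges(TaskContext.get.partitionId).topic
    iter.map { case (_, value) => (topic, value) }
  }
}

// Any of the *ByKey operators now works per topic; counting is a placeholder.
byTopic.mapValues(_ => 1L).reduceByKey(_ + _).print()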

From: Cody Koeninger
Date: Thursday, October 1, 2015 at 5:06 PM
To: Udit Mehta
Cc: user
Subject: Re: Kafka Direct Stream

You can get the topic for a given partition from the offset range.  You can 
either filter using that; or just have a single rdd and match on topic when 
doing mapPartitions or foreachPartition (which I think is a better idea)

http://spark.apache.org/docs/latest/streaming-kafka-integration.html#approach-2-direct-approach-no-receivers

On Wed, Sep 30, 2015 at 5:02 PM, Udit Mehta <ume...@groupon.com> wrote:
Hi,

I am using spark direct stream to consume from multiple topics in Kafka. I am 
able to consume fine but I am stuck at how to separate the data for each topic 
since I need to process data differently depending on the topic.
I basically want to split the RDD consisting of N topics into N RDD's each 
having 1 topic.

Any help would be appreciated.

Thanks in advance,
Udit



Re: Kafka Direct Stream

2015-10-01 Thread Cody Koeninger
You can get the topic for a given partition from the offset range.  You can
either filter using that; or just have a single rdd and match on topic when
doing mapPartitions or foreachPartition (which I think is a better idea)

http://spark.apache.org/docs/latest/streaming-kafka-integration.html#approach-2-direct-approach-no-receivers
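
For concreteness, a minimal sketch of the single-RDD variant (Spark 1.x Scala
API); the direct stream is assumed to be in scope as `stream`, and the
per-topic handlers are hypothetical placeholders:

import org.apache.spark.TaskContext
import org.apache.spark.streaming.kafka.{HasOffsetRanges, OffsetRange}

stream.foreachRDD { rdd =>
  // One OffsetRange per RDD partition, carrying (topic, partition, from, until).
  val ranges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

  rdd.foreachPartition { iter =>
    ranges(TaskContext.get.partitionId).topic match {
      case "topicA" => iter.foreach { case (_, v) => println(s"A: $v") } // placeholder
      case "topicB" => iter.foreach { case (_, v) => println(s"B: $v") } // placeholder
      case _        => // other topics ignored in this sketch
    }
  }
}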

On Wed, Sep 30, 2015 at 5:02 PM, Udit Mehta  wrote:

> Hi,
>
> I am using spark direct stream to consume from multiple topics in Kafka. I
> am able to consume fine but I am stuck at how to separate the data for each
> topic since I need to process data differently depending on the topic.
> I basically want to split the RDD consisting of N topics into N RDD's each
> having 1 topic.
>
> Any help would be appreciated.
>
> Thanks in advance,
> Udit
>


Re: Kafka Direct Stream

2015-10-01 Thread Nicolae Marasoiu
Hi,


If you just need processing per topic, why not generate N different Kafka 
direct streams? When creating a Kafka direct stream you supply a list of topics - 
just give it one.


Then the reusable part of your computations should be extractable as 
transformations/functions and reused between the streams.


Nicu
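
A minimal sketch of the one-stream-per-topic approach (Spark 1.x Scala API).
`ssc` is an existing StreamingContext; the broker address, topic names and the
toUpperCase "enrichment" are hypothetical:

import kafka.serializer.StringDecoder
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka.KafkaUtils

val kafkaParams = Map("metadata.broker.list" -> "broker1:9092")

// One direct stream per topic.
def streamFor(topic: String): DStream[(String, String)] =
  KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
    ssc, kafkaParams, Set(topic))

// The reusable part of the computation, shared between all the streams.
def enrich(s: DStream[(String, String)]): DStream[String] =
  s.map { case (_, v) => v.toUpperCase }

val perTopic = Seq("topicA", "topicB").map(t => t -> enrich(streamFor(t))).toMap
perTopic("topicA").print() // each stream can now diverge as needed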



From: Adrian Tanase <atan...@adobe.com>
Sent: Thursday, October 1, 2015 5:47 PM
To: Cody Koeninger; Udit Mehta
Cc: user
Subject: Re: Kafka Direct Stream

On top of that you could make the topic part of the key (e.g. keyBy in 
.transform or manually emitting a tuple) and use one of the .xxxByKey operators 
for the processing.

If you have a stable, domain specific list of topics (e.g. 3-5 named topics) 
and the processing is really different, I would also look at filtering by topic 
and saving as different Dstreams in your code.

Either way you need to start with Cody's tip in order to extract the topic name.

-adrian

From: Cody Koeninger
Date: Thursday, October 1, 2015 at 5:06 PM
To: Udit Mehta
Cc: user
Subject: Re: Kafka Direct Stream

You can get the topic for a given partition from the offset range.  You can 
either filter using that; or just have a single rdd and match on topic when 
doing mapPartitions or foreachPartition (which I think is a better idea)

http://spark.apache.org/docs/latest/streaming-kafka-integration.html#approach-2-direct-approach-no-receivers




On Wed, Sep 30, 2015 at 5:02 PM, Udit Mehta <ume...@groupon.com> wrote:
Hi,

I am using spark direct stream to consume from multiple topics in Kafka. I am 
able to consume fine but I am stuck at how to separate the data for each topic 
since I need to process data differently depending on the topic.
I basically want to split the RDD consisting of N topics into N RDD's each 
having 1 topic.

Any help would be appreciated.

Thanks in advance,
Udit



[streaming] reading Kafka direct stream throws kafka.common.OffsetOutOfRangeException

2015-09-30 Thread Alexey Ponkin
Hi

I have a simple spark-streaming job (8 executors, 1 core each, on an 8-node cluster) - 
it reads from a Kafka topic (3 brokers with 8 partitions) and saves to Cassandra.
The problem is that when I increase the number of incoming messages in the topic, the 
job starts to fail with kafka.common.OffsetOutOfRangeException.
The job fails starting from 100 events per second.

Thanks in advance

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Kafka Direct Stream

2015-09-30 Thread Udit Mehta
Hi,

I am using spark direct stream to consume from multiple topics in Kafka. I
am able to consume fine but I am stuck at how to separate the data for each
topic since I need to process data differently depending on the topic.
I basically want to split the RDD consisting of N topics into N RDD's each
having 1 topic.

Any help would be appreciated.

Thanks in advance,
Udit


Re: [streaming] reading Kafka direct stream throws kafka.common.OffsetOutOfRangeException

2015-09-30 Thread Cody Koeninger
Offset out of range means the message in question is no longer available on
Kafka.  What's your kafka log retention set to, and how does that compare
to your processing time?
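
To make the failure mode concrete: if processing lags past the broker's log
retention (log.retention.hours / log.retention.bytes in server.properties), the
requested offsets are deleted out from under the job. Note that the
consumer-side setting below only picks a starting point when the stream begins
without stored offsets; it does not rescue a running job whose offsets have
already aged out. A hedged illustration with hypothetical values:

// Direct-stream params (0.8 API).
val kafkaParams = Map(
  "metadata.broker.list" -> "broker1:9092",
  // Old-consumer spelling: "smallest" = start from the earliest offset
  // still available on the broker when no offsets exist yet.
  "auto.offset.reset" -> "smallest"
)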

On Wed, Sep 30, 2015 at 4:26 AM, Alexey Ponkin  wrote:

> Hi
>
> I have a simple spark-streaming job (8 executors, 1 core each, on an 8-node cluster)
> - it reads from a Kafka topic (3 brokers with 8 partitions) and saves to Cassandra.
> The problem is that when I increase the number of incoming messages in the topic,
> the job starts to fail with kafka.common.OffsetOutOfRangeException.
> The job fails starting from 100 events per second.
>
> Thanks in advance
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Kafka Direct Stream join without data shuffle

2015-09-02 Thread Chen Song
I have a stream obtained from Kafka with the direct approach, say inputStream, and I
need to

1. Create another DStream derivedStream with map or mapPartitions (with
some data enrichment from a reference table) on inputStream
2. Join derivedStream with inputStream

In my use case, I don't need to shuffle data. Each partition in
derivedStream only needs to be joined with the corresponding partition in
the original parent inputStream it is generated from.

My question is

1. Is there a Partitioner defined in KafkaRDD at all?
2. How would I preserve the partitioning scheme and avoid data shuffle?

-- 
Chen Song


Re: Kafka Direct Stream join without data shuffle

2015-09-02 Thread Cody Koeninger
No, there isn't a partitioner for KafkaRDD (KafkaRDD may not even be a pair
rdd, for instance).

It sounds to me like if it's a self-join, you should be able to do it in a
single mapPartitions operation.
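
A minimal sketch of that single-pass idea (Scala). `inputStream` is the direct
stream from the question, and `enrich` is a hypothetical stand-in for the
reference-table lookup:

// Derive and "join" in one mapPartitions pass: each output record pairs the
// original value with its enriched form, so no shuffle is needed.
def enrich(v: String): String = v.toUpperCase // placeholder enrichment

val joined = inputStream.transform { rdd =>
  rdd.mapPartitions { iter =>
    iter.map { case (k, v) => (k, (v, enrich(v))) }
  }
}
joined.print()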

On Wed, Sep 2, 2015 at 3:06 PM, Chen Song  wrote:

> I have a stream obtained from Kafka with the direct approach, say inputStream, and I
> need to
>
> 1. Create another DStream derivedStream with map or mapPartitions (with
> some data enrichment from a reference table) on inputStream
> 2. Join derivedStream with inputStream
>
> In my use case, I don't need to shuffle data. Each partition in
> derivedStream only needs to be joined with the corresponding partition in
> the original parent inputStream it is generated from.
>
> My question is
>
> 1. Is there a Partitioner defined in KafkaRDD at all?
> 2. How would I preserve the partitioning scheme and avoid data shuffle?
>
> --
> Chen Song
>
>


Re: Spark off heap memory leak on Yarn with Kafka direct stream

2015-07-13 Thread Cody Koeninger
Does the issue only happen when you have no traffic on the topic?

Have you profiled to see what's using heap space?


On Mon, Jul 13, 2015 at 1:05 PM, Apoorva Sareen apoorva.sar...@gmail.com
wrote:

 Hi,

 I am running spark streaming 1.4.0 on Yarn (Apache distribution 2.6.0)
 with java 1.8.0_45 and also Kafka direct stream. I am also using spark with
 scala 2.11 support.

 The issue I am seeing is that both driver and executor containers are
 gradually increasing their physical memory usage until the yarn container
 kills them. I have configured up to 192M heap and 384M off-heap space in my
 driver, but it eventually runs out.

 The heap memory appears to be fine, with regular GC cycles. No
 OutOfMemoryError is ever encountered in any such runs.

 In fact I am not generating any traffic on the kafka queues, yet this still
 happens. Here is the code I am using

 object SimpleSparkStreaming extends App {

 val conf = new SparkConf()
 val ssc = new
 StreamingContext(conf, Seconds(conf.getLong("spark.batch.window.size", 1L)))
 ssc.checkpoint("checkpoint")
 val topics = Set(conf.get("spark.kafka.topic.name"))
 val kafkaParams = Map[String, String]("metadata.broker.list" ->
 conf.get("spark.kafka.broker.list"))
 val kafkaStream =
 KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc,
 kafkaParams, topics)
 kafkaStream.foreachRDD(rdd => {
 rdd.foreach(x => {
 println(x._2)
 })

 })
 kafkaStream.print()
 ssc.start()

 ssc.awaitTermination()

 }

 I am running this on CentOS 7. The command used for spark-submit is the
 following:

 ./bin/spark-submit --class 
 com.rasa.cloud.prototype.spark.SimpleSparkStreaming \
 --conf spark.yarn.executor.memoryOverhead=256 \
 --conf spark.yarn.driver.memoryOverhead=384 \
 --conf spark.kafka.topic.name=test \
 --conf spark.kafka.broker.list=172.31.45.218:9092 \
 --conf spark.batch.window.size=1 \
 --conf "spark.app.name=Simple Spark Kafka application" \
 --master yarn-cluster \
 --num-executors 1 \
 --driver-memory 192m \
 --executor-memory 128m \
 --executor-cores 1 \
 /home/centos/spark-poc/target/lib/spark-streaming-prototype-0.0.1-SNAPSHOT.jar

 Any help is greatly appreciated

 Regards,

 Apoorva



Spark off heap memory leak on Yarn with Kafka direct stream

2015-07-13 Thread Apoorva Sareen
Hi,

I am running spark streaming 1.4.0 on Yarn (Apache distribution 2.6.0) with 
java 1.8.0_45 and also Kafka direct stream. I am also using spark with scala 
2.11 support.

The issue I am seeing is that both driver and executor containers are gradually 
increasing their physical memory usage until the yarn container kills them. 
I have configured up to 192M heap and 384M off-heap space in my driver, but it 
eventually runs out.

The heap memory appears to be fine, with regular GC cycles. No 
OutOfMemoryError is ever encountered in any such runs.

In fact I am not generating any traffic on the kafka queues, yet this still 
happens. Here is the code I am using

object SimpleSparkStreaming extends App {

val conf = new SparkConf()
val ssc = new 
StreamingContext(conf, Seconds(conf.getLong("spark.batch.window.size", 1L)))
ssc.checkpoint("checkpoint")
val topics = Set(conf.get("spark.kafka.topic.name"))
val kafkaParams = Map[String, String]("metadata.broker.list" -> 
conf.get("spark.kafka.broker.list"))
val kafkaStream = 
KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, 
kafkaParams, topics)
kafkaStream.foreachRDD(rdd => {
rdd.foreach(x => {
println(x._2)
})

})
kafkaStream.print()
ssc.start()

ssc.awaitTermination()

}
I am running this on CentOS 7. The command used for spark-submit is the following:

./bin/spark-submit --class com.rasa.cloud.prototype.spark.SimpleSparkStreaming \
--conf spark.yarn.executor.memoryOverhead=256 \
--conf spark.yarn.driver.memoryOverhead=384 \
--conf spark.kafka.topic.name=test \
--conf spark.kafka.broker.list=172.31.45.218:9092 \
--conf spark.batch.window.size=1 \
--conf "spark.app.name=Simple Spark Kafka application" \
--master yarn-cluster \
--num-executors 1 \
--driver-memory 192m \
--executor-memory 128m \
--executor-cores 1 \
/home/centos/spark-poc/target/lib/spark-streaming-prototype-0.0.1-SNAPSHOT.jar 
Any help is greatly appreciated

Regards,

Apoorva

Re: Spark off heap memory leak on Yarn with Kafka direct stream

2015-07-13 Thread Apoorva Sareen
It happens irrespective of whether there is traffic on the kafka topic or not. 
Also, there is no clue I could see in the heap space. The heap looks 
healthy and stable. It is something off-heap which is constantly growing. I also 
checked the JNI reference count from the dumps, which appears stable (it is 
constantly getting GCed), and tried to limit the size of metaspace and direct 
memory using the following

--conf "spark.driver.extraJavaOptions=-XX:MaxMetaspaceSize=128M 
-XX:MaxDirectMemorySize=128M" \

but with no success. Thanks for offering to help.

Regards,
Apoorva
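
One diagnostic that can narrow this down (illustrative only, following the
spark-submit style used above; the pid is hypothetical): Java 8's Native Memory
Tracking breaks off-heap usage into categories such as metaspace, direct
buffers, threads and GC:

--conf "spark.driver.extraJavaOptions=-XX:NativeMemoryTracking=summary" \
--conf "spark.executor.extraJavaOptions=-XX:NativeMemoryTracking=summary" \

then, on the node, against the running JVM:

jcmd <pid> VM.native_memory summary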

 On 14-Jul-2015, at 12:43 am, Cody Koeninger c...@koeninger.org wrote:
 
 Does the issue only happen when you have no traffic on the topic?
 
 Have you profiled to see what's using heap space?
 
 
 On Mon, Jul 13, 2015 at 1:05 PM, Apoorva Sareen apoorva.sar...@gmail.com wrote:
 Hi,
 
 I am running spark streaming 1.4.0 on Yarn (Apache distribution 2.6.0) with 
 java 1.8.0_45 and also Kafka direct stream. I am also using spark with scala 
 2.11 support.
 
 The issue I am seeing is that both driver and executor containers are 
 gradually increasing their physical memory usage until the yarn container 
 kills them. I have configured up to 192M heap and 384M off-heap space in 
 my driver, but it eventually runs out.
 
 The heap memory appears to be fine, with regular GC cycles. No 
 OutOfMemoryError is ever encountered in any such runs.
 
 In fact I am not generating any traffic on the kafka queues, yet this still 
 happens. Here is the code I am using
 
 object SimpleSparkStreaming extends App {
 
 val conf = new SparkConf()
 val ssc = new 
 StreamingContext(conf, Seconds(conf.getLong("spark.batch.window.size", 1L)))
 ssc.checkpoint("checkpoint")
 val topics = Set(conf.get("spark.kafka.topic.name"))
 val kafkaParams = Map[String, String]("metadata.broker.list" -> 
 conf.get("spark.kafka.broker.list"))
 val kafkaStream = 
 KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, 
 kafkaParams, topics)
 kafkaStream.foreachRDD(rdd => {
 rdd.foreach(x => {
 println(x._2)
 })
 
 })
 kafkaStream.print()
 ssc.start()
 
 ssc.awaitTermination()
 
 }
 I am running this on CentOS 7. The command used for spark-submit is the following:
 
 ./bin/spark-submit --class 
 com.rasa.cloud.prototype.spark.SimpleSparkStreaming \
 --conf spark.yarn.executor.memoryOverhead=256 \
 --conf spark.yarn.driver.memoryOverhead=384 \
 --conf spark.kafka.topic.name=test \
 --conf spark.kafka.broker.list=172.31.45.218:9092 \
 --conf spark.batch.window.size=1 \
 --conf "spark.app.name=Simple Spark Kafka application" \
 --master yarn-cluster \
 --num-executors 1 \
 --driver-memory 192m \
 --executor-memory 128m \
 --executor-cores 1 \
 /home/centos/spark-poc/target/lib/spark-streaming-prototype-0.0.1-SNAPSHOT.jar
 
 Any help is greatly appreciated
 
 Regards,
 
 Apoorva
 
 



Kafka Direct Stream - Custom Serialization and Deserialization

2015-06-26 Thread Ashish Soni
Hi,

If I have the below data format, how can I use a Kafka direct stream to
de-serialize it? I am not able to understand all the parameters I need to
pass. Can someone explain what the arguments will be, as I am not clear
about this.

JavaPairInputDStream<K, V> org.apache.spark.streaming.kafka.KafkaUtils
.createDirectStream(JavaStreamingContext arg0, Class<K> arg1, Class<V>
arg2, Class<KD> arg3, Class<VD> arg4, Map<String, String> arg5,
Set<String> arg6)

ID
Name
Unit
Rate
Duration

Re: Kafka Direct Stream - Custom Serialization and Deserialization

2015-06-26 Thread Ashish Soni
My question is: why are there two similar parameters, String.class and
StringDecoder.class? What is the difference between them?

Ashish

On Fri, Jun 26, 2015 at 8:53 AM, Akhil Das ak...@sigmoidanalytics.com
wrote:

 JavaPairInputDStream<String, String> messages =
 KafkaUtils.createDirectStream(
 jssc,
 String.class,
 String.class,
 StringDecoder.class,
 StringDecoder.class,
 kafkaParams,
 topicsSet
 );

 Here:

 jssc = JavaStreamingContext
 String.class = Key, Value classes
 StringDecoder = Key, Value decoder classes
 KafkaParams = Map in which you specify all the kafka details (like
 brokers, offset etc)
 topicSet = Set of topics from which you want to consume data.

 Here's a sample program
 https://github.com/apache/spark/blob/master/examples/src/main/java/org/apache/spark/examples/streaming/JavaDirectKafkaWordCount.java
 for you to start.



 Thanks
 Best Regards

 On Fri, Jun 26, 2015 at 6:09 PM, Ashish Soni asoni.le...@gmail.com
 wrote:

 Hi,

 If I have the below data format, how can I use a Kafka direct stream to
 de-serialize it? I am not able to understand all the parameters I need to
 pass. Can someone explain what the arguments will be, as I am not clear
 about this.

 JavaPairInputDStream<K, V> org.apache.spark.streaming.kafka.KafkaUtils
 .createDirectStream(JavaStreamingContext arg0, Class<K> arg1, Class<V>
 arg2, Class<KD> arg3, Class<VD> arg4, Map<String, String> arg5,
 Set<String> arg6)

 ID
 Name
 Unit
 Rate
 Duration





Re: Kafka Direct Stream - Custom Serialization and Deserialization

2015-06-26 Thread Benjamin Fradet
There is one for the key of your Kafka message and one for its value.
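
To make that concrete, a hedged sketch of a custom value decoder for the 0.8
direct stream (Scala): both decoder type parameters implement
kafka.serializer.Decoder, and Kafka instantiates them reflectively with a
VerifiableProperties argument. The Row case class and the comma-separated
layout (ID, Name, Unit, Rate, Duration) are assumptions based on the data
format in the question:

import kafka.serializer.{Decoder, StringDecoder}
import kafka.utils.VerifiableProperties

case class Row(id: String, name: String, unit: String, rate: String, duration: String)

// Value decoder: turns the raw bytes of each message into a Row.
class RowDecoder(props: VerifiableProperties = null) extends Decoder[Row] {
  override def fromBytes(bytes: Array[Byte]): Row = {
    val f = new String(bytes, "UTF-8").split(",", -1)
    Row(f(0), f(1), f(2), f(3), f(4))
  }
}

// The key stays a String, the value becomes a Row:
// KafkaUtils.createDirectStream[String, Row, StringDecoder, RowDecoder](
//   ssc, kafkaParams, topics)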
On 26 Jun 2015 4:21 pm, Ashish Soni asoni.le...@gmail.com wrote:

 My question is: why are there two similar parameters, String.class and
 StringDecoder.class? What is the difference between them?

 Ashish

 On Fri, Jun 26, 2015 at 8:53 AM, Akhil Das ak...@sigmoidanalytics.com
 wrote:

 JavaPairInputDStream<String, String> messages =
 KafkaUtils.createDirectStream(
 jssc,
 String.class,
 String.class,
 StringDecoder.class,
 StringDecoder.class,
 kafkaParams,
 topicsSet
 );

 Here:

 jssc = JavaStreamingContext
 String.class = Key, Value classes
 StringDecoder = Key, Value decoder classes
 KafkaParams = Map in which you specify all the kafka details (like
 brokers, offset etc)
 topicSet = Set of topics from which you want to consume data.

 Here's a sample program
 https://github.com/apache/spark/blob/master/examples/src/main/java/org/apache/spark/examples/streaming/JavaDirectKafkaWordCount.java
 for you to start.



 Thanks
 Best Regards

 On Fri, Jun 26, 2015 at 6:09 PM, Ashish Soni asoni.le...@gmail.com
 wrote:

 Hi,

 If I have the below data format, how can I use a Kafka direct stream to
 de-serialize it? I am not able to understand all the parameters I need to
 pass. Can someone explain what the arguments will be, as I am not clear
 about this.

 JavaPairInputDStream<K, V> org.apache.spark.streaming.kafka.KafkaUtils
 .createDirectStream(JavaStreamingContext arg0, Class<K> arg1, Class<V>
 arg2, Class<KD> arg3, Class<VD> arg4, Map<String, String> arg5,
 Set<String> arg6)

 ID
 Name
 Unit
 Rate
 Duration






Re: Kafka Direct Stream - Custom Serialization and Deserialization

2015-06-26 Thread Akhil Das
JavaPairInputDStream<String, String> messages =
KafkaUtils.createDirectStream(
jssc,
String.class,
String.class,
StringDecoder.class,
StringDecoder.class,
kafkaParams,
topicsSet
);

Here:

jssc = JavaStreamingContext
String.class = Key, Value classes
StringDecoder = Key, Value decoder classes
KafkaParams = Map in which you specify all the kafka details (like
brokers, offset etc)
topicSet = Set of topics from which you want to consume data.

Here's a sample program
https://github.com/apache/spark/blob/master/examples/src/main/java/org/apache/spark/examples/streaming/JavaDirectKafkaWordCount.java
for you to start.



Thanks
Best Regards

On Fri, Jun 26, 2015 at 6:09 PM, Ashish Soni asoni.le...@gmail.com wrote:

 Hi,

 If I have the below data format, how can I use a Kafka direct stream to
 de-serialize it? I am not able to understand all the parameters I need to
 pass. Can someone explain what the arguments will be, as I am not clear
 about this.

 JavaPairInputDStream<K, V> org.apache.spark.streaming.kafka.KafkaUtils
 .createDirectStream(JavaStreamingContext arg0, Class<K> arg1, Class<V>
 arg2, Class<KD> arg3, Class<VD> arg4, Map<String, String> arg5,
 Set<String> arg6)

 ID
 Name
 Unit
 Rate
 Duration