Re: FW: Kafka Direct Stream - dynamic topic subscription

2017-10-29 Thread Cody Koeninger
> I forward the email I sent to the spark users group that contains a little more background on my question. > Thank you, > Regards, > Buvana > From: Ramanan, Buvana (Nokia - US/Murray Hill) [mailto:buvana.rama...@nokia-bell-labs.com] > Sent:

Kafka Direct Stream - dynamic topic subscription

2017-10-27 Thread Ramanan, Buvana (Nokia - US/Murray Hill)
Hello, Using Spark 2.2.0. Interested in seeing dynamic topic subscription in action. Tried this example: streaming.DirectKafkaWordCount (which uses org.apache.spark.streaming.kafka010). I start with 8 Kafka partitions in my topic and found that Spark Streaming executes 8 tasks (one per
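
For reference, a minimal sketch of dynamic subscription with the kafka010 integration; the broker address, group id and topic pattern are placeholders, and whether newly created topics or partitions are picked up mid-stream still depends on the consumer's metadata refresh, so treat this as an illustration rather than the example's actual code:

    import java.util.regex.Pattern
    import org.apache.kafka.common.serialization.StringDeserializer
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}

    val conf = new SparkConf().setAppName("DynamicTopicWordCount") // hypothetical app name
    val ssc = new StreamingContext(conf, Seconds(10))

    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "broker:9092",                 // placeholder
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "wordcount-group",                      // placeholder
      "auto.offset.reset" -> "latest"
    )

    // SubscribePattern subscribes to every topic matching the regex, so topics
    // created after the stream starts can be picked up on a consumer rebalance.
    val stream = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.SubscribePattern[String, String](Pattern.compile("words-.*"), kafkaParams)
    )

    stream.map(_.value).flatMap(_.split(" ")).countByValue().print()
    ssc.start()
    ssc.awaitTermination()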

Re: ConcurrentModificationException using Kafka Direct Stream

2017-09-19 Thread HARSH TAKKAR
Thanks Cody, it worked for me by keeping the number of executors, each with 1 core, equal to the number of Kafka partitions. On Mon, Sep 18, 2017 at 8:47 PM Cody Koeninger wrote: > Have you searched in jira, e.g. > > https://issues.apache.org/jira/browse/SPARK-19185 > > On Mon, Sep 18,
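
A sketch of the configuration shape that workaround implies; the instance and core counts are illustrative, the idea being one concurrent task per executor JVM so the cached KafkaConsumer is never shared by two tasks at once:

    import org.apache.spark.SparkConf

    // 8 is just the partition count from this thread; set it to the actual
    // number of Kafka partitions so parallelism is not lost.
    val conf = new SparkConf()
      .setAppName("KafkaDirectJob")            // hypothetical
      .set("spark.executor.instances", "8")    // = number of Kafka partitions
      .set("spark.executor.cores", "1")        // one concurrent task per JVM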

Re: ConcurrentModificationException using Kafka Direct Stream

2017-09-18 Thread Cody Koeninger
Have you searched in jira, e.g. https://issues.apache.org/jira/browse/SPARK-19185 On Mon, Sep 18, 2017 at 1:56 AM, HARSH TAKKAR wrote: > Hi > > Changing the spark version is my last resort, is there any other workaround for > this problem. > > > On Mon, Sep 18, 2017 at 11:43

Re: ConcurrentModificationException using Kafka Direct Stream

2017-09-18 Thread HARSH TAKKAR
Hi, Changing the spark version is my last resort; is there any other workaround for this problem? On Mon, Sep 18, 2017 at 11:43 AM pandees waran wrote: > All, May I know what exactly changed in 2.1.1 which solved this problem? > > Sent from my iPhone > > On Sep 17, 2017, at

Re: ConcurrentModificationException using Kafka Direct Stream

2017-09-18 Thread pandees waran
All, May I know what exactly changed in 2.1.1 which solved this problem? Sent from my iPhone > On Sep 17, 2017, at 11:08 PM, Anastasios Zouzias wrote: > > Hi, > > I had a similar issue using 2.1.0 but not with Kafka. Updating to 2.1.1 > solved my issue. Can you try with

Re: ConcurrentModificationException using Kafka Direct Stream

2017-09-18 Thread Anastasios Zouzias
Hi, I had a similar issue using 2.1.0 but not with Kafka. Updating to 2.1.1 solved my issue. Can you try with 2.1.1 as well and report back? Best, Anastasios On 17.09.2017 at 16:48, "HARSH TAKKAR" wrote: Hi I am using spark 2.1.0 with scala 2.11.8, and while iterating

Re: ConcurrentModificationException using Kafka Direct Stream

2017-09-18 Thread kant kodali
You should paste some code. ConcurrentModificationException normally happens when you modify a list or any non-thread safe data structure while you are iterating over it. On Sun, Sep 17, 2017 at 10:25 PM, HARSH TAKKAR wrote: > Hi, > > No we are not creating any thread for

Re: ConcurrentModificationException using Kafka Direct Stream

2017-09-17 Thread HARSH TAKKAR
Hi, No, we are not creating any thread for the kafka DStream; however, we have a single thread for refreshing a resource cache on the driver, but that is totally separate from this connection. On Mon, Sep 18, 2017 at 12:29 AM kant kodali wrote: > Are you creating threads in your

Re: ConcurrentModificationException using Kafka Direct Stream

2017-09-17 Thread kant kodali
Are you creating threads in your application? On Sun, Sep 17, 2017 at 7:48 AM, HARSH TAKKAR wrote: > > Hi > > I am using spark 2.1.0 with scala 2.11.8, and while iterating over the > partitions of each rdd in a dStream formed using KafkaUtils, i am getting > the below

ConcurrentModificationException using Kafka Direct Stream

2017-09-17 Thread HARSH TAKKAR
Hi I am using spark 2.1.0 with scala 2.11.8, and while iterating over the partitions of each rdd in a dStream formed using KafkaUtils, I am getting the exception below; please suggest a fix. I have the following kafka config: enable.auto.commit:"true", auto.commit.interval.ms:"1000",
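
For context, a hedged sketch of settings that are often suggested around SPARK-19185-style "KafkaConsumer is not safe for multi-threaded access" reports: disable Spark's consumer cache and turn off Kafka auto-commit. The broker address and group id are placeholders, and the cache flag is a commonly cited mitigation rather than a confirmed fix for this thread:

    import org.apache.kafka.common.serialization.StringDeserializer
    import org.apache.spark.SparkConf

    val sparkConf = new SparkConf()
      .setAppName("KafkaDirectJob")                                   // hypothetical
      .set("spark.streaming.kafka.consumer.cache.enabled", "false")   // mitigation, not a guaranteed fix

    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "broker:9092",                           // placeholder
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "my-group",                                       // placeholder
      // The kafka010 integration guide recommends committing offsets explicitly
      // (or storing them yourself) instead of relying on auto-commit.
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )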

Re: Spark Streaming Checkpoint and Exactly Once Guarantee on Kafka Direct Stream

2017-06-06 Thread Tathagata Das
ng, i highly recommend >> learning Structured Streaming >> <http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html> >> instead. >> >> On Mon, Jun 5, 2017 at 11:16 AM, anbucheeralan <alunarbe...@gmail.com> >> wrote:

Re: Spark Streaming Checkpoint and Exactly Once Guarantee on Kafka Direct Stream

2017-06-06 Thread ALunar Beach
80000 ms, 1496684310000 ms, 1496684340000 ms, 1496684370000 ms, >> 1496684400000 ms, 1496684430000 ms, 1496684460000 ms, 1496684490000 ms, >> 1496684520000 ms, 1496684550000 ms >> 17/06/05 13:42:31 INFO JobScheduler: Added jobs for time 1496684280000 ms >> 17/06/05 13:42:31 IN

Re: Spark Streaming Checkpoint and Exactly Once Guarantee on Kafka Direct Stream

2017-06-06 Thread Tathagata Das
On Mon, Jun 5, 2017 at 11:16 AM, anbucheeralan <alunarbe...@gmail.com> wrote: > I am using Spark Streaming Checkpoint and Kafka Direct Stream. > It uses a 30 sec batch duration and normally the job is successful in > 15-20 sec. > > If the spark application fails after the successful completion > (14

Fwd: Spark Streaming Checkpoint and Exactly Once Guarantee on Kafka Direct Stream

2017-06-05 Thread anbucheeralan
I am using Spark Streaming Checkpoint and Kafka Direct Stream. It uses a 30 sec batch duration and normally the job is successful in 15-20 sec. If the spark application fails after the successful completion (1496684280000 ms in the log below) and restarts, it's duplicating the last batch again

Spark Streaming Checkpoint and Exactly Once Guarantee on Kafka Direct Stream

2017-06-05 Thread ALunar Beach
I am using Spark Streaming Checkpoint and Kafka Direct Stream. It uses a 30 sec batch duration and normally the job is successful in 15-20 sec. If the spark application fails after the successful completion (1496684280000 ms in the log below) and restarts, it's duplicating the last batch again
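
One way to avoid side effects when a batch is replayed after restart is to make the output idempotent and commit offsets only after the write succeeds; a rough sketch against the kafka010 DStream API, where `stream` is the direct input stream and saveIdempotently is a hypothetical sink, neither taken from the original post:

    import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges}

    // `stream` is assumed to be the InputDStream returned by KafkaUtils.createDirectStream.
    stream.foreachRDD { rdd =>
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

      rdd.foreachPartition { records =>
        // saveIdempotently is hypothetical, e.g. an upsert keyed by
        // (topic, partition, offset), so a replayed batch overwrites rather than duplicates.
        records.foreach(saveIdempotently)
      }

      // Commit to Kafka only after the batch's output has been written.
      stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
    }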

Re: Spark 2 Kafka Direct Stream Consumer Issue

2017-05-24 Thread Jayadeep J
Could any of the experts kindly advise ? On Fri, May 19, 2017 at 6:00 PM, Jayadeep J <jayade...@gmail.com> wrote: > Hi , > > I would appreciate some advice regarding an issue we are facing in > Streaming Kafka Direct Consumer. > > We have recently upgraded our appli

Re: Kafka Direct Stream: Offset Managed Manually (Exactly Once)

2016-10-21 Thread Cody Koeninger
DStream checkpoints have all kinds of other difficulties, biggest one being you can't use a checkpoint if your app code has been updated. If you can avoid checkpoints in general, I would. On Fri, Oct 21, 2016 at 11:17 AM, Erwan ALLAIN wrote: > Thanks for the fast answer

Re: Kafka Direct Stream: Offset Managed Manually (Exactly Once)

2016-10-21 Thread Erwan ALLAIN
Thanks for the fast answer! I just feel annoyed and frustrated not to be able to use spark checkpointing, because I believe that their mechanism has been correctly tested. I'm afraid that reinventing the wheel can lead to side effects that I don't see now ... Anyway thanks again, I know what I

Re: Kafka Direct Stream: Offset Managed Manually (Exactly Once)

2016-10-21 Thread Cody Koeninger
0. If your processing time is regularly greater than your batch interval you're going to have problems anyway. Investigate this more, set maxRatePerPartition, something. 1. That's personally what I tend to do. 2. Why are you relying on checkpoints if you're storing offset state in the database?

Kafka Direct Stream: Offset Managed Manually (Exactly Once)

2016-10-21 Thread Erwan ALLAIN
Hi, I'm currently implementing an exactly once mechanism based on the following example: https://github.com/koeninger/kafka-exactly-once/blob/master/src/main/scala/example/TransactionalPerBatch.scala the pseudo code is as follows: dstream.transform (store offset in a variable on driver side )
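
Expanding that pseudo code into a hedged sketch of the transactional-per-batch pattern from the linked example, written here against the kafka010 DStream API; beginTransaction, upsertResults, upsertOffsets and commitTransaction are hypothetical database helpers, and `stream` is the direct input stream:

    import org.apache.spark.TaskContext
    import org.apache.spark.streaming.kafka010.{HasOffsetRanges, OffsetRange}

    var offsetRanges = Array.empty[OffsetRange]

    stream.transform { rdd =>
      // Runs on the driver: remember this batch's offset ranges.
      offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      rdd
    }.map(record => (record.key, record.value)).foreachRDD { rdd =>
      rdd.foreachPartition { partition =>
        // The partition index still matches the Kafka partition because no shuffle happened.
        val range = offsetRanges(TaskContext.get.partitionId)
        val tx = beginTransaction()
        upsertResults(tx, partition)   // write the batch's results
        upsertOffsets(tx, range)       // store topic/partition/untilOffset in the same transaction
        commitTransaction(tx)          // both commit or both roll back: no duplicates on restart
      }
    }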

Re: Use cases for kafka direct stream messageHandler

2016-03-09 Thread Cody Koeninger
Yeah, to be clear, I'm talking about having only one constructor for a direct stream, that will give you a stream of ConsumerRecord. Different needs for topic subscription, starting offsets, etc could be handled by calling appropriate methods after construction but before starting the stream.

Re: Use cases for kafka direct stream messageHandler

2016-03-09 Thread Alan Braithwaite
I'd probably prefer to keep it the way it is, unless it's becoming more like the function without the messageHandler argument. Right now I have code like this, but I wish it were more similar looking: if (parsed.partitions.isEmpty()) { JavaPairInputDStream

Re: Use cases for kafka direct stream messageHandler

2016-03-08 Thread Cody Koeninger
No, looks like you'd have to catch them in the serializer and have the serializer return option or something. The new consumer builds a buffer full of records, not one at a time. On Mar 8, 2016 4:43 AM, "Marius Soutier" wrote: > > > On 04.03.2016, at 22:39, Cody Koeninger

Re: Use cases for kafka direct stream messageHandler

2016-03-08 Thread Marius Soutier
> On 04.03.2016, at 22:39, Cody Koeninger wrote: > > The only other valid use of messageHandler that I can think of is > catching serialization problems on a per-message basis. But with the > new Kafka consumer library, that doesn't seem feasible anyway, and > could be

Use cases for kafka direct stream messageHandler

2016-03-04 Thread Cody Koeninger
Wanted to survey what people are using the direct stream messageHandler for, besides just extracting key / value / offset. Would your use case still work if that argument was removed, and the stream just contained ConsumerRecord objects
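
For comparison, the key/value/offset extraction that messageHandler is typically used for can be expressed as a plain map over ConsumerRecord; `stream` is assumed to be a kafka010-style direct stream of ConsumerRecord objects:

    import org.apache.kafka.clients.consumer.ConsumerRecord

    // Equivalent of a messageHandler that keeps key, value and offset metadata.
    val projected = stream.map { record: ConsumerRecord[String, String] =>
      (record.topic, record.partition, record.offset, record.key, record.value)
    }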

Re: How does Spark streaming's Kafka direct stream survive from worker node failure?

2016-02-26 Thread yuhang.chenn
Thanks a lot. Sent from WPS Mail. On Feb 27, 2016 at 1:02 AM, Cody Koeninger wrote: Yes. On Thu, Feb 25, 2016 at 9:45 PM, yuhang.chenn wrote: Thanks a lot. And I got another question: What would happen if I didn't set "spark.streaming.kafka.maxRatePerPartition"? Will Spark

Re: How does Spark streaming's Kafka direct stream survive from worker node failure?

2016-02-26 Thread Cody Koeninger
Yes. On Thu, Feb 25, 2016 at 9:45 PM, yuhang.chenn wrote: > Thanks a lot. > And I got another question: What would happen if I didn't set > "spark.streaming.kafka.maxRatePerPartition"? Will Spark Streaming try to > consume all the messages in Kafka? > > Sent from WPS Mail > On

Re: How does Spark streaming's Kafka direct stream survive from worker node failure?

2016-02-26 Thread yuhang.chenn
Thanks a lot. And I got another question: What would happen if I didn't set "spark.streaming.kafka.maxRatePerPartition"? Will Spark Streaming try to consume all the messages in Kafka? Sent from WPS Mail. On Feb 25, 2016 at 11:58 AM, Cody Koeninger wrote: The per partition offsets are part of the

Re: How does Spark streaming's Kafka direct stream survive from worker node failure?

2016-02-24 Thread Cody Koeninger
The per partition offsets are part of the rdd as defined on the driver. Have you read https://github.com/koeninger/kafka-exactly-once/blob/master/blogpost.md and/or watched https://www.youtube.com/watch?v=fXnNEq1v3VA On Wed, Feb 24, 2016 at 9:05 PM, Yuhang Chen wrote:
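
A small illustration of that point, assuming a 0.8-era direct stream: the offset ranges are already fixed in the RDD's definition on the driver, so a partition recomputed after a worker failure re-reads exactly the same slice from Kafka:

    import org.apache.spark.streaming.kafka.HasOffsetRanges

    stream.foreachRDD { rdd =>
      // This runs on the driver, before any task executes: the exact offsets each
      // partition will read are part of the RDD itself, not discovered by workers.
      rdd.asInstanceOf[HasOffsetRanges].offsetRanges.foreach { range =>
        println(s"${range.topic}-${range.partition}: ${range.fromOffset} -> ${range.untilOffset}")
      }
    }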

How does Spark streaming's Kafka direct stream survive from worker node failure?

2016-02-24 Thread Yuhang Chen
Hi, as far as I know, there is a 1:1 mapping between Spark partitions and Kafka partitions, and in Spark's fault-tolerance mechanism, if a partition fails, another partition will be used to recompute that data. And my questions are below: When a partition (worker node) fails in Spark Streaming,

Re: Sporadic error after moving from kafka receiver to kafka direct stream

2015-10-22 Thread Cody Koeninger
say a big thanks to Cody for contributing the kafka > direct stream. > > I have been using the receiver based approach for months but the > direct stream is a much better solution for my use case. > > The job in question is now ported over to the direct stream doing > idempotent out

Sporadic error after moving from kafka receiver to kafka direct stream

2015-10-22 Thread Conor Fennell
Hi, Firstly want to say a big thanks to Cody for contributing the kafka direct stream. I have been using the receiver based approach for months but the direct stream is a much better solution for my use case. The job in question is now ported over to the direct stream doing idempotent outputs

Sporadic error after moving from kafka receiver to kafka direct stream

2015-10-21 Thread Conor Fennell
Hi, Firstly want to say a big thanks to Cody for contributing the kafka direct stream. I have been using the receiver based approach for months but the direct stream is a much better solution for my use case. The job in question is now ported over to the direct stream doing idempotent outputs

RE: Node affinity for Kafka-Direct Stream

2015-10-14 Thread prajod.vettiyattil
Sent: 14 October 2015 18:53 To: Saisai Shao <sai.sai.s...@gmail.com> Cc: Rishitesh Mishra <rmis...@snappydata.io>; spark users <user@spark.apache.org> Subject: Re: Node affinity for Kafka-Direct Stream Thanks Saisai, Mishra, Indeed, that hint will only work in a case where

Re: Node affinity for Kafka-Direct Stream

2015-10-14 Thread Cody Koeninger
Assumptions about locality in spark are not very reliable, regardless of what consumer you use. Even if you have locality preferences, and locality wait turned up really high, you still have to account for losing executors. On Wed, Oct 14, 2015 at 8:23 AM, Gerard Maas

Node affinity for Kafka-Direct Stream

2015-10-14 Thread Gerard Maas
In the receiver-based kafka streaming model, given that each receiver starts as a long-running task, one can rely on a certain degree of data locality based on the kafka partitioning: Data published on a given topic/partition will land on the same spark streaming receiving node until the receiver

Re: Node affinity for Kafka-Direct Stream

2015-10-14 Thread Gerard Maas
Hi Cody, I think that I misused the term 'data locality'. I think I should better call it "node affinity" instead, as this is what I would like to have: For as long as an executor is available, I would like to have the same kafka partition processed by the same node in order to take advantage of

Re: Node affinity for Kafka-Direct Stream

2015-10-14 Thread Cody Koeninger
What I'm saying is that it's not a given with spark, even in receiver-based mode, because as soon as you lose an executor you'll have a rebalance. Spark's model in general isn't a good fit for pinning work to specific nodes. If you really want to try and fake this, you can override

Re: Node affinity for Kafka-Direct Stream

2015-10-14 Thread Saisai Shao
You could check the code of KafkaRDD: the locality (host) is taken from Kafka's partition and set in KafkaRDD, and this will be a hint for Spark to schedule the task on the preferred location. override def getPreferredLocations(thePart: Partition): Seq[String] = { val part =
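
To make the mechanism concrete, here is a small self-contained sketch, not Spark's actual KafkaRDD code, of an RDD whose partitions carry a host and expose it through getPreferredLocations, the same hook the snippet above refers to:

    import org.apache.spark.{Partition, SparkContext, TaskContext}
    import org.apache.spark.rdd.RDD

    // Toy partition type: remembers the host it "belongs" to, much as KafkaRDD's
    // partitions remember the leader broker's host.
    class HostPinnedPartition(val index: Int, val host: String) extends Partition

    class HostPinnedRDD(sc: SparkContext, hosts: Seq[String]) extends RDD[String](sc, Nil) {

      override protected def getPartitions: Array[Partition] =
        hosts.zipWithIndex.map { case (h, i) => new HostPinnedPartition(i, h) }.toArray

      override def compute(split: Partition, context: TaskContext): Iterator[String] =
        Iterator(split.asInstanceOf[HostPinnedPartition].host)

      // Only a scheduling hint: the task scheduler may still fall back to
      // node-local, then rack-local, then any, if the preferred host has no free executor.
      override protected def getPreferredLocations(split: Partition): Seq[String] =
        Seq(split.asInstanceOf[HostPinnedPartition].host)
    }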

Re: Node affinity for Kafka-Direct Stream

2015-10-14 Thread Rishitesh Mishra
Hi Gerard, I am also trying to understand the same issue. From whatever code I have seen, it looks like once the Kafka RDD is constructed, the execution of that RDD is up to the task scheduler and it can schedule the partitions based on the load on nodes. There is a preferred node specified in the Kafka RDD. But

Re: Node affinity for Kafka-Direct Stream

2015-10-14 Thread Saisai Shao
This preferred locality is a hint for spark to schedule Kafka tasks on the preferred nodes; if Kafka and Spark are two separate clusters, obviously this locality hint takes no effect, and spark will schedule tasks following the node-local -> rack-local -> any pattern, like any other spark tasks. On

Re: Node affinity for Kafka-Direct Stream

2015-10-14 Thread Gerard Maas
Thanks Saisai, Mishra. Indeed, that hint will only work in a case where the Spark executor is co-located with the Kafka broker. I think the answer to my question as stated is that there's no guarantee of where the task will execute, as it will depend on the scheduler and cluster resources

Re: Kafka Direct Stream

2015-10-04 Thread varun sharma
t;>>> On Thu, Oct 1, 2015 at 8:17 PM, Adrian Tanase <atan...@adobe.com> >>>> wrote: >>>> >>>>> On top of that you could make the topic part of the key (e.g. keyBy in >>>>> .transform or manually emitting a tuple) and use one of t

Re: Kafka Direct Stream

2015-10-03 Thread varun sharma
> .transform or manually emitting a tuple) and use one of the .xxxByKey >>> operators for the processing. >>> >>> If you have a stable, domain specific list of topics (e.g. 3-5 named >>> topics) and the processing is *really* different, I would also look at >>>

Re: Kafka Direct Stream

2015-10-03 Thread Gerard Maas
On top of that you could make the topic part of the key (e.g. keyBy in >>>> .transform or manually emitting a tuple) and use one of the .xxxByKey >>>> operators for the processing. >>>> >>>> If you have a stable, domain specific list of topics (e.g. 3-5 nam

Re: Kafka Direct Stream

2015-10-02 Thread Gerard Maas
by topic and saving as different Dstreams in your code. >> >> Either way you need to start with Cody’s tip in order to extract the >> topic name. >> >> -adrian >> >> From: Cody Koeninger >> Date: Thursday, October 1, 2015 at 5:06 PM >> To: Ud

Re: Kafka Direct Stream

2015-10-02 Thread varun sharma
Koeninger > Date: Thursday, October 1, 2015 at 5:06 PM > To: Udit Mehta > Cc: user > Subject: Re: Kafka Direct Stream > > You can get the topic for a given partition from the offset range. You > can either filter using that; or just have a single rdd and match on topic >

Re: Kafka Direct Stream

2015-10-02 Thread varun sharma
N different kafka > direct streams ? when creating a kafka direct stream you have list of > topics - just give one. > > > Then the reusable part of your computations should be extractable as > transformations/functions and reused between the streams. > > > Nicu > >

Re: Kafka Direct Stream

2015-10-01 Thread Adrian Tanase
would also look at filtering by topic and saving as different Dstreams in your code. Either way you need to start with Cody’s tip in order to extract the topic name. -adrian From: Cody Koeninger Date: Thursday, October 1, 2015 at 5:06 PM To: Udit Mehta Cc: user Subject: Re: Kafka Direct Stream You

Re: Kafka Direct Stream

2015-10-01 Thread Cody Koeninger
You can get the topic for a given partition from the offset range. You can either filter using that; or just have a single rdd and match on topic when doing mapPartitions or foreachPartition (which I think is a better idea)
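
A sketch of that per-partition dispatch with the 0.8-era API; processClicks and processImpressions are hypothetical per-topic handlers, and the topic names are placeholders:

    import org.apache.spark.TaskContext
    import org.apache.spark.streaming.kafka.{HasOffsetRanges, OffsetRange}

    // One stream over several topics; dispatch per partition on the topic name.
    stream.foreachRDD { rdd =>
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      rdd.foreachPartition { records =>
        // Partition i of the KafkaRDD corresponds to offsetRanges(i), so the topic
        // for every record in this partition can be read from its range.
        val range: OffsetRange = offsetRanges(TaskContext.get.partitionId)
        range.topic match {
          case "clicks"      => processClicks(records)
          case "impressions" => processImpressions(records)
          case other         => // ignore or log unexpected topics
        }
      }
    }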

Re: Kafka Direct Stream

2015-10-01 Thread Nicolae Marasoiu
Hi, If you just need processing per topic, why not generate N different kafka direct streams? When creating a kafka direct stream you have a list of topics - just give one. Then the reusable part of your computations should be extractable as transformations/functions and reused between
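
The alternative Nicolae describes, sketched with the 0.8-era API: one direct stream per topic, each with its own pipeline. Topic names are placeholders, and kafkaParams and ssc are assumed to already exist:

    import kafka.serializer.StringDecoder
    import org.apache.spark.streaming.kafka.KafkaUtils

    val topics = Seq("clicks", "impressions")   // hypothetical topic names
    val streamsByTopic = topics.map { topic =>
      topic -> KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
        ssc, kafkaParams, Set(topic))
    }.toMap

    // Shared logic can be factored into ordinary functions and applied per stream.
    streamsByTopic("clicks").map(_._2).count().print()
    streamsByTopic("impressions").map(_._2).count().print()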

[streaming] reading Kafka direct stream throws kafka.common.OffsetOutOfRangeException

2015-09-30 Thread Alexey Ponkin
Hi, I have a simple spark-streaming job (8 executors with 1 core each, on an 8 node cluster) that reads from a Kafka topic (3 brokers with 8 partitions) and saves to Cassandra. The problem is that when I increase the number of incoming messages in the topic, the job starts to fail with

Kafka Direct Stream

2015-09-30 Thread Udit Mehta
Hi, I am using the spark direct stream to consume from multiple topics in Kafka. I am able to consume fine but I am stuck at how to separate the data for each topic, since I need to process data differently depending on the topic. I basically want to split the RDD consisting of N topics into N RDDs

Re: [streaming] reading Kafka direct stream throws kafka.common.OffsetOutOfRangeException

2015-09-30 Thread Cody Koeninger
Offset out of range means the message in question is no longer available on Kafka. What's your kafka log retention set to, and how does that compare to your processing time? On Wed, Sep 30, 2015 at 4:26 AM, Alexey Ponkin wrote: > Hi > > I have simple spark-streaming job(8

Kafka Direct Stream join without data shuffle

2015-09-02 Thread Chen Song
I have a stream obtained from Kafka with the direct approach, say inputStream. I need to 1. Create another DStream derivedStream with map or mapPartitions (with some data enrichment from a reference table) on inputStream 2. Join derivedStream with inputStream In my use case, I don't need to shuffle data.

Re: Kafka Direct Stream join without data shuffle

2015-09-02 Thread Cody Koeninger
No, there isn't a partitioner for KafkaRDD (KafkaRDD may not even be a pair rdd, for instance). It sounds to me like if it's a self-join, you should be able to do it in a single mapPartitions operation. On Wed, Sep 2, 2015 at 3:06 PM, Chen Song wrote: > I have a stream
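
A hedged sketch of that suggestion: do the enrichment and the pairing in one mapPartitions pass over the Kafka stream, so no partitioner or shuffle is needed. lookupReference is a hypothetical reference-table lookup, and inputStream is the (key, value) direct stream from the original question:

    // For each record, compute the "derived" value and keep it next to the original,
    // which is effectively the self-join done within a single partition.
    val joined = inputStream.mapPartitions { records =>
      records.map { case (key, value) =>
        val enriched = lookupReference(value)   // what derivedStream would have contained
        (key, (value, enriched))
      }
    }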

Re: Spark off heap memory leak on Yarn with Kafka direct stream

2015-07-13 Thread Cody Koeninger
and also Kafka direct stream. I am also using spark with scala 2.11 support. The issue I am seeing is that both driver and executor containers are gradually increasing the physical memory usage till a point where the yarn container kills it. I have configured up to 192M heap and 384 off-heap space

Spark off heap memory leak on Yarn with Kafka direct stream

2015-07-13 Thread Apoorva Sareen
Hi, I am running spark streaming 1.4.0 on Yarn (Apache distribution 2.6.0) with java 1.8.0_45 and also Kafka direct stream. I am also using spark with scala 2.11 support. The issue I am seeing is that both driver and executor containers are gradually increasing the physical memory usage till
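
A configuration sketch sometimes used as a first step when YARN kills containers for exceeding physical memory: raise the executor memory overhead so the container limit covers heap plus off-heap allocations. The values below are illustrative, not a recommendation for this specific job:

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      .setAppName("KafkaDirectOnYarn")                     // hypothetical
      .set("spark.executor.memory", "192m")                // heap, as in the report above
      .set("spark.yarn.executor.memoryOverhead", "512")    // MB of off-heap headroom (Spark 1.x name)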

Re: Spark off heap memory leak on Yarn with Kafka direct stream

2015-07-13 Thread Apoorva Sareen
running spark streaming 1.4.0 on Yarn (Apache distribution 2.6.0) with java 1.8.0_45 and also Kafka direct stream. I am also using spark with scala 2.11 support. The issue I am seeing is that both driver and executor containers are gradually increasing the physical memory usage till

Kafka Direct Stream - Custom Serialization and Deserialization

2015-06-26 Thread Ashish Soni
Hi, If I have the below data format, how can I use the kafka direct stream to de-serialize it? I am not able to understand all the parameters I need to pass. Can someone explain what the arguments should be, as I am not clear about this: JavaPairInputDStream
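
For orientation, a hedged Scala sketch of what the arguments look like with the 0.8-era createDirectStream when the value needs a custom decoder; the Event type, its wire format and the topic name are made up for illustration, and kafkaParams and ssc are assumed to exist:

    import kafka.serializer.{Decoder, StringDecoder}
    import kafka.utils.VerifiableProperties
    import org.apache.spark.streaming.kafka.KafkaUtils

    case class Event(id: String, payload: String)   // hypothetical record type

    // The decoder class is passed as a type parameter; it needs a constructor
    // taking VerifiableProperties so KafkaUtils can instantiate it reflectively.
    class EventDecoder(props: VerifiableProperties = null) extends Decoder[Event] {
      override def fromBytes(bytes: Array[Byte]): Event = {
        val s = new String(bytes, "UTF-8")
        val Array(id, payload) = s.split(",", 2)    // assumes a simple "id,payload" format
        Event(id, payload)
      }
    }

    // kafkaParams (with "metadata.broker.list") and ssc are assumed to exist.
    val events = KafkaUtils.createDirectStream[String, Event, StringDecoder, EventDecoder](
      ssc, kafkaParams, Set("events"))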

Re: Kafka Direct Stream - Custom Serialization and Deserialization

2015-06-26 Thread Ashish Soni
for you to start. Thanks, Best Regards. On Fri, Jun 26, 2015 at 6:09 PM, Ashish Soni asoni.le...@gmail.com wrote: Hi, If I have the below data format, how can I use the kafka direct stream to de-serialize it? I am not able to understand all the parameters I need to pass. Can someone explain

Re: Kafka Direct Stream - Custom Serialization and Deserialization

2015-06-26 Thread Benjamin Fradet
, how can I use the kafka direct stream to de-serialize it? I am not able to understand all the parameters I need to pass. Can someone explain what the arguments should be, as I am not clear about this: JavaPairInputDStream<K, V> org.apache.spark.streaming.kafka.KafkaUtils.createDirectStream

Re: Kafka Direct Stream - Custom Serialization and Deserialization

2015-06-26 Thread Akhil Das
/org/apache/spark/examples/streaming/JavaDirectKafkaWordCount.java for you to start. Thanks, Best Regards. On Fri, Jun 26, 2015 at 6:09 PM, Ashish Soni asoni.le...@gmail.com wrote: Hi, If I have the below data format, how can I use the kafka direct stream to de-serialize it? I am not able