Cody, the link is helpful, but I still have issues in my test. I set "auto.offset.reset" to "earliest" and then created a KafkaRDD using an OffsetRange that is out of range.
According to Kafka's documentation, I expect to get the earliest offset of that partition. But instead I get the exception below, and it looks like "auto.offset.reset" is ignored entirely. Please help, thanks again!

======
16/10/14 15:45:16 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
java.lang.AssertionError: assertion failed: Failed to get records for spark-executor-null mytopic2 0 2 after polling for 512
======

-----Original Message-----
From: Cody Koeninger [mailto:c...@koeninger.org]
Sent: October 13, 2016 9:31
To: Haopu Wang
Cc: user@spark.apache.org
Subject: Re: Kafka integration: get existing Kafka messages?

Look at the presentation and blog post linked from
https://github.com/koeninger/kafka-exactly-once

They refer to the Kafka 0.8 version of the direct stream, but the basic idea is the same.

On Wed, Oct 12, 2016 at 7:35 PM, Haopu Wang <hw...@qilinsoft.com> wrote:
> Cody, thanks for the response.
>
> So the Kafka direct stream actually has a consumer on both the driver and
> the executors? Can you please provide more details? Thank you very much!
>
> ________________________________
>
> From: Cody Koeninger [mailto:c...@koeninger.org]
> Sent: October 12, 2016 20:10
> To: Haopu Wang
> Cc: user@spark.apache.org
> Subject: Re: Kafka integration: get existing Kafka messages?
>
> It's set to "none" for the executors, because otherwise they won't do
> exactly what the driver told them to do.
>
> You should be able to set up the driver consumer to determine batches
> however you want, though.
>
> On Wednesday, October 12, 2016, Haopu Wang <hw...@qilinsoft.com> wrote:
>
> Hi,
>
> I want to read the existing Kafka messages and then subscribe to new
> stream messages.
>
> But I find the "auto.offset.reset" property is always set to "none" in
> KafkaUtils. Does that mean I cannot specify the "earliest" value when
> creating a direct stream?
>
> Thank you!

---------------------------------------------------------------------
To unsubscribe e-mail: user-unsubscr...@spark.apache.org
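For what it's worth, one way to avoid the out-of-range assertion with KafkaUtils.createRDD (which reads the exact offsets it is given and does not consult "auto.offset.reset") is to ask the broker for the partition's actual earliest/latest offsets on the driver first and clamp the requested range to them. The sketch below is illustrative, not an official recipe: it assumes a kafka-clients version that provides beginningOffsets/endOffsets (0.10.1+), and the helper name clampRange is made up for this example.

```scala
import java.{util => ju}

import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition
import org.apache.spark.streaming.kafka010.OffsetRange

// Hypothetical helper: clamp a requested OffsetRange to the offsets the
// broker actually retains, so createRDD never asks for deleted/missing data.
// `params` must already contain bootstrap.servers, group.id and the
// key/value deserializer settings, as for any KafkaConsumer.
def clampRange(params: ju.Map[String, Object],
               requested: OffsetRange): OffsetRange = {
  val consumer = new KafkaConsumer[String, String](params)
  try {
    val tp = new TopicPartition(requested.topic, requested.partition)
    val tps = ju.Arrays.asList(tp)
    // beginningOffsets/endOffsets exist in kafka-clients 0.10.1+ only.
    val earliest = consumer.beginningOffsets(tps).get(tp).longValue()
    val latest = consumer.endOffsets(tps).get(tp).longValue()
    OffsetRange(
      tp,
      math.min(math.max(requested.fromOffset, earliest), latest),
      math.max(math.min(requested.untilOffset, latest), earliest))
  } finally {
    consumer.close()
  }
}
```

A clamped range could then be passed to KafkaUtils.createRDD as usual; if the clamped range is empty, the requested data has already been deleted by retention, which is exactly the situation the executor-side assertion was reporting.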