Cody, the link is helpful, but I still have issues in my test.

I set "auto.offset.reset" to "earliest" and then create a KafkaRDD using an 
OffsetRange that is out of range.

According to Kafka's documentation, I expect to get the earliest offset of 
that partition.

But I get the exception below, and it looks like "auto.offset.reset" is 
ignored entirely. Please help, and thanks again!

======
16/10/14 15:45:16 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
java.lang.AssertionError: assertion failed: Failed to get records for 
spark-executor-null mytopic2 0 2 after polling for 512
======
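
For reference, one way to avoid handing the executors an out-of-range 
OffsetRange is to ask the broker for the actual earliest and latest offsets 
first and clamp the requested range to them. The sketch below is an 
assumption on my part, not tested code: it assumes kafka-clients 0.10.1+ 
(where KafkaConsumer.beginningOffsets/endOffsets exist), the 
spark-streaming-kafka-0-10 KafkaUtils.createRDD signature, and hypothetical 
values for kafkaParams, requestedFrom, and requestedUntil.

```scala
import java.{util => ju}
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition
import org.apache.spark.SparkContext
import org.apache.spark.streaming.kafka010.{KafkaUtils, LocationStrategies, OffsetRange}

def boundedRange(sc: SparkContext,
                 kafkaParams: ju.Map[String, Object],
                 requestedFrom: Long,
                 requestedUntil: Long) = {
  val tp = new TopicPartition("mytopic2", 0)

  // Ask the broker what actually exists for this partition.
  val consumer = new KafkaConsumer[String, String](kafkaParams)
  val earliest = consumer.beginningOffsets(ju.Arrays.asList(tp)).get(tp)
  val latest   = consumer.endOffsets(ju.Arrays.asList(tp)).get(tp)
  consumer.close()

  // Clamp the requested range so the executors never poll for
  // offsets the broker does not have ("auto.offset.reset" is forced
  // to "none" on the executors, so they will not recover on their own).
  val from  = math.max(requestedFrom, earliest)
  val until = math.min(requestedUntil, latest)
  val range = OffsetRange("mytopic2", 0, from, until)

  KafkaUtils.createRDD[String, String](
    sc, kafkaParams, Array(range), LocationStrategies.PreferConsistent)
}
```

If this clamping is done on the driver before the batch is defined, the 
executor-side assertion ("Failed to get records ... after polling") should 
not be triggered by a stale fromOffset.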

-----Original Message-----
From: Cody Koeninger [mailto:c...@koeninger.org] 
Sent: October 13, 2016 9:31
To: Haopu Wang
Cc: user@spark.apache.org
Subject: Re: Kafka integration: get existing Kafka messages?

Look at the presentation and blog post linked from

https://github.com/koeninger/kafka-exactly-once

They refer to the Kafka 0.8 version of the direct stream, but the basic
idea is the same.

On Wed, Oct 12, 2016 at 7:35 PM, Haopu Wang <hw...@qilinsoft.com> wrote:
> Cody, thanks for the response.
>
>
>
> So Kafka direct stream actually has consumer on both the driver and
> executor? Can you please provide more details? Thank you very much!
>
>
>
> ________________________________
>
> From: Cody Koeninger [mailto:c...@koeninger.org]
> Sent: October 12, 2016 20:10
> To: Haopu Wang
> Cc: user@spark.apache.org
> Subject: Re: Kafka integration: get existing Kafka messages?
>
>
>
> It's set to "none" for the executors, because otherwise they won't do exactly
> what the driver told them to do.
>
>
>
> You should be able to set up the driver consumer to determine batches
> however you want, though.
>
> On Wednesday, October 12, 2016, Haopu Wang <hw...@qilinsoft.com> wrote:
>
> Hi,
>
>
>
> I want to read the existing Kafka messages and then subscribe to new stream
> messages.
>
> But I find the "auto.offset.reset" property is always set to "none" in
> KafkaUtils. Does that mean I cannot specify the "earliest" value when
> creating the direct stream?
>
> Thank you!
>
>

---------------------------------------------------------------------
To unsubscribe e-mail: user-unsubscr...@spark.apache.org
