This jira and comment sums up the issue:
https://issues.apache.org/jira/browse/SPARK-2492?focusedCommentId=14069708&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-14069708

Basically the offset param was renamed and had slightly different semantics 
between kafka 0.7 than 0.8.  Also it was useful because earlier versions of the 
spark streaming receiver could be overwhelmed when having a streaming job down 
for a period of time.

I think this PR quite nicely addresses the issue:
https://github.com/apache/spark/pull/1420


Best,

Sean


On Oct 10, 2014, at 6:48 PM, Abraham Jacob <abe.jac...@gmail.com> wrote:

Thanks Jerry, So, from what I can understand from the code, if I leave out 
"auto.offset.reset", it should theoretically read from the last commit point... 
Correct?

-abe

On Fri, Oct 10, 2014 at 5:40 PM, Shao, Saisai 
<saisai.s...@intel.com<mailto:saisai.s...@intel.com>> wrote:
Hi Abraham,

You are correct, the “auto.offset.reset“ behavior in KafkaReceiver is different 
from original Kafka’s semantics, if you set this configure, KafkaReceiver will 
clean the related immediately, but for Kafka this configuration is just a hint 
which will be effective only when offset is out-of-range. So you will always 
read data from the beginning as you set to “smallest”, otherwise if you set to 
“largest”, you will always get data from the end immediately.

There’s a JIRA and PR to follow this, but still not merged to the master, you 
can check to see it (https://issues.apache.org/jira/browse/SPARK-2492).

Thanks
Jerry

From: Abraham Jacob [mailto:abe.jac...@gmail.com<mailto:abe.jac...@gmail.com>]
Sent: Saturday, October 11, 2014 6:57 AM
To: Sean McNamara
Cc: user@spark.apache.org<mailto:user@spark.apache.org>
Subject: Re: Spark Streaming KafkaUtils Issue

Probably this is the issue -

http://www.michael-noll.com/blog/2014/10/01/kafka-spark-streaming-integration-example-tutorial/


•        Spark’s usage of the Kafka consumer parameter 
auto.offset.reset<http://kafka.apache.org/documentation.html#consumerconfigs> 
is different from Kafka’s semantics. In Kafka, the behavior of setting 
auto.offset.reset to “smallest” is that the consumer will automatically reset 
the offset to the smallest offset when a) there is no existing offset stored in 
ZooKeeper or b) there is an existing offset but it is out of range. Spark 
however will always remove existing offsets and then start all the way from 
zero again. This means whenever you restart your application with 
auto.offset.reset = "smallest", your application will completely re-process all 
available Kafka data. Doh! See this 
discussion<http://apache-spark-user-list.1001560.n3.nabble.com/spark-streaming-and-the-spark-shell-tp3347p3387.html>and
 that discussion<http://markmail.org/message/257a5l3oqyftsjxj>.

Hmm.... interesting... Wondering what happens if I set it as largest...?


On Fri, Oct 10, 2014 at 3:47 PM, Abraham Jacob 
<abe.jac...@gmail.com<mailto:abe.jac...@gmail.com>> wrote:
Sure... I do set the group.id<http://group.id/> for all the consumers to be the 
same. Here is the code ---

                SparkConf sparkConf = new 
SparkConf().setMaster("yarn-cluster").setAppName("Streaming WordCount");
sparkConf.set("spark.shuffle.manager", "SORT");
sparkConf.set("spark.streaming.unpersist", "true");
JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new 
Duration(1000));
Map<String, String> kafkaConf = new HashMap<String, String>();
kafkaConf.put("zookeeper.connect", zookeeper);
kafkaConf.put("group.id<http://group.id/>", consumerGrp);
kafkaConf.put("auto.offset.reset", "smallest");
kafkaConf.put("zookeeper.conection.timeout.ms<http://zookeeper.conection.timeout.ms/>",
 "1000");
kafkaConf.put("rebalance.max.retries", "4");
kafkaConf.put("rebalance.backoff.ms<http://rebalance.backoff.ms/>", "3000");
Map<String, Integer> topicMap = new HashMap<String, Integer>();
topicMap.put(topic, 1);
List<JavaPairDStream<byte[], String>> kafkaStreams = new 
ArrayList<JavaPairDStream<byte[], String>>();
for(int i = 0; i < numPartitions; i++) {
kafkaStreams.add(KafkaUtils.createStream(jssc, byte[].class, String.class,
DefaultDecoder.class, PayloadDeSerializer.class,
kafkaConf, topicMap, StorageLevel.MEMORY_ONLY_SER()).mapToPair(new 
PairFunction<Tuple2<byte[],String>, byte[], String>() {

private static final long serialVersionUID = -1936810126415608167L;

public Tuple2<byte[], String> call(Tuple2<byte[], String> tuple2) throws 
Exception {
return tuple2;
}
}
)
);
}


                JavaPairDStream<byte[], String> unifiedStream;
if (kafkaStreams.size() > 1) {
unifiedStream = jssc.union(kafkaStreams.get(0), kafkaStreams.subList(1, 
kafkaStreams.size()));
} else {
unifiedStream = kafkaStreams.get(0);
}
unifiedStream.print();
jssc.start();
jssc.awaitTermination();


-abe


On Fri, Oct 10, 2014 at 3:37 PM, Sean McNamara 
<sean.mcnam...@webtrends.com<mailto:sean.mcnam...@webtrends.com>> wrote:
Would you mind sharing the code leading to your createStream?  Are you also 
setting group.id<http://group.id/>?

Thanks,

Sean


On Oct 10, 2014, at 4:31 PM, Abraham Jacob 
<abe.jac...@gmail.com<mailto:abe.jac...@gmail.com>> wrote:

> Hi Folks,
>
> I am seeing some strange behavior when using the Spark Kafka connector in 
> Spark streaming.
>
> I have a Kafka topic which has 8 partitions. I have a kafka producer that 
> pumps some messages into this topic.
>
> On the consumer side I have a spark streaming application that that has 8 
> executors on 8 worker nodes and 8 ReceiverInputDStream with the same kafka 
> group id connected to the 8 partitions I have for the topic. Also the kafka 
> consumer property "auto.offset.reset" is set to "smallest".
>
>
> Now here is the sequence of steps -
>
> (1) I Start the the spark streaming app.
> (2) Start the producer.
>
> As this point I see the messages that are being pumped from the producer in 
> Spark Streaming.  Then I -
>
> (1) Stopped the producer
> (2) Wait for all the message to be consumed.
> (2) Stopped the spark streaming app.
>
> Now when I restart the spark streaming app (note - the producer is still down 
> and no messages are being pumped into the topic) - I observe the following -
>
> (1) Spark Streaming starts reading from each partition right from the 
> beginning.
>
>
> This is not what I was expecting. I was expecting the consumers started by 
> spark streaming to start from where it left off....
>
> Is my assumption not correct that "the consumers (the kafka/spark connector) 
> to start reading from the topic where it last left off."..?
>
> Has anyone else seen this behavior? Is there a way to make it such that it 
> starts from where it left off?
>
> Regards,
> - Abraham



--
~



--
~



--
~

Reply via email to