Thanks Jerry, So, from what I can understand from the code, if I leave out
"auto.offset.reset", it should theoretically read from the last commit
point... Correct?

-abe

On Fri, Oct 10, 2014 at 5:40 PM, Shao, Saisai <saisai.s...@intel.com> wrote:

>  Hi Abraham,
>
>
>
> You are correct, the “auto.offset.reset“ behavior in KafkaReceiver is
> different from original Kafka’s semantics, if you set this configure,
> KafkaReceiver will clean the related immediately, but for Kafka this
> configuration is just a hint which will be effective only when offset is
> out-of-range. So you will always read data from the beginning as you set to
> “smallest”, otherwise if you set to “largest”, you will always get data
> from the end immediately.
>
>
>
> There’s a JIRA and PR to follow this, but still not merged to the master,
> you can check to see it (https://issues.apache.org/jira/browse/SPARK-2492
> ).
>
>
>
> Thanks
>
> Jerry
>
>
>
> *From:* Abraham Jacob [mailto:abe.jac...@gmail.com]
> *Sent:* Saturday, October 11, 2014 6:57 AM
> *To:* Sean McNamara
> *Cc:* user@spark.apache.org
> *Subject:* Re: Spark Streaming KafkaUtils Issue
>
>
>
> Probably this is the issue -
>
>
>
>
> http://www.michael-noll.com/blog/2014/10/01/kafka-spark-streaming-integration-example-tutorial/
>
>
>
>
>
> ·        Spark’s usage of the Kafka consumer parameter auto.offset.reset
> <http://kafka.apache.org/documentation.html#consumerconfigs> is different
> from Kafka’s semantics. In Kafka, the behavior of setting
> auto.offset.reset to “smallest” is that the consumer will automatically
> reset the offset to the smallest offset when a) there is no existing offset
> stored in ZooKeeper or b) there is an existing offset but it is out of
> range. Spark however will *always* remove existing offsets and then start
> all the way from zero again. This means whenever you restart your
> application with auto.offset.reset = "smallest", your application will
> completely re-process all available Kafka data. Doh! See this discussion
> <http://apache-spark-user-list.1001560.n3.nabble.com/spark-streaming-and-the-spark-shell-tp3347p3387.html>
> and that discussion <http://markmail.org/message/257a5l3oqyftsjxj>.
>
>
>
> Hmm.... interesting... Wondering what happens if I set it as largest...?
>
>
>
>
>
> On Fri, Oct 10, 2014 at 3:47 PM, Abraham Jacob <abe.jac...@gmail.com>
> wrote:
>
>  Sure... I do set the group.id for all the consumers to be the same. Here
> is the code ---
>
>
>
>                 SparkConf sparkConf = new
> SparkConf().setMaster("yarn-cluster").setAppName("Streaming WordCount");
>
> sparkConf.set("spark.shuffle.manager", "SORT");
>
> sparkConf.set("spark.streaming.unpersist", "true");
>
> JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new
> Duration(1000));
>
> Map<String, String> kafkaConf = new HashMap<String, String>();
>
> kafkaConf.put("zookeeper.connect", zookeeper);
>
> kafkaConf.put("group.id", consumerGrp);
>
> kafkaConf.put("auto.offset.reset", "smallest");
>
> kafkaConf.put("zookeeper.conection.timeout.ms", "1000");
>
> kafkaConf.put("rebalance.max.retries", "4");
>
> kafkaConf.put("rebalance.backoff.ms", "3000");
>
> Map<String, Integer> topicMap = new HashMap<String, Integer>();
>
> topicMap.put(topic, 1);
>
> List<JavaPairDStream<byte[], String>> kafkaStreams = new
> ArrayList<JavaPairDStream<byte[], String>>();
>
> for(int i = 0; i < numPartitions; i++) {
>
> kafkaStreams.add(KafkaUtils.createStream(jssc, byte[].class, String.class,
>
> DefaultDecoder.class, PayloadDeSerializer.class,
>
> kafkaConf, topicMap, StorageLevel.MEMORY_ONLY_SER()).mapToPair(new
> PairFunction<Tuple2<byte[],String>, byte[], String>() {
>
>
>
> private static final long serialVersionUID = -1936810126415608167L;
>
>
>
> public Tuple2<byte[], String> call(Tuple2<byte[], String> tuple2) throws
> Exception {
>
> return tuple2;
>
> }
>
> }
>
> )
>
> );
>
> }
>
>
>
>
>
>                 JavaPairDStream<byte[], String> unifiedStream;
>
> if (kafkaStreams.size() > 1) {
>
> unifiedStream = jssc.union(kafkaStreams.get(0), kafkaStreams.subList(1,
> kafkaStreams.size()));
>
> } else {
>
> unifiedStream = kafkaStreams.get(0);
>
> }
>
> unifiedStream.print();
>
> jssc.start();
>
> jssc.awaitTermination();
>
>
>
>
>
> -abe
>
>
>
>
>
> On Fri, Oct 10, 2014 at 3:37 PM, Sean McNamara <
> sean.mcnam...@webtrends.com> wrote:
>
> Would you mind sharing the code leading to your createStream?  Are you
> also setting group.id?
>
> Thanks,
>
> Sean
>
>
>
> On Oct 10, 2014, at 4:31 PM, Abraham Jacob <abe.jac...@gmail.com> wrote:
>
> > Hi Folks,
> >
> > I am seeing some strange behavior when using the Spark Kafka connector
> in Spark streaming.
> >
> > I have a Kafka topic which has 8 partitions. I have a kafka producer
> that pumps some messages into this topic.
> >
> > On the consumer side I have a spark streaming application that that has
> 8 executors on 8 worker nodes and 8 ReceiverInputDStream with the same
> kafka group id connected to the 8 partitions I have for the topic. Also the
> kafka consumer property "auto.offset.reset" is set to "smallest".
> >
> >
> > Now here is the sequence of steps -
> >
> > (1) I Start the the spark streaming app.
> > (2) Start the producer.
> >
> > As this point I see the messages that are being pumped from the producer
> in Spark Streaming.  Then I -
> >
> > (1) Stopped the producer
> > (2) Wait for all the message to be consumed.
> > (2) Stopped the spark streaming app.
> >
> > Now when I restart the spark streaming app (note - the producer is still
> down and no messages are being pumped into the topic) - I observe the
> following -
> >
> > (1) Spark Streaming starts reading from each partition right from the
> beginning.
> >
> >
> > This is not what I was expecting. I was expecting the consumers started
> by spark streaming to start from where it left off....
> >
> > Is my assumption not correct that "the consumers (the kafka/spark
> connector) to start reading from the topic where it last left off."..?
> >
> > Has anyone else seen this behavior? Is there a way to make it such that
> it starts from where it left off?
> >
> > Regards,
> > - Abraham
>
>
>
>
>
> --
> ~
>
>
>
>
>
> --
> ~
>



-- 
~

Reply via email to