Thanks Jerry. So, from what I can understand from the code, if I leave out "auto.offset.reset", it should theoretically read from the last commit point... Correct?
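[Editorial note: for reference, Kafka's documented auto.offset.reset semantics, which the replies below contrast with Spark's receiver, can be sketched as a small decision function. This is an illustrative model only; `resolveStart` and its parameters are invented for the sketch and are not a real Kafka API.]

```java
// Illustrative model of how a Kafka 0.8 high-level consumer picks its start
// offset: "auto.offset.reset" is only a fallback, used when there is no
// committed offset in ZooKeeper or the committed offset is out of range.
public class OffsetResolution {

    /**
     * @param committed offset stored in ZooKeeper for this group, or null if none
     * @param logStart  smallest offset currently available in the partition
     * @param logEnd    largest offset currently available in the partition
     * @param reset     value of auto.offset.reset ("smallest" or "largest")
     */
    public static long resolveStart(Long committed, long logStart, long logEnd, String reset) {
        boolean inRange = committed != null && committed >= logStart && committed <= logEnd;
        if (inRange) {
            return committed;  // normal case: resume from the last commit; reset is ignored
        }
        // fallback case: no committed offset, or committed offset out of range
        return "smallest".equals(reset) ? logStart : logEnd;
    }

    public static void main(String[] args) {
        // With a valid committed offset, reset is ignored and we resume at 42.
        System.out.println(resolveStart(42L, 0, 100, "smallest")); // 42
        // With no committed offset, reset decides the starting point.
        System.out.println(resolveStart(null, 0, 100, "smallest")); // 0
        System.out.println(resolveStart(null, 0, 100, "largest"));  // 100
    }
}
```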
-abe

On Fri, Oct 10, 2014 at 5:40 PM, Shao, Saisai <saisai.s...@intel.com> wrote:

> Hi Abraham,
>
> You are correct, the "auto.offset.reset" behavior in KafkaReceiver is
> different from Kafka's original semantics. If you set this configuration,
> KafkaReceiver will delete the related offsets immediately, whereas for
> Kafka this configuration is just a hint that takes effect only when the
> offset is out of range. So if you set it to "smallest" you will always
> read data from the beginning, and if you set it to "largest" you will
> always get data from the end.
>
> There's a JIRA and PR tracking this, but it is still not merged to
> master; you can check it out
> (https://issues.apache.org/jira/browse/SPARK-2492).
>
> Thanks
> Jerry
>
> *From:* Abraham Jacob [mailto:abe.jac...@gmail.com]
> *Sent:* Saturday, October 11, 2014 6:57 AM
> *To:* Sean McNamara
> *Cc:* user@spark.apache.org
> *Subject:* Re: Spark Streaming KafkaUtils Issue
>
> Probably this is the issue -
>
> http://www.michael-noll.com/blog/2014/10/01/kafka-spark-streaming-integration-example-tutorial/
>
> · Spark's usage of the Kafka consumer parameter auto.offset.reset
> <http://kafka.apache.org/documentation.html#consumerconfigs> is different
> from Kafka's semantics. In Kafka, the behavior of setting
> auto.offset.reset to "smallest" is that the consumer will automatically
> reset the offset to the smallest offset when a) there is no existing
> offset stored in ZooKeeper or b) there is an existing offset but it is
> out of range. Spark however will *always* remove existing offsets and
> then start all the way from zero again. This means whenever you restart
> your application with auto.offset.reset = "smallest", your application
> will completely re-process all available Kafka data. Doh!
> See this discussion
> <http://apache-spark-user-list.1001560.n3.nabble.com/spark-streaming-and-the-spark-shell-tp3347p3387.html>
> and that discussion <http://markmail.org/message/257a5l3oqyftsjxj>.
>
> Hmm.... interesting... Wondering what happens if I set it to "largest"...?
>
> On Fri, Oct 10, 2014 at 3:47 PM, Abraham Jacob <abe.jac...@gmail.com> wrote:
>
> Sure... I do set the group.id for all the consumers to be the same. Here
> is the code ---
>
>     SparkConf sparkConf = new SparkConf()
>         .setMaster("yarn-cluster")
>         .setAppName("Streaming WordCount");
>     sparkConf.set("spark.shuffle.manager", "SORT");
>     sparkConf.set("spark.streaming.unpersist", "true");
>
>     JavaStreamingContext jssc =
>         new JavaStreamingContext(sparkConf, new Duration(1000));
>
>     Map<String, String> kafkaConf = new HashMap<String, String>();
>     kafkaConf.put("zookeeper.connect", zookeeper);
>     kafkaConf.put("group.id", consumerGrp);
>     kafkaConf.put("auto.offset.reset", "smallest");
>     // note: originally misspelled as "zookeeper.conection.timeout.ms";
>     // unrecognized keys are silently ignored
>     kafkaConf.put("zookeeper.connection.timeout.ms", "1000");
>     kafkaConf.put("rebalance.max.retries", "4");
>     kafkaConf.put("rebalance.backoff.ms", "3000");
>
>     Map<String, Integer> topicMap = new HashMap<String, Integer>();
>     topicMap.put(topic, 1);
>
>     List<JavaPairDStream<byte[], String>> kafkaStreams =
>         new ArrayList<JavaPairDStream<byte[], String>>();
>     for (int i = 0; i < numPartitions; i++) {
>         kafkaStreams.add(
>             KafkaUtils.createStream(jssc, byte[].class, String.class,
>                     DefaultDecoder.class, PayloadDeSerializer.class,
>                     kafkaConf, topicMap, StorageLevel.MEMORY_ONLY_SER())
>                 .mapToPair(new PairFunction<Tuple2<byte[], String>, byte[], String>() {
>                     private static final long serialVersionUID = -1936810126415608167L;
>
>                     public Tuple2<byte[], String> call(Tuple2<byte[], String> tuple2)
>                             throws Exception {
>                         return tuple2;
>                     }
>                 }));
>     }
>
>     JavaPairDStream<byte[], String> unifiedStream;
>     if (kafkaStreams.size() > 1) {
>         unifiedStream = jssc.union(kafkaStreams.get(0),
>             kafkaStreams.subList(1, kafkaStreams.size()));
>     } else {
>         unifiedStream = kafkaStreams.get(0);
>     }
>     unifiedStream.print();
>
>     jssc.start();
>     jssc.awaitTermination();
>
> -abe
>
> On Fri, Oct 10, 2014 at 3:37 PM, Sean McNamara
> <sean.mcnam...@webtrends.com> wrote:
>
> Would you mind sharing the code leading to your createStream? Are you
> also setting group.id?
>
> Thanks,
> Sean
>
> On Oct 10, 2014, at 4:31 PM, Abraham Jacob <abe.jac...@gmail.com> wrote:
>
> Hi Folks,
>
> I am seeing some strange behavior when using the Spark Kafka connector
> in Spark Streaming.
>
> I have a Kafka topic with 8 partitions, and a Kafka producer that pumps
> some messages into this topic.
>
> On the consumer side I have a Spark Streaming application that has 8
> executors on 8 worker nodes and 8 ReceiverInputDStreams with the same
> Kafka group id, connected to the 8 partitions I have for the topic. The
> Kafka consumer property "auto.offset.reset" is set to "smallest".
>
> Now here is the sequence of steps -
>
> (1) Start the Spark Streaming app.
> (2) Start the producer.
>
> At this point I see the messages that are being pumped from the producer
> in Spark Streaming. Then I -
>
> (1) Stopped the producer.
> (2) Waited for all the messages to be consumed.
> (3) Stopped the Spark Streaming app.
>
> Now when I restart the Spark Streaming app (note - the producer is still
> down and no messages are being pumped into the topic) I observe the
> following -
>
> (1) Spark Streaming starts reading from each partition right from the
> beginning.
>
> This is not what I was expecting. I was expecting the consumers started
> by Spark Streaming to start from where they left off...
>
> Is my assumption not correct that "the consumers (the kafka/spark
> connector) start reading from the topic where they last left off"?
> Has anyone else seen this behavior? Is there a way to make it start from
> where it left off?
>
> Regards,
> - Abraham

--
~
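[Editorial note: the behavior reported in this thread can be modeled in a few lines of dependency-free Java, assuming the semantics Jerry describes. `zkOffsets`, `kafkaStart`, and `sparkReceiverStart` are illustrative names invented for this sketch, not real Spark or Kafka APIs: a vanilla Kafka high-level consumer resumes from its committed offset, while Spark's KafkaReceiver first deletes the stored offset whenever auto.offset.reset is set, so every restart re-reads from the reset position.]

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative model (not real APIs) of why restarting the streaming app
// re-reads everything: Spark's KafkaReceiver wipes the group's ZooKeeper
// offset when auto.offset.reset is set, so the fallback always applies.
public class RestartBehavior {

    // stands in for the per-group offsets a consumer commits to ZooKeeper
    public static Map<String, Long> zkOffsets = new HashMap<String, Long>();

    // plain Kafka semantics: resume from the committed offset if it is valid
    public static long kafkaStart(String group, String reset, long logStart, long logEnd) {
        Long committed = zkOffsets.get(group);
        if (committed != null && committed >= logStart && committed <= logEnd) {
            return committed;
        }
        return "smallest".equals(reset) ? logStart : logEnd;
    }

    // modeled KafkaReceiver behavior: delete the offset first, then connect
    public static long sparkReceiverStart(String group, String reset,
                                          long logStart, long logEnd) {
        if (reset != null) {
            zkOffsets.remove(group);  // the stored offset never gets a chance to apply
        }
        return kafkaStart(group, reset, logStart, logEnd);
    }

    public static void main(String[] args) {
        zkOffsets.put("grp", 500L);  // pretend a previous run committed offset 500
        // Plain Kafka consumer: resumes at 500 despite reset=smallest.
        System.out.println(kafkaStart("grp", "smallest", 0, 1000));         // 500
        // Modeled Spark receiver: offset is deleted, so it re-reads from 0.
        System.out.println(sparkReceiverStart("grp", "smallest", 0, 1000)); // 0
    }
}
```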