Sure... I do set the same group.id for all the consumers. Here is the code ---
// zookeeper, consumerGrp, topic and numPartitions are set elsewhere.
SparkConf sparkConf = new SparkConf()
    .setMaster("yarn-cluster")
    .setAppName("Streaming WordCount");
sparkConf.set("spark.shuffle.manager", "SORT");
sparkConf.set("spark.streaming.unpersist", "true");

JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(1000));

Map<String, String> kafkaConf = new HashMap<String, String>();
kafkaConf.put("zookeeper.connect", zookeeper);
kafkaConf.put("group.id", consumerGrp);
kafkaConf.put("auto.offset.reset", "smallest");
kafkaConf.put("zookeeper.connection.timeout.ms", "1000");
kafkaConf.put("rebalance.max.retries", "4");
kafkaConf.put("rebalance.backoff.ms", "3000");

Map<String, Integer> topicMap = new HashMap<String, Integer>();
topicMap.put(topic, 1);

// One receiver-based stream per partition, all in the same consumer group.
List<JavaPairDStream<byte[], String>> kafkaStreams =
    new ArrayList<JavaPairDStream<byte[], String>>();
for (int i = 0; i < numPartitions; i++) {
    kafkaStreams.add(
        KafkaUtils.createStream(jssc, byte[].class, String.class,
            DefaultDecoder.class, PayloadDeSerializer.class,
            kafkaConf, topicMap, StorageLevel.MEMORY_ONLY_SER())
        .mapToPair(new PairFunction<Tuple2<byte[], String>, byte[], String>() {
            private static final long serialVersionUID = -1936810126415608167L;

            // Identity mapping; keeps the result typed as a JavaPairDStream.
            public Tuple2<byte[], String> call(Tuple2<byte[], String> tuple2)
                    throws Exception {
                return tuple2;
            }
        }));
}

// Union the per-partition streams into a single DStream.
JavaPairDStream<byte[], String> unifiedStream;
if (kafkaStreams.size() > 1) {
    unifiedStream = jssc.union(kafkaStreams.get(0),
        kafkaStreams.subList(1, kafkaStreams.size()));
} else {
    unifiedStream = kafkaStreams.get(0);
}

unifiedStream.print();

jssc.start();
jssc.awaitTermination();

-abe

On Fri, Oct 10, 2014 at 3:37 PM, Sean McNamara <sean.mcnam...@webtrends.com> wrote:

> Would you mind sharing the code leading to your createStream? Are you
> also setting group.id?
>
> Thanks,
>
> Sean
>
>
> On Oct 10, 2014, at 4:31 PM, Abraham Jacob <abe.jac...@gmail.com> wrote:
>
> > Hi Folks,
> >
> > I am seeing some strange behavior when using the Spark Kafka connector
> > in Spark Streaming.
> >
> > I have a Kafka topic with 8 partitions, and a Kafka producer that
> > pumps messages into this topic.
> >
> > On the consumer side I have a Spark Streaming application with 8
> > executors on 8 worker nodes and 8 ReceiverInputDStreams, all with the
> > same Kafka group id, connected to the 8 partitions of the topic. The
> > Kafka consumer property "auto.offset.reset" is set to "smallest".
> >
> > Now here is the sequence of steps -
> >
> > (1) Start the Spark Streaming app.
> > (2) Start the producer.
> >
> > At this point I see the messages being pumped by the producer arrive
> > in Spark Streaming. Then I -
> >
> > (1) Stopped the producer.
> > (2) Waited for all the messages to be consumed.
> > (3) Stopped the Spark Streaming app.
> >
> > Now when I restart the Spark Streaming app (note - the producer is
> > still down and no messages are being pumped into the topic), I observe
> > the following -
> >
> > (1) Spark Streaming starts reading each partition right from the
> > beginning.
> >
> > This is not what I was expecting. I was expecting the consumers
> > started by Spark Streaming to pick up from where they left off.
> >
> > Is my assumption incorrect that the consumers (the Kafka/Spark
> > connector) should resume reading the topic from where they last left
> > off?
> >
> > Has anyone else seen this behavior? Is there a way to make it start
> > from where it left off?
> >
> > Regards,
> > - Abraham
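P.S. In case it helps anyone chasing the same issue: with the 0.8 high-level
consumer, committed offsets live in ZooKeeper under
/consumers/<group.id>/offsets/<topic>/<partition>. Below is a minimal sketch,
assuming that layout, to dump the committed offsets between runs; the
zkConnect/group/topic strings are placeholders, not the values from my app:

import java.util.List;

import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;

public class ZkOffsetDump {
    public static void main(String[] args) throws Exception {
        // Placeholders - substitute your own values.
        String zkConnect = "zkhost:2181";
        String group = "myConsumerGroup";
        String topic = "myTopic";

        ZooKeeper zk = new ZooKeeper(zkConnect, 10000, new Watcher() {
            public void process(WatchedEvent event) { /* no-op */ }
        });

        // The 0.8 high-level consumer commits offsets under this path.
        String base = "/consumers/" + group + "/offsets/" + topic;
        List<String> partitions = zk.getChildren(base, false);
        for (String partition : partitions) {
            byte[] data = zk.getData(base + "/" + partition, false, null);
            System.out.println("partition " + partition
                + " -> offset " + new String(data, "UTF-8"));
        }
        zk.close();
    }
}

If the offsets are present after the app is stopped but the app still rewinds
to the beginning on restart, the rewind is coming from the consumer
configuration (e.g. how "auto.offset.reset" is handled) rather than from
missing commits.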