Sure... I do set the group.id for all the consumers to be the same. Here is
the code ---

SparkConf sparkConf = new SparkConf()
        .setMaster("yarn-cluster")
        .setAppName("Streaming WordCount");
sparkConf.set("spark.shuffle.manager", "SORT");
sparkConf.set("spark.streaming.unpersist", "true");

JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(1000));

// Kafka consumer config -- every receiver uses the same group.id
Map<String, String> kafkaConf = new HashMap<String, String>();
kafkaConf.put("zookeeper.connect", zookeeper);
kafkaConf.put("group.id", consumerGrp);
kafkaConf.put("auto.offset.reset", "smallest");
kafkaConf.put("zookeeper.connection.timeout.ms", "1000");
kafkaConf.put("rebalance.max.retries", "4");
kafkaConf.put("rebalance.backoff.ms", "3000");

Map<String, Integer> topicMap = new HashMap<String, Integer>();
topicMap.put(topic, 1);

// One receiver stream per partition, all in the same consumer group
List<JavaPairDStream<byte[], String>> kafkaStreams =
        new ArrayList<JavaPairDStream<byte[], String>>();
for (int i = 0; i < numPartitions; i++) {
    kafkaStreams.add(
        KafkaUtils.createStream(jssc, byte[].class, String.class,
                DefaultDecoder.class, PayloadDeSerializer.class,
                kafkaConf, topicMap, StorageLevel.MEMORY_ONLY_SER())
            .mapToPair(new PairFunction<Tuple2<byte[], String>, byte[], String>() {

                private static final long serialVersionUID = -1936810126415608167L;

                // Identity mapping -- just passes each (key, payload) pair through
                public Tuple2<byte[], String> call(Tuple2<byte[], String> tuple2) throws Exception {
                    return tuple2;
                }
            }));
}


// Union the per-partition streams into a single DStream
JavaPairDStream<byte[], String> unifiedStream;
if (kafkaStreams.size() > 1) {
    unifiedStream = jssc.union(kafkaStreams.get(0),
            kafkaStreams.subList(1, kafkaStreams.size()));
} else {
    unifiedStream = kafkaStreams.get(0);
}

unifiedStream.print();
jssc.start();
jssc.awaitTermination();
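
In case it helps narrow this down, something along these lines should show whatever offsets (if any) this group has actually committed to ZooKeeper between runs. This is just a rough sketch -- the class name and argument handling are placeholders, and it assumes the usual Kafka 0.8 layout of /consumers/<group>/offsets/<topic>/<partition>:

import org.apache.zookeeper.ZooKeeper;

public class OffsetCheck {
    public static void main(String[] args) throws Exception {
        String zkConnect = args[0];      // e.g. "zkhost:2181"
        String group = args[1];          // the consumer group.id
        String topic = args[2];
        int numPartitions = Integer.parseInt(args[3]);

        // Quick check only -- no connection-wait or retry handling
        ZooKeeper zk = new ZooKeeper(zkConnect, 10000, null);
        for (int partition = 0; partition < numPartitions; partition++) {
            // Kafka 0.8 high-level consumers commit offsets to one znode per partition
            String path = "/consumers/" + group + "/offsets/" + topic + "/" + partition;
            byte[] data = (zk.exists(path, false) == null)
                    ? null
                    : zk.getData(path, false, null);
            System.out.println(path + " -> "
                    + (data == null ? "<no committed offset>" : new String(data, "UTF-8")));
        }
        zk.close();
    }
}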


-abe


On Fri, Oct 10, 2014 at 3:37 PM, Sean McNamara <sean.mcnam...@webtrends.com>
wrote:

> Would you mind sharing the code leading to your createStream?  Are you
> also setting group.id?
>
> Thanks,
>
> Sean
>
>
> On Oct 10, 2014, at 4:31 PM, Abraham Jacob <abe.jac...@gmail.com> wrote:
>
> > Hi Folks,
> >
> > I am seeing some strange behavior when using the Spark Kafka connector
> > in Spark Streaming.
> >
> > I have a Kafka topic which has 8 partitions. I have a Kafka producer
> > that pumps some messages into this topic.
> >
> > On the consumer side I have a Spark Streaming application that has
> > 8 executors on 8 worker nodes and 8 ReceiverInputDStreams with the same
> > Kafka group id connected to the 8 partitions I have for the topic. Also, the
> > Kafka consumer property "auto.offset.reset" is set to "smallest".
> >
> >
> > Now here is the sequence of steps -
> >
> > (1) I start the Spark Streaming app.
> > (2) Start the producer.
> >
> > At this point I see the messages that are being pumped from the producer
> > in Spark Streaming. Then I -
> >
> > (1) Stop the producer.
> > (2) Wait for all the messages to be consumed.
> > (3) Stop the Spark Streaming app.
> >
> > Now when I restart the Spark Streaming app (note - the producer is still
> > down and no messages are being pumped into the topic) - I observe the
> > following -
> >
> > (1) Spark Streaming starts reading from each partition right from the
> > beginning.
> >
> >
> > This is not what I was expecting. I was expecting the consumers started
> > by Spark Streaming to pick up from where they left off....
> >
> > Is my assumption incorrect that the consumers (the Kafka/Spark
> > connector) should start reading from the topic where they last left off?
> >
> > Has anyone else seen this behavior? Is there a way to make it such that
> > it starts from where it left off?
> >
> > Regards,
> > - Abraham
>
>

