JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(
    jssc,
    String.class,
    String.class,
    StringDecoder.class,
    StringDecoder.class,
    kafkaParams,
    topicsSet
);
Here:
  jssc (arg0) => the JavaStreamingContext
  String.class, String.class (arg1, arg2) => the key and value classes (K and V)
  StringDecoder.class, StringDecoder.class (arg3, arg4) => the key and value decoder classes (KD and VD)
  kafkaParams (arg5) => a Map in which you specify the Kafka details (brokers, offset reset, etc.)
  topicsSet (arg6) => the Set of topics you want to consume from

Here's a sample program for you to start with:
<https://github.com/apache/spark/blob/master/examples/src/main/java/org/apache/spark/examples/streaming/JavaDirectKafkaWordCount.java>

Thanks
Best Regards

On Fri, Jun 26, 2015 at 6:09 PM, Ashish Soni <asoni.le...@gmail.com> wrote:

> Hi,
>
> If I have the data format below, how can I use the Kafka direct stream to
> deserialize it? I am not able to understand all the parameters I need to
> pass. Can someone explain what the arguments should be? I am not clear
> about this.
>
> JavaPairInputDStream<K, V> org.apache.spark.streaming.kafka.KafkaUtils
> .createDirectStream(JavaStreamingContext arg0, Class<K> arg1, Class<V>
> arg2, Class<KD> arg3, Class<VD> arg4, Map<String, String> arg5, Set<String>
> arg6)
>
> ID
> Name
> Unit
> Rate
> Duration
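With StringDecoder for both key and value, each message arrives as a plain (String, String) pair, so the ID/Name/Unit/Rate/Duration fields still have to be parsed yourself, for example in a map over the stream. Below is a minimal sketch under stated assumptions: each message value is assumed to be a comma-separated line in the order ID,Name,Unit,Rate,Duration, with Rate numeric and Duration an integer. The class names RateRecordParser and RateRecord, the broker address and the topic name are hypothetical. The kafkaParams and topicsSet construction mirrors what the JavaDirectKafkaWordCount example does ("metadata.broker.list" plus a comma-split topic list); "auto.offset.reset" = "smallest" is the Kafka 0.8 setting for starting at the earliest available offset.

```java
import java.io.Serializable;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical helper: the delimiter and field order are assumptions,
// not something stated in the original question.
class RateRecordParser {

    // One parsed message value, e.g. "42,Widget,kg,3.5,60" (assumed format).
    // Serializable so Spark can ship instances between executors and the driver.
    static final class RateRecord implements Serializable {
        final String id;
        final String name;
        final String unit;
        final double rate;
        final int duration;

        RateRecord(String id, String name, String unit, double rate, int duration) {
            this.id = id;
            this.name = name;
            this.unit = unit;
            this.rate = rate;
            this.duration = duration;
        }

        @Override
        public String toString() {
            return "RateRecord(" + id + ", " + name + ", " + unit + ", " + rate + ", " + duration + ")";
        }
    }

    // Builds the kafkaParams map (arg5); the broker list is a placeholder.
    static Map<String, String> kafkaParams(String brokers) {
        Map<String, String> params = new HashMap<>();
        params.put("metadata.broker.list", brokers);
        // Start from the earliest offset when no offset is known (Kafka 0.8 value).
        params.put("auto.offset.reset", "smallest");
        return params;
    }

    // Builds the topicsSet (arg6) from a comma-separated list of topic names.
    static Set<String> topicsSet(String commaSeparatedTopics) {
        return new HashSet<>(Arrays.asList(commaSeparatedTopics.split(",")));
    }

    // Parses one String value as produced by StringDecoder.
    static RateRecord parse(String value) {
        String[] f = value.split(",", -1); // -1 keeps trailing empty fields
        if (f.length != 5) {
            throw new IllegalArgumentException("Expected 5 fields but got " + f.length + ": " + value);
        }
        return new RateRecord(
            f[0].trim(),
            f[1].trim(),
            f[2].trim(),
            Double.parseDouble(f[3].trim()),
            Integer.parseInt(f[4].trim()));
    }

    public static void main(String[] args) {
        System.out.println(kafkaParams("localhost:9092"));
        System.out.println(topicsSet("rates"));
        System.out.println(parse("42,Widget,kg,3.5,60"));
    }
}
```

On the stream itself, something like messages.map(t -> RateRecordParser.parse(t._2())) should turn each (key, value) Tuple2 into a RateRecord, since the value is the second element of the pair. If your messages use a different delimiter or field order, only parse() needs to change.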