JavaPairInputDStream<String, String> messages =
    KafkaUtils.createDirectStream(
        jssc,
        String.class,
        String.class,
        StringDecoder.class,
        StringDecoder.class,
        kafkaParams,
        topicsSet
    );

Here:

jssc => the JavaStreamingContext
String.class (twice) => key and value classes
StringDecoder.class (twice) => key and value decoder classes
kafkaParams => Map<String, String> in which you specify the Kafka settings
(broker list, offset reset policy, etc.)
topicsSet => Set<String> of topics from which you want to consume data
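
As a rough sketch of how the last two arguments could be built: the broker address and topic names below are placeholders, and the class and helper names (DirectStreamArgs, buildKafkaParams, buildTopicsSet) are made up for illustration. The keys "metadata.broker.list" and "auto.offset.reset" are the ones used with the Kafka 0.8 direct stream; the offset setting is optional.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class DirectStreamArgs {

    // Kafka settings for the direct stream; the broker list passed in is a placeholder.
    public static Map<String, String> buildKafkaParams(String brokers) {
        Map<String, String> kafkaParams = new HashMap<>();
        kafkaParams.put("metadata.broker.list", brokers);
        // Optional: begin at the smallest (earliest) offset instead of the largest.
        kafkaParams.put("auto.offset.reset", "smallest");
        return kafkaParams;
    }

    // Turn a comma-separated topic list into the Set<String> the API expects.
    public static Set<String> buildTopicsSet(String topics) {
        return new HashSet<>(Arrays.asList(topics.split(",")));
    }

    public static void main(String[] args) {
        Map<String, String> kafkaParams = buildKafkaParams("localhost:9092");
        Set<String> topicsSet = buildTopicsSet("topicA,topicB");
        System.out.println(kafkaParams);
        System.out.println(topicsSet);
    }
}
```

The resulting map and set can then be passed as the last two arguments of createDirectStream.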

Here's a sample program
<https://github.com/apache/spark/blob/master/examples/src/main/java/org/apache/spark/examples/streaming/JavaDirectKafkaWordCount.java>
to get you started.
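
With StringDecoder each message value arrives as a plain String, so turning it into your fields (ID, Name, Unit, Rate, Duration) is a separate parsing step. The sketch below assumes the value is a single comma-separated line in that field order, which your mail does not actually say; the class name UsageRecord and the field types (Rate as double, Duration as long) are guesses for illustration only.

```java
public class UsageRecord {
    public final String id;
    public final String name;
    public final String unit;
    public final double rate;    // assumed numeric
    public final long duration;  // assumed whole number

    public UsageRecord(String id, String name, String unit, double rate, long duration) {
        this.id = id;
        this.name = name;
        this.unit = unit;
        this.rate = rate;
        this.duration = duration;
    }

    // Parse one message value, assumed to look like "ID,Name,Unit,Rate,Duration".
    public static UsageRecord parse(String line) {
        String[] fields = line.split(",", -1);
        if (fields.length != 5) {
            throw new IllegalArgumentException("Expected 5 fields, got " + fields.length);
        }
        return new UsageRecord(
                fields[0].trim(),
                fields[1].trim(),
                fields[2].trim(),
                Double.parseDouble(fields[3].trim()),
                Long.parseLong(fields[4].trim()));
    }

    public static void main(String[] args) {
        UsageRecord record = parse("42,widget,kg,1.5,30");
        System.out.println(record.name + " " + record.rate + " " + record.duration);
    }
}
```

In the streaming job you would apply something like UsageRecord.parse to the value of each (key, value) tuple in messages, i.e. to tuple._2(). If your data is actually JSON, Avro or something else, only the parse method needs to change.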



Thanks
Best Regards

On Fri, Jun 26, 2015 at 6:09 PM, Ashish Soni <asoni.le...@gmail.com> wrote:

> Hi,
>
> If I have the data format below, how can I use the Kafka direct stream to
> deserialize it? I am not able to understand all the parameters I need to
> pass. Can someone explain what the arguments should be, as I am not clear
> about this?
>
> JavaPairInputDStream<K, V> org.apache.spark.streaming.kafka.KafkaUtils
> .createDirectStream(JavaStreamingContext arg0, Class<K> arg1, Class<V>
> arg2, Class<KD> arg3, Class<VD> arg4, Map<String, String> arg5, Set<String>
> arg6)
>
> ID
> Name
> Unit
> Rate
> Duration
>
>
