I am doing the same thing this way, creating one direct stream per topic:

        // Iterate over the HashSet of topics
        Iterator<String> iterator = topicsSet.iterator();
        JavaPairInputDStream<String, String> messages;
        JavaDStream<String> lines;
        String topic = "";
        // Get the message stream for each topic
        while (iterator.hasNext()) {
            topic = iterator.next();
            // Create a direct Kafka stream for the brokers and this single topic
            messages = KafkaUtils.createDirectStream(jssc, String.class, String.class,
                    StringDecoder.class, StringDecoder.class, kafkaParams,
                    new HashSet<String>(Arrays.asList(topic)));

            // Keep only the message value (tuple2._2()); the key is dropped
            lines = messages.map(new Function<Tuple2<String, String>, String>() {
                @Override
                public String call(Tuple2<String, String> tuple2) {
                    return tuple2._2();
                }
            });

            // Topic-specific processing. IMPR_ACC and EVENTS_ACC must be
            // compile-time String constants to be usable as case labels.
            switch (topic) {
            case IMPR_ACC:
                ImprLogProc.groupAndCount(lines, esImpIndexName, IMPR_ACC,
                        new ImprMarshal());
                break;
            case EVENTS_ACC:
                EventLogProc.groupAndCount(lines, esEventIndexName, EVENTS_ACC,
                        new EventMarshal());
                break;
            default:
                logger.error("No matching Kafka topic found: " + topic);
                break;
            }
        }
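
The per-topic switch above could also be written as a lookup from topic name to handler, which avoids touching the switch each time a topic is added. Below is a minimal sketch in plain Java with no Spark dependency. TopicRouter, register, route and unmatchedTopics are hypothetical names made up for illustration, and the list-appending handlers only stand in for the ImprLogProc/EventLogProc calls; it is not meant as the definitive way to do this.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

/** Hypothetical helper: routes a record to the handler registered for its topic. */
public class TopicRouter {
    // topic name -> handler for records of that topic
    private final Map<String, Consumer<String>> handlers = new HashMap<>();
    // topics that were seen but had no handler (the "default" branch of the switch)
    private final List<String> unmatched = new ArrayList<>();

    /** Registers a handler for one topic; returns this so calls can be chained. */
    public TopicRouter register(String topic, Consumer<String> handler) {
        handlers.put(topic, handler);
        return this;
    }

    /** Passes the line to the topic's handler; returns false if none is registered. */
    public boolean route(String topic, String line) {
        Consumer<String> handler = handlers.get(topic);
        if (handler == null) {
            unmatched.add(topic);
            return false;
        }
        handler.accept(line);
        return true;
    }

    /** Topics passed to route() for which no handler was registered. */
    public List<String> unmatchedTopics() {
        return unmatched;
    }

    public static void main(String[] args) {
        // Stand-ins for the real per-topic processing
        List<String> impressions = new ArrayList<>();
        List<String> events = new ArrayList<>();

        // "impr_acc" and "events_acc" are assumed topic names, not taken from the thread
        TopicRouter router = new TopicRouter()
                .register("impr_acc", impressions::add)
                .register("events_acc", events::add);

        router.route("impr_acc", "impression line");
        router.route("events_acc", "event line");
        router.route("other", "ignored line");

        System.out.println("impressions=" + impressions.size()
                + " events=" + events.size()
                + " unmatched=" + router.unmatchedTopics());
    }
}
```

In the streaming job the handlers would take the per-topic JavaDStream instead of a single String, but the lookup itself stays the same.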

On Tue, Mar 15, 2016 at 12:22 PM, Akhil Das <ak...@sigmoidanalytics.com>
wrote:

> One way would be to keep it this way:
>
> val stream1 = KafkaUtils.createStream(..) // for topic 1
>
> val stream2 = KafkaUtils.createStream(..) // for topic 2
>
>
> And you will know which stream belongs to which topic.
>
> Another approach which you can put in your code itself would be to tag the
> topic name along with the stream that you are creating. Like, create a
> tuple(topic, stream) and you will be able to access ._1 as topic and ._2 as
> the stream.
>
>
> Thanks
> Best Regards
>
> On Tue, Mar 15, 2016 at 12:05 PM, Imre Nagi <imre.nagi2...@gmail.com>
> wrote:
>
>> Hi,
>>
>> I'm trying to create a Spark Streaming application that consumes
>> more than one Kafka topic. I then want to apply different
>> processing to the data from each topic.
>>
>> val kafkaStreams = {
>>>       val kafkaParameter = for (consumerGroup <- consumerGroups) yield {
>>>         Map(
>>>           "metadata.broker.list" -> ConsumerConfig.metadataBrokerList,
>>>           "zookeeper.connect" -> ConsumerConfig.zookeeperConnect,
>>>           "group.id" -> consumerGroup,
>>>           "zookeeper.connection.timeout.ms" ->
>>> ConsumerConfig.zookeeperConnectionTimeout,
>>>           "schema.registry.url" -> ConsumerConfig.schemaRegistryUrl,
>>>           "auto.offset.reset" -> ConsumerConfig.autoOffsetReset
>>>         )
>>>       }
>>>       val streams = (0 to kafkaParameter.length - 1) map { p =>
>>>         KafkaUtils.createStream[String, Array[Byte], StringDecoder,
>>> DefaultDecoder](
>>>           ssc,
>>>           kafkaParameter(p),
>>>           Map(topicsArr(p) -> 1),
>>>           StorageLevel.MEMORY_ONLY_SER
>>>         ).map(_._2)
>>>       }
>>>       val unifiedStream = ssc.union(streams)
>>>       unifiedStream.repartition(1)
>>>     }
>>>     kafkaStreams.foreachRDD(rdd => {
>>>       rdd.foreachPartition(partitionOfRecords => {
>>>         partitionOfRecords.foreach ( x =>
>>>           println(x)
>>>         )
>>>       })
>>>     })
>>
>>
>> So far, I'm able to get the data from several topics. However, I'm still
>> unable to
>> tell which topic a given record came from.
>>
>> Does anybody have experience doing this?
>>
>> Best,
>> Imre
>>
>
>


-- 
Thanks,
Saurabh

:)
