Re: Streaming app consume multiple kafka topics

2016-03-15 Thread Imre Nagi
Hi Cody,

Could you give a short example of how to use mapPartitions with a switch on
the topic? I've tried, but it still didn't work.

On Tue, Mar 15, 2016 at 9:45 PM, Cody Koeninger  wrote:

> The direct stream gives you access to the topic.  The offset range for
> each partition contains the topic.  That way you can create a single
> stream, and the first thing you do with it is mapPartitions with a
> switch on topic.
>
> Of course, it may make more sense to separate topics into different
> jobs, but if you want it all in one, that's the most straightforward
> way to do it imho.
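
A minimal Scala sketch of the pattern Cody describes, assuming the Kafka 0.8
direct API and reusing ssc, kafkaParams, and topicsSet from elsewhere in the
thread; the topic names and per-topic handlers are hypothetical placeholders:

import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils}

// Hypothetical per-topic handlers; replace with real processing.
def handleImpression(v: String): String = v
def handleEvent(v: String): String = v

val stream = KafkaUtils.createDirectStream[String, String, StringDecoder,
  StringDecoder](ssc, kafkaParams, topicsSet)

val processed = stream.transform { rdd =>
  // In the direct API, partition i of the RDD corresponds 1:1 to
  // offsetRanges(i), so the partition index tells us the source topic.
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  rdd.mapPartitionsWithIndex { (i, records) =>
    offsetRanges(i).topic match {
      case "impressions" => records.map { case (_, v) => handleImpression(v) }
      case "events"      => records.map { case (_, v) => handleEvent(v) }
      case _             => Iterator.empty
    }
  }
}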


Re: Streaming app consume multiple kafka topics

2016-03-15 Thread saurabh guru
I am doing the same thing this way:

// Iterate over the set of topics, creating one direct stream per topic
Iterator<String> iterator = topicsSet.iterator();
JavaPairInputDStream<String, String> messages;
JavaDStream<String> lines;
String topic = "";
while (iterator.hasNext()) {
    topic = iterator.next();
    // Create a direct Kafka stream for this single topic
    messages = KafkaUtils.createDirectStream(jssc, String.class,
            String.class, StringDecoder.class, StringDecoder.class,
            kafkaParams, new HashSet<>(Arrays.asList(topic)));

    // Keep only the message value from each (key, value) pair
    lines = messages.map(new Function<Tuple2<String, String>, String>() {
        @Override
        public String call(Tuple2<String, String> tuple2) {
            return tuple2._2();
        }
    });

    // Route each topic's stream to its own processing
    switch (topic) {
        case IMPR_ACC:
            ImprLogProc.groupAndCount(lines, esImpIndexName, IMPR_ACC,
                    new ImprMarshal());
            break;
        case EVENTS_ACC:
            EventLogProc.groupAndCount(lines, esEventIndexName,
                    EVENTS_ACC, new EventMarshal());
            break;
        default:
            logger.error("No matching Kafka topic found");
            break;
    }
}



-- 
Thanks,
Saurabh

:)


Re: Streaming app consume multiple kafka topics

2016-03-15 Thread Imre Nagi
Actually, I have tried your suggestion, but it didn't seem to work. Let me
try it once again.

Thanks for your help.
Best,
Imre



Re: Streaming app consume multiple kafka topics

2016-03-15 Thread Akhil Das
One way would be to keep it this way:

val stream1 = KafkaUtils.createStream(..) // for topic 1

val stream2 = KafkaUtils.createStream(..) // for topic 2


And you will know which stream belongs to which topic.

Another approach, which you can do in your code itself, is to tag the topic
name onto each stream as you create it: build a tuple (topic, stream), and
you can then access ._1 as the topic and ._2 as the stream.
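
A minimal sketch of this tagging approach, assuming the 0.8 receiver-based
API; the topic names are hypothetical, and zkQuorum and group are
placeholders:

import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka.KafkaUtils

val topics = Seq("impressions", "events")

val taggedStreams: Seq[(String, DStream[String])] = topics.map { topic =>
  val stream = KafkaUtils
    .createStream(ssc, zkQuorum, group, Map(topic -> 1))
    .map(_._2)      // keep only the message value
  (topic, stream)   // tag the stream with its topic
}

taggedStreams.foreach { case (topic, stream) =>
  stream.foreachRDD { rdd =>
    // topic is in scope here, so each stream gets its own handling
    println(s"topic=$topic count=${rdd.count()}")
  }
}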


Thanks
Best Regards



Streaming app consume multiple kafka topics

2016-03-15 Thread Imre Nagi
Hi,

I'm trying to create a Spark Streaming application that consumes more than
one Kafka topic, and I then want to apply different downstream processing to
the data from each topic.

val kafkaStreams = {
  val kafkaParameter = for (consumerGroup <- consumerGroups) yield {
    Map(
      "metadata.broker.list" -> ConsumerConfig.metadataBrokerList,
      "zookeeper.connect" -> ConsumerConfig.zookeeperConnect,
      "group.id" -> consumerGroup,
      "zookeeper.connection.timeout.ms" ->
        ConsumerConfig.zookeeperConnectionTimeout,
      "schema.registry.url" -> ConsumerConfig.schemaRegistryUrl,
      "auto.offset.reset" -> ConsumerConfig.autoOffsetReset
    )
  }
  val streams = (0 to kafkaParameter.length - 1) map { p =>
    KafkaUtils.createStream[String, Array[Byte], StringDecoder,
      DefaultDecoder](
      ssc,
      kafkaParameter(p),
      Map(topicsArr(p) -> 1),
      StorageLevel.MEMORY_ONLY_SER
    ).map(_._2)
  }
  val unifiedStream = ssc.union(streams)
  unifiedStream.repartition(1)
}
kafkaStreams.foreachRDD(rdd => {
  rdd.foreachPartition(partitionOfRecords => {
    partitionOfRecords.foreach(x =>
      println(x)
    )
  })
})


So far, I'm able to get the data from several topics. However, I'm still
unable to tell which topic a given record came from.

Does anybody have experience with this?

Best,
Imre
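
For reference, one way to keep the topic visible after ssc.union is to tag
each record with its topic before the union, along the lines of Akhil's
suggestion above. A minimal, untested sketch against the code in this
message, reusing kafkaParameter and topicsArr:

val taggedStreams = (0 until kafkaParameter.length) map { p =>
  KafkaUtils.createStream[String, Array[Byte], StringDecoder,
    DefaultDecoder](
    ssc,
    kafkaParameter(p),
    Map(topicsArr(p) -> 1),
    StorageLevel.MEMORY_ONLY_SER
  ).map { case (_, value) => (topicsArr(p), value) } // (topic, payload)
}

ssc.union(taggedStreams).foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    partitionOfRecords.foreach { case (topic, value) =>
      // topic now identifies the source, so records can be routed per topic
      println(s"$topic: $value")
    }
  }
}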