Re: How to parse multiple event types using Kafka

2015-08-23 Thread Cody Koeninger
Each spark partition will contain messages only from a single kafka
topic/partition.  Use hasOffsetRanges to tell which kafka topic/partition
it's from.  See the docs:
http://spark.apache.org/docs/latest/streaming-kafka-integration.html
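
For example, something along these lines should work (an untested sketch
against the 1.x direct stream API; it reuses the inputDStream() from your
code below, so adjust names to taste):

import java.util.Iterator;
import org.apache.spark.TaskContext;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.streaming.api.java.JavaPairInputDStream;
import org.apache.spark.streaming.kafka.HasOffsetRanges;
import org.apache.spark.streaming.kafka.OffsetRange;
import scala.Tuple2;

JavaPairInputDStream<String, String> stream = inputDStream();

stream.foreachRDD(new Function<JavaPairRDD<String, String>, Void>() {
    @Override
    public Void call(JavaPairRDD<String, String> rdd) throws Exception {
        // For a direct stream, offsetRanges[i] describes spark partition i,
        // so grab the ranges on the driver...
        final OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
        rdd.foreachPartition(new VoidFunction<Iterator<Tuple2<String, String>>>() {
            @Override
            public void call(Iterator<Tuple2<String, String>> messages) {
                // ...and look up which kafka topic this partition came from
                // on the executor.
                String topic = offsetRanges[TaskContext.get().partitionId()].topic();
                while (messages.hasNext()) {
                    String payload = messages.next()._2();
                    // pick the parser for this topic here
                }
            }
        });
        return null;
    }
});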

On Sun, Aug 23, 2015 at 10:56 AM, Spark Enthusiast sparkenthusi...@yahoo.in
 wrote:

 Folks,

 I use the following Streaming API from KafkaUtils :

 public JavaPairInputDStream<String, String> inputDStream() {

     HashSet<String> topicsSet = new HashSet<String>(Arrays.asList(topics.split(",")));
     HashMap<String, String> kafkaParams = new HashMap<String, String>();
     kafkaParams.put(Tokens.KAFKA_BROKER_LIST_TOKEN.getRealTokenName(), brokers);

     return KafkaUtils.createDirectStream(
             streamingContext,
             String.class,
             String.class,
             StringDecoder.class,
             StringDecoder.class,
             kafkaParams,
             topicsSet
     );
 }


 I catch the messages using :

 JavaDStream<String> messages = inputDStream.map(new Function<Tuple2<String, String>, String>() {
     @Override
     public String call(Tuple2<String, String> tuple2) {
         return tuple2._2();
     }
 });


 My problem is, each of these Kafka topics streams a different message type.
 How do I distinguish messages that are of type1 from messages that are of
 type2, and so on?


 I tried the following:


 private class ParseEvents<T> implements Function<String, T> {
     final Class<T> parameterClass;

     private ParseEvents(Class<T> parameterClass) {
         this.parameterClass = parameterClass;
     }

     public T call(String message) throws Exception {
         ObjectMapper mapper = new ObjectMapper();

         T parsedMessage = null;

         try {
             parsedMessage = mapper.readValue(message, this.parameterClass);
         } catch (Exception e1) {
             logger.error("Ignoring Unknown Message %s", message);
         }
         return parsedMessage;
     }
 }

 JavaDStream<Type1> type1Events = messages.map(new ParseEvents<Type1>(Type1.class));

 JavaDStream<Type2> type2Events = messages.map(new ParseEvents<Type2>(Type2.class));

 JavaDStream<Type3> type3Events = messages.map(new ParseEvents<Type3>(Type3.class));


 But this does not work, because the Type1 parser also receives Type2 messages
 and just ignores them. Is there a clean way of handling this ?
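
Building on the hasOffsetRanges suggestion above, one possible way to keep
separate per-type streams is to tag every record with the topic it came from,
then filter before parsing. A rough, untested sketch; it assumes each type
arrives on its own topic, and the names type1-topic / Type1 etc. are just
placeholders from the mail:

// additional imports: org.apache.spark.api.java.JavaRDD,
// org.apache.spark.api.java.function.Function2, java.util.ArrayList, java.util.List

JavaDStream<Tuple2<String, String>> topicAndPayload = inputDStream.transform(
    new Function<JavaPairRDD<String, String>, JavaRDD<Tuple2<String, String>>>() {
        @Override
        public JavaRDD<Tuple2<String, String>> call(JavaPairRDD<String, String> rdd) throws Exception {
            // Offset ranges are indexed by spark partition id for a direct stream.
            final OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
            return rdd.mapPartitionsWithIndex(
                new Function2<Integer, Iterator<Tuple2<String, String>>, Iterator<Tuple2<String, String>>>() {
                    @Override
                    public Iterator<Tuple2<String, String>> call(Integer partition,
                                                                 Iterator<Tuple2<String, String>> messages) {
                        // Re-emit each message as (topic, payload).
                        String topic = offsetRanges[partition].topic();
                        List<Tuple2<String, String>> tagged = new ArrayList<Tuple2<String, String>>();
                        while (messages.hasNext()) {
                            tagged.add(new Tuple2<String, String>(topic, messages.next()._2()));
                        }
                        return tagged.iterator();
                    }
                }, true);
        }
    });

JavaDStream<Type1> type1Events = topicAndPayload
    .filter(new Function<Tuple2<String, String>, Boolean>() {
        @Override
        public Boolean call(Tuple2<String, String> t) {
            return "type1-topic".equals(t._1());
        }
    })
    .map(new Function<Tuple2<String, String>, Type1>() {
        @Override
        public Type1 call(Tuple2<String, String> t) throws Exception {
            return new ObjectMapper().readValue(t._2(), Type1.class);
        }
    });

The same filter/map pair, with the other topic names and classes, gives
type2Events and type3Events, and each parser only ever sees JSON of its own
type.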






