Folks,
I use the following streaming API from KafkaUtils:
public JavaPairInputDStream<String, String> inputDStream() {

    HashSet<String> topicsSet = new HashSet<String>(Arrays.asList(topics.split(",")));
    HashMap<String, String> kafkaParams = new HashMap<String, String>();
    kafkaParams.put(Tokens.KAFKA_BROKER_LIST_TOKEN.getRealTokenName(), brokers);

    return KafkaUtils.createDirectStream(
            streamingContext,
            String.class,
            String.class,
            StringDecoder.class,
            StringDecoder.class,
            kafkaParams,
            topicsSet
    );

}

I catch the messages using:
JavaDStream<String> messages = inputDStream.map(new Function<Tuple2<String, String>, String>() {
    @Override
    public String call(Tuple2<String, String> tuple2) {
        return tuple2._2();
    }
});
My problem is that each of these Kafka topics streams a different message type. 
How do I distinguish messages of type1 from messages of type2, and so on?
I tried the following:
private class ParseEvents<T> implements Function<String, T> {
    final Class<T> parameterClass;

    private ParseEvents(Class<T> parameterClass) {
        this.parameterClass = parameterClass;
    }

    public T call(String message) throws Exception {
        ObjectMapper mapper = new ObjectMapper();

        T parsedMessage = null;

        try {
            parsedMessage = mapper.readValue(message, this.parameterClass);
        } catch (Exception e1) {
            logger.error("Ignoring Unknown Message %s", message);
        }
        return parsedMessage;
    }
}

JavaDStream<Type1> type1Events = messages.map(new ParseEvents<Type1>(Type1.class));
JavaDStream<Type2> type2Events = messages.map(new ParseEvents<Type2>(Type2.class));
JavaDStream<Type3> type3Events = messages.map(new ParseEvents<Type3>(Type3.class));
But this does not work: every one of these streams sees every message, so the 
Type1 parser also catches Type2 messages, fails to parse them, and ignores them. 
Is there a clean way of handling this?
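
One direction that might help is to keep the topic name attached to each message and route on it before parsing, so that each parser only ever sees payloads from its own topic. The sketch below shows just that routing idea in plain Java, without Spark or Jackson. TopicMessage and TopicRouter are hypothetical names made up for this illustration, not part of any library. In Spark itself this would presumably mean either creating one direct stream per topic, or using a createDirectStream overload that takes a message handler over MessageAndMetadata; both are assumptions to check against the Spark version in use.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a Kafka record that still carries its topic name.
final class TopicMessage {
    final String topic;
    final String payload;

    TopicMessage(String topic, String payload) {
        this.topic = topic;
        this.payload = payload;
    }
}

// Hypothetical helper: groups payloads by the topic they arrived on.
final class TopicRouter {
    static Map<String, List<String>> byTopic(List<TopicMessage> messages) {
        Map<String, List<String>> grouped = new LinkedHashMap<String, List<String>>();
        for (TopicMessage message : messages) {
            List<String> payloads = grouped.get(message.topic);
            if (payloads == null) {
                payloads = new ArrayList<String>();
                grouped.put(message.topic, payloads);
            }
            payloads.add(message.payload);
        }
        return grouped;
    }

    public static void main(String[] args) {
        List<TopicMessage> batch = Arrays.asList(
                new TopicMessage("type1-topic", "{\"id\":1}"),
                new TopicMessage("type2-topic", "{\"name\":\"x\"}"),
                new TopicMessage("type1-topic", "{\"id\":2}"));

        Map<String, List<String>> grouped = byTopic(batch);
        // Each list can now be handed to the parser for that topic's type only.
        System.out.println(grouped.get("type1-topic"));
        System.out.println(grouped.get("type2-topic"));
    }
}
```

With the topic preserved like this, the ParseEvents mapping for Type1 would only be applied to the payloads from the Type1 topic, so a parse failure there would indicate a genuinely malformed message rather than a message of another type.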

