Folks,

I use the following streaming API from KafkaUtils:

    public JavaPairInputDStream<String, String> inputDStream() {
        HashSet<String> topicsSet = new HashSet<String>(Arrays.asList(topics.split(",")));
        HashMap<String, String> kafkaParams = new HashMap<String, String>();
        kafkaParams.put(Tokens.KAFKA_BROKER_LIST_TOKEN.getRealTokenName(), brokers);

        return KafkaUtils.createDirectStream(
                streamingContext,
                String.class,
                String.class,
                StringDecoder.class,
                StringDecoder.class,
                kafkaParams,
                topicsSet
        );
    }

I extract the message payloads using:

    JavaDStream<String> messages = inputDStream.map(new Function<Tuple2<String, String>, String>() {
        @Override
        public String call(Tuple2<String, String> tuple2) {
            return tuple2._2();
        }
    });

My problem is that each of these Kafka topics streams a different message type. How do I distinguish messages of type1 from messages of type2, and so on?

I tried the following:

    private class ParseEvents<T> implements Function<String, T> {

        final Class<T> parameterClass;

        private ParseEvents(Class<T> parameterClass) {
            this.parameterClass = parameterClass;
        }

        public T call(String message) throws Exception {
            ObjectMapper mapper = new ObjectMapper();
            T parsedMessage = null;
            try {
                parsedMessage = mapper.readValue(message, this.parameterClass);
            } catch (Exception e1) {
                logger.error("Ignoring Unknown Message %s", message);
            }
            return parsedMessage;
        }
    }

    JavaDStream<Type1> type1Events = messages.map(new ParseEvents<Type1>(Type1.class));
    JavaDStream<Type2> type2Events = messages.map(new ParseEvents<Type2>(Type2.class));
    JavaDStream<Type3> type3Events = messages.map(new ParseEvents<Type3>(Type3.class));

But this does not work, because every parser sees every message: the Type1 parser also receives the type2 messages and ignores them.

Is there a clean way of handling this?
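
To make the goal concrete, here is a minimal sketch of the routing I am after, written in plain Java without Spark or Kafka so it runs on its own. It assumes each topic carries exactly one message type and that the topic name is still available next to the payload. `TopicMessage`, `eventsFor`, the stand-in `Type1`/`Type2` classes, and the topic names are all hypothetical, and the parsers are trivial placeholders for the Jackson call. In Spark terms, this would presumably be a `filter` on the topic followed by a `map` with the matching parser, which in turn needs a stream that keeps the topic (for example, one direct stream per topic).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

final class TopicRouting {

    // Hypothetical stand-ins for the Type1/Type2 classes in the question.
    static final class Type1 {
        final String payload;
        Type1(String payload) { this.payload = payload; }
    }

    static final class Type2 {
        final String payload;
        Type2(String payload) { this.payload = payload; }
    }

    // A message that still remembers which topic it came from (assumption:
    // the stream can be set up to carry the topic alongside the body).
    static final class TopicMessage {
        final String topic;
        final String body;
        TopicMessage(String topic, String body) {
            this.topic = topic;
            this.body = body;
        }
    }

    // Keep only the messages of one topic, then parse them with that
    // topic's parser. Messages from other topics never reach the parser.
    static <T> List<T> eventsFor(String topic,
                                 List<TopicMessage> messages,
                                 Function<String, T> parser) {
        List<T> events = new ArrayList<>();
        for (TopicMessage message : messages) {
            if (message.topic.equals(topic)) {
                events.add(parser.apply(message.body));
            }
        }
        return events;
    }

    public static void main(String[] args) {
        List<TopicMessage> batch = Arrays.asList(
                new TopicMessage("topicA", "a1"),
                new TopicMessage("topicB", "b1"),
                new TopicMessage("topicA", "a2"));

        // Placeholder parsers; the real ones would call ObjectMapper.readValue.
        List<Type1> type1Events = eventsFor("topicA", batch, Type1::new);
        List<Type2> type2Events = eventsFor("topicB", batch, Type2::new);

        // prints "type1=2 type2=1"
        System.out.println("type1=" + type1Events.size() + " type2=" + type2Events.size());
    }
}
```

Filtering before parsing means a parser only ever sees its own topic's messages, so there are no failed parses to log and no null entries left in the typed streams.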