Each Spark partition will contain messages from only a single Kafka topic partition. Use HasOffsetRanges to tell which Kafka topic partition a given Spark partition came from. See the docs: http://spark.apache.org/docs/latest/streaming-kafka-integration.html
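
As a rough illustration of the idea, here is a minimal sketch in plain Java with no Spark dependency. It assumes what the linked guide describes for the direct stream: RDD partition i is backed by the i-th entry of the offset-range array, which in real code you would obtain inside the first transformation on the stream with ((HasOffsetRanges) rdd.rdd()).offsetRanges(). The TopicPartitionRange and TopicRouting classes and the topic names are hypothetical stand-ins for illustration, not Spark classes.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical stand-in for Spark's OffsetRange; holds only the fields used here. */
final class TopicPartitionRange {
    final String topic;
    final int partition;

    TopicPartitionRange(String topic, int partition) {
        this.topic = topic;
        this.partition = partition;
    }
}

/** Hypothetical helper: routes messages by the topic their partition was read from. */
final class TopicRouting {

    /** Assumes the direct-stream mapping: partition i corresponds to ranges[i]. */
    static String topicForPartition(TopicPartitionRange[] ranges, int partitionIndex) {
        return ranges[partitionIndex].topic;
    }

    /** Groups each partition's messages under the topic that partition came from. */
    static Map<String, List<String>> groupByTopic(TopicPartitionRange[] ranges,
                                                  List<List<String>> partitions) {
        Map<String, List<String>> byTopic = new LinkedHashMap<>();
        for (int i = 0; i < partitions.size(); i++) {
            String topic = topicForPartition(ranges, i);
            List<String> bucket = byTopic.get(topic);
            if (bucket == null) {
                bucket = new ArrayList<>();
                byTopic.put(topic, bucket);
            }
            bucket.addAll(partitions.get(i));
        }
        return byTopic;
    }

    public static void main(String[] args) {
        // Made-up topics and messages, purely for demonstration.
        TopicPartitionRange[] ranges = {
            new TopicPartitionRange("type1-topic", 0),
            new TopicPartitionRange("type2-topic", 0)
        };
        List<List<String>> partitions = Arrays.asList(
            Arrays.asList("{\"a\":1}"),
            Arrays.asList("{\"b\":2}")
        );
        System.out.println(groupByTopic(ranges, partitions));
    }
}
```

Once the topic for a partition is known, each message can be parsed with the one class that matches that topic, rather than trying every type against every message.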

On Sun, Aug 23, 2015 at 10:56 AM, Spark Enthusiast <sparkenthusi...@yahoo.in> wrote:
> Folks,
>
> I use the following Streaming API from KafkaUtils :
>
> public JavaPairInputDStream<String, String> inputDStream() {
>
>     HashSet<String> topicsSet = new HashSet<String>(Arrays.asList(topics.split(",")));
>     HashMap<String, String> kafkaParams = new HashMap<String, String>();
>     kafkaParams.put(Tokens.KAFKA_BROKER_LIST_TOKEN.getRealTokenName(), brokers);
>
>     return KafkaUtils.createDirectStream(
>             streamingContext,
>             String.class,
>             String.class,
>             StringDecoder.class,
>             StringDecoder.class,
>             kafkaParams,
>             topicsSet
>     );
>
> }
>
> I catch the messages using :
>
> JavaDStream<String> messages = inputDStream.map(new Function<Tuple2<String, String>, String>() {
>     @Override
>     public String call(Tuple2<String, String> tuple2) {
>         return tuple2._2();
>     }
> });
>
> My problem is, each of these Kafka Topics stream in different message types.
> How do I distinguish messages that are of type1, messages that are of type2,
> ..... ?
>
> I tried the following:
>
> private class ParseEvents<T> implements Function<String, T> {
>     final Class<T> parameterClass;
>
>     private ParseEvents(Class<T> parameterClass) {
>         this.parameterClass = parameterClass;
>     }
>
>     public T call(String message) throws Exception {
>         ObjectMapper mapper = new ObjectMapper();
>
>         T parsedMessage = null;
>
>         try {
>             parsedMessage = mapper.readValue(message, this.parameterClass);
>         } catch (Exception e1) {
>             logger.error("Ignoring Unknown Message %s", message);
>
>         }
>         return parsedMessage;
>     }
> }
>
> JavaDStream<Type1> type1Events = messages.map(new ParseEvents<Type1>(Type1.class));
>
> JavaDStream<Type2> type2Events = messages.map(new ParseEvents<Type2>(Type2.class));
>
> JavaDStream<Type3> type3Events = messages.map(new ParseEvents<Type3>(Type3.class));
>
> But this does not work because type1 catches type2 messages and ignores them.
> Is there a clean way of handling this ?