Each Spark partition will contain messages from only a single Kafka
topic-partition. Use HasOffsetRanges to tell which Kafka topic-partition
it's from. See the docs:
http://spark.apache.org/docs/latest/streaming-kafka-integration.html
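
For your case, that lets you tag every message with the topic it came
from and then route it to the right parser, instead of running every
parser over every message. Here's a rough, untested sketch against the
direct stream you create below; it uses the standard transform() /
mapPartitionsWithIndex() calls from the Java API and relies on the fact
that offsetRanges()[i] describes Spark partition i:

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.kafka.HasOffsetRanges;
import org.apache.spark.streaming.kafka.OffsetRange;
import scala.Tuple2;

// Tag each payload with its source topic as a (topic, payload) pair.
JavaDStream<Tuple2<String, String>> topicTaggedMessages =
    inputDStream.transform(
        new Function<JavaPairRDD<String, String>, JavaRDD<Tuple2<String, String>>>() {
          @Override
          public JavaRDD<Tuple2<String, String>> call(JavaPairRDD<String, String> rdd) {
            // Must be the first operation on the stream: only the KafkaRDD
            // produced by createDirectStream implements HasOffsetRanges.
            final OffsetRange[] offsets = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
            return rdd.mapPartitionsWithIndex(
                new Function2<Integer, Iterator<Tuple2<String, String>>,
                              Iterator<Tuple2<String, String>>>() {
                  @Override
                  public Iterator<Tuple2<String, String>> call(
                      Integer partitionIndex, Iterator<Tuple2<String, String>> records) {
                    // The i-th Spark partition holds exactly the messages of
                    // the i-th offset range, so this is its source topic.
                    String topic = offsets[partitionIndex].topic();
                    List<Tuple2<String, String>> tagged =
                        new ArrayList<Tuple2<String, String>>();
                    while (records.hasNext()) {
                      tagged.add(new Tuple2<String, String>(topic, records.next()._2()));
                    }
                    return tagged.iterator();
                  }
                }, true);
          }
        });

From topicTaggedMessages you can filter() on the topic name and map each
filtered stream through the matching parser, so the Type1 parser never
sees Type2 payloads.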
On Sun, Aug 23, 2015 at 10:56 AM, Spark Enthusiast sparkenthusi...@yahoo.in
wrote:
Folks,
I use the following streaming API from KafkaUtils:
public JavaPairInputDStream<String, String> inputDStream() {
    HashSet<String> topicsSet =
        new HashSet<String>(Arrays.asList(topics.split(",")));
    HashMap<String, String> kafkaParams = new HashMap<String, String>();
    kafkaParams.put(Tokens.KAFKA_BROKER_LIST_TOKEN.getRealTokenName(),
        brokers);
    return KafkaUtils.createDirectStream(
        streamingContext,
        String.class,
        String.class,
        StringDecoder.class,
        StringDecoder.class,
        kafkaParams,
        topicsSet
    );
}
I extract the message payloads using:
JavaDStream<String> messages = inputDStream.map(
    new Function<Tuple2<String, String>, String>() {
      @Override
      public String call(Tuple2<String, String> tuple2) {
        return tuple2._2();
      }
    });
My problem is that each of these Kafka topics streams a different message
type. How do I tell which messages are of type1, which are of type2,
and so on?
I tried the following:
private class ParseEvents<T> implements Function<String, T> {
    final Class<T> parameterClass;

    private ParseEvents(Class<T> parameterClass) {
        this.parameterClass = parameterClass;
    }

    public T call(String message) throws Exception {
        ObjectMapper mapper = new ObjectMapper();
        T parsedMessage = null;
        try {
            parsedMessage = mapper.readValue(message, this.parameterClass);
        } catch (Exception e1) {
            logger.error("Ignoring Unknown Message %s", message);
        }
        return parsedMessage;
    }
}
JavaDStream<Type1> type1Events =
    messages.map(new ParseEvents<Type1>(Type1.class));
JavaDStream<Type2> type2Events =
    messages.map(new ParseEvents<Type2>(Type2.class));
JavaDStream<Type3> type3Events =
    messages.map(new ParseEvents<Type3>(Type3.class));
But this does not work, because the Type1 parser also receives Type2
messages and silently drops them. Is there a clean way of handling this?