Hi, I think I'm seeing a bug while upgrading to the Kafka 0.10 streaming API. Code fragments follow. -- Nick
    JavaInputDStream<ConsumerRecord<String, byte[]>> rawStream = getDirectKafkaStream();

    JavaDStream<Tuple2<String, byte[]>> messagesTuple = rawStream.map(
        new Function<ConsumerRecord<String, byte[]>, Tuple2<String, byte[]>>() {
            @Override
            public Tuple2<String, byte[]> call(ConsumerRecord<String, byte[]> record) {
                final String hyphen = "-";
                final String topicPartition = record.partition() + hyphen + record.offset();
                return new Tuple2<>(topicPartition, record.value());
            }
        }
    );

    messagesTuple.foreachRDD(new VoidFunction<JavaRDD<Tuple2<String, byte[]>>>() {
        @Override
        public void call(JavaRDD<Tuple2<String, byte[]>> rdd) throws Exception {
            List<Tuple2<String, byte[]>> list = rdd.take(10);
            for (Tuple2<String, byte[]> pair : list) {
                log.info("messages tuple key: " + pair._1() + " : " + pair._2());
            }
        }
    });

The foreachRDD above logs the output correctly:

    17/03/22 15:57:01 INFO StreamingKafkaConsumerDriver: messages tuple key: -13-231599504 : �2017-03-22 15:54:05.568628����$�g� ClientDev_Perf0585965449a1d3524b9e68396X@6eda8a884567b3442be68282b35aeeafMaterialReviewSinglePlayer`?��@�����Vwin��@1.0.1703.0Unlabeled Stable�8���Not ApplicableNot ApplicableNot ApplicabledayMR_Day01Empty�<<<BBBBBB@@@

However, invoking mapPartitionsToPair on messagesTuple throws a ClassCastException when the second element of the pair is accessed:

    messagesTuple.mapPartitionsToPair(new RecordFlatMapPairPartitionFunction2(
        outputDirectory, schemaServiceUrl, product, env, batchId,
        typeMap, avroSchemaMap, avroSchemaAcc));

    17/03/22 15:57:02 WARN TaskSetManager: Lost task 0.0 in stage 1.0 (TID 1, ip-10-247-0-141.ec2.internal, executor 1): java.lang.ClassCastException: java.lang.String cannot be cast to [B
        at com.wb.analytics.spark.services.functions.RecordFlatMapPairPartitionFunction2.call(RecordFlatMapPairPartitionFunction2.java:113)

    public class RecordFlatMapPairPartitionFunction2
            implements PairFlatMapFunction<Iterator<Tuple2<String, byte[]>>, String, String> {
        ...
        @Override
        public Iterator<Tuple2<String, String>> call(Iterator<Tuple2<String, byte[]>> messages) throws Exception {
            while (messages.hasNext()) {
                Tuple2<String, byte[]> record = messages.next();
                String topicPartitionOffset = record._1();
                byte[] val = record._2(); // Line 113: ClassCastException thrown here
                ...
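One possibility worth ruling out before treating this as a Spark bug (an assumption, not a diagnosis): concatenating a genuine byte[] into a log message prints something like [B@6d06d69c, yet the foreachRDD output above shows the payload as text. That would be consistent with the values being String objects at runtime, for example if getDirectKafkaStream configures the consumer's value.deserializer as a String deserializer. Because of generic type erasure, a Tuple2<String, byte[]> can hold a String without complaint until code needs the value as a byte[], which is exactly what line 113 does. The self-contained sketch below reproduces that pattern without Spark or Kafka; the Pair class and all method names are hypothetical stand-ins for scala.Tuple2 and the real pipeline.

```java
import java.util.ArrayList;
import java.util.List;

class ErasureDemo {

    // Hypothetical stand-in for scala.Tuple2, mirroring its _1()/_2() accessors.
    static final class Pair<A, B> {
        private final A a;
        private final B b;

        Pair(A a, B b) {
            this.a = a;
            this.b = b;
        }

        A _1() { return a; }
        B _2() { return b; }
    }

    // Simulates a source whose static type promises byte[] values but which
    // actually holds Strings (assumption: e.g. a mismatched value deserializer).
    @SuppressWarnings({"unchecked", "rawtypes"})
    static List<Pair<String, byte[]>> pollutedRecords() {
        List raw = new ArrayList();
        raw.add(new Pair<>("13-231599504", "payload-as-string"));
        return (List<Pair<String, byte[]>>) raw;
    }

    // Reading the value through a wildcard reference yields an Object,
    // so no byte[] cast is performed and the real runtime class is visible.
    static String runtimeValueClass() {
        Pair<String, byte[]> p = pollutedRecords().get(0);
        Object v = ((Pair<?, ?>) p)._2();
        return v.getClass().getName();
    }

    // Assigning to a byte[] variable forces a cast, which fails the same way
    // as "byte[] val = record._2();" in the partition function.
    static boolean byteArrayAccessThrows() {
        Pair<String, byte[]> p = pollutedRecords().get(0);
        try {
            byte[] val = p._2();
            return val == null; // only reached if the value really is a byte[] (or null)
        } catch (ClassCastException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(runtimeValueClass());
        System.out.println(byteArrayAccessThrows());
    }
}
```

If a similar check in the real job (assigning record._2() to an Object variable and logging getClass().getName()) reports java.lang.String, the consumer settings used by getDirectKafkaStream would be the next place to look: with the 0-10 integration the value type comes from the value.deserializer property, and org.apache.kafka.common.serialization.ByteArrayDeserializer is the Kafka deserializer that produces byte[] values.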