Hi,

I think I'm seeing a bug after upgrading to the Kafka 0.10 streaming API. Code fragments follow.
--
   Nick

        JavaInputDStream<ConsumerRecord<String, byte[]>> rawStream = getDirectKafkaStream();

        JavaDStream<Tuple2<String, byte[]>> messagesTuple = rawStream.map(
                new Function<ConsumerRecord<String, byte[]>, Tuple2<String, byte[]>>() {
                    @Override
                    public Tuple2<String, byte[]> call(ConsumerRecord<String, byte[]> record) {
                        final String hyphen = "-";
                        final String topicPartition = record.partition() + hyphen + record.offset();

                        return new Tuple2<>(topicPartition, record.value());
                    }
                }
        );

        messagesTuple.foreachRDD(new VoidFunction<JavaRDD<Tuple2<String, byte[]>>>() {
            @Override
            public void call(JavaRDD<Tuple2<String, byte[]>> rdd) throws Exception {
                List<Tuple2<String, byte[]>> list = rdd.take(10);

                for (Tuple2<String, byte[]> pair : list) {
                    log.info("messages tuple key: " + pair._1() + " : " + pair._2());
                }
            }
        });


The foreachRDD above logs the output as expected:

17/03/22 15:57:01 INFO StreamingKafkaConsumerDriver: messages tuple key: 
-13-231599504 : �2017-03-22 15:54:05.568628����$�g� 
ClientDev_Perf0585965449a1d3524b9e68396X@6eda8a884567b3442be68282b35aeeafMaterialReviewSinglePlayer`?��@�����Vwin��@1.0.1703.0Unlabeled
 Stable�8���Not ApplicableNot ApplicableNot 
ApplicabledayMR_Day01Empty�<<<BBBBBB@@@
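One detail in this log may be worth a closer look: if pair._2() really held a byte[], the string concatenation in log.info would print something like "[B@" followed by a hash code, not readable text. A possible explanation (a hypothesis, not confirmed) is that the values are actually Strings, for example because the consumer's value deserializer is a StringDeserializer, and Java's type erasure hides this until code forces a cast to byte[]. The sketch below reproduces that mechanism in plain Java. Pair is a hypothetical stand-in for scala.Tuple2, and mistypedPair is a hypothetical helper that fakes what a mismatched deserializer would produce.

```java
public class ErasureDemo {

    // Hypothetical stand-in for scala.Tuple2, used only for illustration.
    public static final class Pair<A, B> {
        private final A first;
        private final B second;

        public Pair(A first, B second) {
            this.first = first;
            this.second = second;
        }

        public A _1() { return first; }
        public B _2() { return second; }
    }

    // Hypothetical helper: builds a pair whose declared value type is byte[]
    // but whose runtime value is a String, as a mismatched deserializer would.
    // The unchecked cast is not verified at runtime because of type erasure.
    @SuppressWarnings("unchecked")
    public static Pair<String, byte[]> mistypedPair() {
        Object raw = new Pair<Object, Object>("13-231599504", "payload decoded as text");
        return (Pair<String, byte[]>) raw;
    }

    // Mirrors the foreachRDD logging: _2() is only used as an Object here,
    // so no cast to byte[] is performed and no exception is thrown.
    public static String logLine(Pair<String, byte[]> pair) {
        return "messages tuple key: " + pair._1() + " : " + pair._2();
    }

    // Mirrors "byte[] val = record._2();": assigning to a byte[] variable
    // makes the compiler insert a cast to byte[], which fails for a String.
    public static boolean assignmentThrows(Pair<String, byte[]> pair) {
        try {
            byte[] val = pair._2();
            return false; // reached only if the value really is a byte[] (or null)
        } catch (ClassCastException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        Pair<String, byte[]> pair = mistypedPair();
        System.out.println(logLine(pair));
        System.out.println(assignmentThrows(pair));
        // A genuine byte[] prints as "[B@" followed by a hash code, not as text.
        System.out.println("value: " + new byte[] {1, 2});
    }
}
```

If this matches what happens in the job, the consumer configuration built inside getDirectKafkaStream() (not shown in this message) is the next place to look.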


However, when I invoke mapPartitionsToPair on messagesTuple, a ClassCastException is thrown when the second element of the pair is accessed:

        messagesTuple.mapPartitionsToPair(new RecordFlatMapPairPartitionFunction2(
                outputDirectory, schemaServiceUrl, product, env, batchId,
                typeMap, avroSchemaMap, avroSchemaAcc));

17/03/22 15:57:02 WARN TaskSetManager: Lost task 0.0 in stage 1.0 (TID 1, ip-10-247-0-141.ec2.internal, executor 1): java.lang.ClassCastException: java.lang.String cannot be cast to [B
    at com.wb.analytics.spark.services.functions.RecordFlatMapPairPartitionFunction2.call(RecordFlatMapPairPartitionFunction2.java:113)



public class RecordFlatMapPairPartitionFunction2 implements
        PairFlatMapFunction<Iterator<Tuple2<String, byte[]>>, String, String> {
...

    @Override
    public Iterator<Tuple2<String, String>> call(Iterator<Tuple2<String, byte[]>> messages)
            throws Exception {

        while (messages.hasNext()) {
            Tuple2<String, byte[]> record = messages.next();
            String topicPartitionOffset = record._1();
            byte[] val = record._2();  // Line 113 <<<<<<<<<<<< ClassCastException

       ...
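A ClassCastException from String to [B on a value declared as byte[] suggests (as a hypothesis) that the record values are actually Strings, for example if the consumer's value.deserializer is StringDeserializer. Generic types are erased, so nothing checks this until the cast on line 113. A minimal sketch, assuming the stream is created from a kafkaParams map as in the Spark Kafka 0-10 integration: the value deserializer has to be ByteArrayDeserializer for the declared ConsumerRecord<String, byte[]> type to match what is delivered. The broker address and group id are placeholders, and the class names are given as strings (which the Kafka consumer config accepts) so the sketch compiles without Kafka on the classpath.

```java
import java.util.HashMap;
import java.util.Map;

public class KafkaParamsSketch {

    // Placeholder values: replace with the real brokers and consumer group.
    static final String BOOTSTRAP_SERVERS = "broker1:9092";
    static final String GROUP_ID = "example-group";

    // Builds consumer properties whose deserializers match the declared
    // ConsumerRecord<String, byte[]> types.
    public static Map<String, Object> kafkaParams(String bootstrapServers, String groupId) {
        Map<String, Object> params = new HashMap<>();
        params.put("bootstrap.servers", bootstrapServers);
        params.put("group.id", groupId);
        params.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        // byte[] values need ByteArrayDeserializer; StringDeserializer here
        // would deliver Strings that are only typed as byte[] at compile time.
        params.put("value.deserializer",
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        return params;
    }

    public static void main(String[] args) {
        Map<String, Object> params = kafkaParams(BOOTSTRAP_SERVERS, GROUP_ID);
        System.out.println(params.get("value.deserializer"));
    }
}
```

The resulting map would then be handed to the stream along the lines of KafkaUtils.createDirectStream(jssc, LocationStrategies.PreferConsistent(), ConsumerStrategies.<String, byte[]>Subscribe(topics, kafkaParams)).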
