I'm trying to integrate the Schema Registry with Spark Streaming. For the
moment I want to use GenericRecord. My producer seems to work, and new
schemas are published to the _schemas topic. However, when I try to read
the data with my consumer, I'm not able to deserialize it.
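For reference, the producer side is along these lines (just a sketch: the
broker address and the "Client" schema are placeholders to illustrate the
setup, using the Confluent KafkaAvroSerializer):

import java.util.Properties;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import io.confluent.kafka.serializers.KafkaAvroSerializer;

public class AvroProducerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder broker
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName());
        props.put("schema.registry.url", "http://localhost:8081"); // serializer registers schemas in _schemas

        // Hypothetical "Client" schema with the three fields written to Cassandra below.
        Schema schema = new Schema.Parser().parse(
                "{\"type\":\"record\",\"name\":\"Client\",\"fields\":["
              + "{\"name\":\"id\",\"type\":\"string\"},"
              + "{\"name\":\"name\",\"type\":\"string\"},"
              + "{\"name\":\"lastname\",\"type\":\"string\"}]}");

        GenericRecord value = new GenericData.Record(schema);
        value.put("id", "1");
        value.put("name", "John");
        value.put("lastname", "Doe");

        try (KafkaProducer<byte[], GenericRecord> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("avro_example_schemaRegistry", value));
            producer.flush();
        }
    }
}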

How can I tell Spark that I'm going to deserialize the values into GenericRecord?



public class SparkStreamingSchemaRegister {

    public static void main(String[] args) throws InterruptedException {
        String topic = "avro_example_schemaRegistry";

        final JavaStreamingContext jssc = new JavaStreamingContext(getSparkConf(),
                Durations.milliseconds(Constants.STREAM_BATCH_DURATION_MILLIS));

        final JavaInputDStream<ConsumerRecord<byte[], GenericRecord>> rawStream =
                KafkaSource.getKafkaDirectStream(jssc);

        rawStream.foreachRDD(rdd -> {
            JavaRDD<CrmClient> javaRddClient = rdd.map(
                    kafkaRecord -> {
                        GenericRecord record = kafkaRecord.value(); // <-- this is where deserialization fails
                        return CrmClient.getCrmClient(kafkaRecord.value());
                    });

            CassandraJavaUtil
                    .javaFunctions(javaRddClient)
                    .writerBuilder("keyspace", "client", CassandraJavaUtil.mapToRow(CrmClient.class))
                    .withColumnSelector(CassandraJavaUtil.someColumns("id", "name", "lastname"))
                    .saveToCassandra();
        });


        jssc.start();
        jssc.awaitTermination();
        jssc.close();
    }


    private static class KafkaSource {

        public static JavaInputDStream<ConsumerRecord<byte[], GenericRecord>> getKafkaDirectStream(JavaStreamingContext jssc) {
            JavaInputDStream<ConsumerRecord<byte[], GenericRecord>> stream = KafkaUtils.createDirectStream(jssc,
                    LocationStrategies.PreferConsistent(),
                    ConsumerStrategies.<byte[], GenericRecord>Subscribe(getKafkaTopic(), getKafkaConf()));
            return stream;
        }


        private static Map<String, Object> getKafkaConf() {
            Map<String, Object> kafkaParams = new HashMap<>();
            kafkaParams.put(Constants.KAFKA_PROPERTIES.KAFKA_BOOTSTRAP_SERVERS.getValue(), Constants.KAFKA_BOOTSTRAP_SERVERS);
            kafkaParams.put(Constants.KAFKA_PROPERTIES.KAFKA_KEY_DESERIALIZER.getValue(), ByteArrayDeserializer.class);
            kafkaParams.put(Constants.KAFKA_PROPERTIES.KAFKA_GROUPID.getValue(), Constants.KAFKA_GROUP_ID);
            kafkaParams.put(Constants.KAFKA_PROPERTIES.KAFKA_ENABLE_AUTO_COMMIT.getValue(), false);
            kafkaParams.put(Constants.KAFKA_PROPERTIES.KAFKA_AUTO_OFFSET_RESET.getValue(), "earliest");

            // Value deserializer: Confluent KafkaAvroDeserializer with specific.avro.reader=false,
            // so values should come back as GenericRecord.
            kafkaParams.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class.getName());
            kafkaParams.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, "false");
            kafkaParams.put(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081");

            return kafkaParams;
        }

    }
}
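getSparkConf() is referenced above but not shown; a minimal sketch of what I
mean is below (the master, app name and Cassandra host are placeholders, and
registering GenericData.Record with Kryo is my assumption, since Avro generic
records are not Java-serializable and have to be shipped to the executors):

    private static SparkConf getSparkConf() {
        return new SparkConf()
                .setAppName("SparkStreamingSchemaRegister")
                .setMaster("local[*]")                               // placeholder master
                .set("spark.cassandra.connection.host", "127.0.0.1") // placeholder Cassandra host
                // GenericData.Record is not java.io.Serializable, so Kryo is used
                // when records are shuffled or sent to the executors.
                .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
                .registerKryoClasses(new Class<?>[]{ GenericData.Record.class });
    }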
