Fred,

You are right: only the default serdes provided through configs are auto-configured today. But we could also auto-configure other serdes passed alongside the topology builder. Do you want to file a JIRA to keep track of this?
Guozhang

On Wed, May 18, 2016 at 11:36 AM, Fred Patton <thoughtp...@gmail.com> wrote:

> Thanks so much, Guozhang. It didn't work immediately when I tried at the end
> of the day, so I deferred until this morning. It seems I also needed to invoke
> configure manually, or pass the props to the deserializer constructor. I am
> providing the revised example in case it helps anyone. Thank you, Liquan,
> for pointing out the need for the cast, which was also necessary.
>
> Regards,
> Fred
>
> def main(args: Array[String]) {
>   val schemaRegistry = new CachedSchemaRegistryClient(
>     "http://localhost:8081",
>     AbstractKafkaAvroSerDeConfig.MAX_SCHEMAS_PER_SUBJECT_DEFAULT)
>   val builder: KStreamBuilder = new KStreamBuilder
>   val streamingConfig = {
>     val settings = new Properties
>     settings.put(StreamsConfig.JOB_ID_CONFIG, "kstreams")
>     settings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
>     settings.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181")
>     settings.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG,
>       "http://localhost:8081");
>     settings.put(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG,
>       "io.confluent.kafka.serializers.KafkaAvroSerializer")
>     settings.put(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG,
>       "io.confluent.kafka.serializers.KafkaAvroDeserializer")
>     settings.put(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG,
>       "io.confluent.kafka.serializers.KafkaAvroSerializer")
>     settings.put(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
>       "io.confluent.kafka.serializers.KafkaAvroDeserializer")
>     settings.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG,
>       "true")
>     settings
>   }
>
>   import KeyValueImplicits._
>
>   val stringSerializer = new StringSerializer
>   val stringDeserializer = new StringDeserializer
>   val bcAccessLogSerializer =
>     new SpecificAvroSerializer[BcAccessLog](schemaRegistry)
>   val bcAccessLogDeserializer =
>     new SpecificAvroDeserializer[BcAccessLog](schemaRegistry)
>
>   bcAccessLogSerializer.configure(
>     streamingConfig.asInstanceOf[java.util.Map[String, String]], false)
>   bcAccessLogDeserializer.configure(
>     streamingConfig.asInstanceOf[java.util.Map[String, String]], false)
>
>   val rawSourceStream: KStream[String, String] =
>     builder.stream(stringDeserializer,
>       stringDeserializer,
>       "raw-accesslog-" + testRun)
>
>   val filteredStream: KStream[String, BcAccessLog] =
>     rawSourceStream.mapValues(parseAsJson)
>       .filter(isCdcEvent)
>       .map((key, value) => (getAccessLogKey(value), toBcAccessLog(value)))
>
>   filteredStream
>     .through("accesslog-" + testRun,
>       stringSerializer,
>       bcAccessLogSerializer,
>       stringDeserializer,
>       bcAccessLogDeserializer)
>     .mapValues(value => {
>       println("filteredSourceStream value: " + value)
>       value
>     })
>     .process(new CdcProcessorSupplier)
>
>   val stream: KafkaStreams = new KafkaStreams(builder, streamingConfig)
>   println("\nstart filtering BcAccessLog test run: " + testRun + "\n")
>   stream.start()
> }
>
> On Tue, May 17, 2016 at 5:29 PM, Guozhang Wang <wangg...@gmail.com> wrote:
>
> > Hello Fred,
> >
> > Thanks for reporting this. I looked through your code and found this is due
> > to a bug in io.confluent.examples.streams.utils.SpecificAvroDeserializer.
> >
> > I will fix this bug in Confluent's repo. To work around it in the meantime,
> > you can create your own SpecificAvroDeserializer with the correct logic by
> > copy-pasting the Confluent class with the following one-line change:
> >
> > In the configure(Map<String, ?> configs, boolean isKey) function, add the
> > following key-value pair to the configs before calling inner.configure:
> >
> > configs.put("specific.avro.reader", "true");
> >
> >
> > Guozhang
> >
> > On Tue, May 17, 2016 at 11:35 AM, Fred Patton <thoughtp...@gmail.com>
> > wrote:
> >
> > > Before switching the example from to() to through(), things seemed to
> > > work fine. Afterward, I get ClassCastExceptions from GenericRecord to
> > > SpecificRecord. Hopefully, someone can point out something quick and dumb
> > > so I can get my demo wrapped up. I never get past the through() method. I
> > > am using the Kafka Streams tech preview, and using Avro compiled from
> > > AvroTools. Additionally, I initially had a second parallel application
> > > taking the output of this to() invocation and continuing with the same
> > > logic; there was no crash, but no code ever ran, such as initializing the
> > > context. Any illumination is most appreciated.
> > >
> > > Here's the stack trace followed by the offending bit of code...
> > >
> > > [info] CdcProcessorSupplier::get
> > > [info] CdcProcessor::init - ctx: org.apache.kafka.streams.processor.internals.ProcessorContextImpl@c5ff552
> > > [info] CdcProcessor::close
> > > [error] Exception in thread "StreamThread-1" java.lang.ClassCastException: org.apache.avro.generic.GenericData$Record cannot be cast to org.apache.avro.specific.SpecificRecord
> > > [error] at io.confluent.examples.streams.utils.SpecificAvroDeserializer.deserialize(SpecificAvroDeserializer.java:51)
> > > [error] at io.confluent.examples.streams.utils.SpecificAvroDeserializer.deserialize(SpecificAvroDeserializer.java:24)
> > > [error] at org.apache.kafka.streams.processor.internals.SourceNode.deserializeValue(SourceNode.java:41)
> > > [error] at org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:77)
> > > [error] at org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:113)
> > > [error] at org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:134)
> > > [error] at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:334)
> > > [error] at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:248)
> > >
> > > def main(args: Array[String]) {
> > >   val schemaRegistry = new CachedSchemaRegistryClient(
> > >     "http://localhost:8081",
> > >     AbstractKafkaAvroSerDeConfig.MAX_SCHEMAS_PER_SUBJECT_DEFAULT)
> > >   val builder: KStreamBuilder = new KStreamBuilder
> > >   val streamingConfig = {
> > >     val settings = new Properties
> > >     settings.put(StreamsConfig.JOB_ID_CONFIG, "kstreams")
> > >     settings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
> > >     settings.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181")
> > >     settings.put(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG,
> > >       "io.confluent.kafka.serializers.KafkaAvroSerializer")
> > >     settings.put(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG,
> > >       "io.confluent.kafka.serializers.KafkaAvroDeserializer")
> > >     settings.put(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG,
> > >       "io.confluent.kafka.serializers.KafkaAvroSerializer")
> > >     settings.put(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
> > >       "io.confluent.kafka.serializers.KafkaAvroDeserializer")
> > >     settings
> > >   }
> > >
> > >   import KeyValueImplicits._
> > >
> > >   val stringSerializer = new StringSerializer
> > >   val stringDeserializer = new StringDeserializer
> > >   val bcAccessLogSerializer =
> > >     new SpecificAvroSerializer[BcAccessLog](schemaRegistry)
> > >   val bcAccessLogDeserializer =
> > >     new SpecificAvroDeserializer[BcAccessLog](schemaRegistry)
> > >
> > >   val rawSourceStream: KStream[String, String] =
> > >     builder.stream(stringDeserializer,
> > >       stringDeserializer,
> > >       "raw-accesslog-" + testRun)
> > >
> > >   val filteredStream: KStream[String, BcAccessLog] =
> > >     rawSourceStream.mapValues(parseAsJson)
> > >       .filter(isCdcEvent)
> > >       .map((key, value) => (getAccessLogKey(value), toBcAccessLog(value)))
> > >
> > >   filteredStream
> > >     .through("accesslog-" + testRun,
> > >       stringSerializer,
> > >       bcAccessLogSerializer,
> > >       stringDeserializer,
> > >       bcAccessLogDeserializer)
> > >     .mapValues(value => {
> > >       println("filteredSourceStream value: " + value)
> > >       value
> > >     })
> > >     .process(new CdcProcessorSupplier)
> > >
> > >   val stream: KafkaStreams = new KafkaStreams(builder, streamingConfig)
> > >   println("\nstart filtering BcAccessLog test run: " + testRun + "\n")
> > >   stream.start()
> > > }
> > >
> > > Regards,
> > > Fred Patton
> > >
> >
> > --
> > -- Guozhang
>

--
-- Guozhang
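
For anyone applying the workaround from this thread, here is a minimal sketch of the one-line change to configure(), written in Java because the class being copied is Java. It is not the Confluent class: ConfigurableSerde, RecordingInner, and ForcedSpecificReader are hypothetical stand-ins so the snippet runs without Kafka or the schema registry on the classpath. One detail worth noting is that a Map<String, ?> parameter does not accept put() at compile time, so the sketch copies the configs into a new map before adding the flag, which also leaves the caller's map untouched.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the configure() part of a Kafka (de)serializer.
interface ConfigurableSerde {
    void configure(Map<String, ?> configs, boolean isKey);
}

// Hypothetical stand-in for the wrapped Avro deserializer ("inner" in the thread).
// It only records the configs it receives so the effect can be inspected.
class RecordingInner implements ConfigurableSerde {
    Map<String, Object> seen = new HashMap<>();

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        seen = new HashMap<>(configs);
    }
}

// Sketch of the patched wrapper: force the specific reader before delegating.
class ForcedSpecificReader implements ConfigurableSerde {
    static final String SPECIFIC_AVRO_READER = "specific.avro.reader";

    private final ConfigurableSerde inner;

    ForcedSpecificReader(ConfigurableSerde inner) {
        this.inner = inner;
    }

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        // Copy first: put() is rejected on Map<String, ?>, and the caller's
        // map should not be modified as a side effect.
        Map<String, Object> effective = new HashMap<>(configs);
        effective.put(SPECIFIC_AVRO_READER, "true");
        inner.configure(effective, isKey);
    }
}

class ForcedSpecificReaderDemo {
    public static void main(String[] args) {
        Map<String, Object> props = new HashMap<>();
        props.put("schema.registry.url", "http://localhost:8081");

        RecordingInner inner = new RecordingInner();
        new ForcedSpecificReader(inner).configure(props, false);

        // The inner deserializer sees the forced flag...
        System.out.println(inner.seen.get(ForcedSpecificReader.SPECIFIC_AVRO_READER)); // prints: true
        // ...while the caller's map is left as it was.
        System.out.println(props.containsKey(ForcedSpecificReader.SPECIFIC_AVRO_READER)); // prints: false
    }
}
```

In a real copy of SpecificAvroDeserializer, the same two lines (copy, then put) would go at the top of its configure() method, with inner being the wrapped Avro deserializer mentioned in the thread.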