Thanks!

On Wed, May 18, 2016 at 12:38 PM, Fred Patton <thoughtp...@gmail.com> wrote:
Thanks, Guozhang, I'll file a JIRA this afternoon.

--Fred

On Wed, May 18, 2016 at 11:51 AM, Guozhang Wang <wangg...@gmail.com> wrote:

Fred,

You are right, only default serdes provided through configs are auto-configured today. But we could auto-configure other serdes passed alongside the topology builder as well. Do you want to file a JIRA to keep track of this?

Guozhang

On Wed, May 18, 2016 at 11:36 AM, Fred Patton <thoughtp...@gmail.com> wrote:

Thanks so much, Guozhang. It didn't work immediately when I tried at the end of the day, so I deferred until this morning. It seems I also needed to invoke configure manually, or pass the props to the deserializer constructor. I am providing the revised example in case it helps anyone. Thank you, Liquan, for pointing out the need for the cast, which was also the case.

Regards,
Fred

def main(args: Array[String]) {

  val schemaRegistry = new CachedSchemaRegistryClient("http://localhost:8081",
    AbstractKafkaAvroSerDeConfig.MAX_SCHEMAS_PER_SUBJECT_DEFAULT)

  val builder: KStreamBuilder = new KStreamBuilder

  val streamingConfig = {
    val settings = new Properties
    settings.put(StreamsConfig.JOB_ID_CONFIG, "kstreams")
    settings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
    settings.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181")
    settings.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG,
      "http://localhost:8081")
    settings.put(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG,
      "io.confluent.kafka.serializers.KafkaAvroSerializer")
    settings.put(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG,
      "io.confluent.kafka.serializers.KafkaAvroDeserializer")
    settings.put(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG,
      "io.confluent.kafka.serializers.KafkaAvroSerializer")
    settings.put(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
      "io.confluent.kafka.serializers.KafkaAvroDeserializer")
    settings.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, "true")
    settings
  }

  import KeyValueImplicits._

  val stringSerializer = new StringSerializer
  val stringDeserializer = new StringDeserializer
  val bcAccessLogSerializer = new SpecificAvroSerializer[BcAccessLog](schemaRegistry)
  val bcAccessLogDeserializer = new SpecificAvroDeserializer[BcAccessLog](schemaRegistry)

  bcAccessLogSerializer.configure(
    streamingConfig.asInstanceOf[java.util.Map[String, String]], false)
  bcAccessLogDeserializer.configure(
    streamingConfig.asInstanceOf[java.util.Map[String, String]], false)

  val rawSourceStream: KStream[String, String] =
    builder.stream(stringDeserializer, stringDeserializer, "raw-accesslog-" + testRun)

  val filteredStream: KStream[String, BcAccessLog] =
    rawSourceStream.mapValues(parseAsJson)
      .filter(isCdcEvent)
      .map((key, value) => (getAccessLogKey(value), toBcAccessLog(value)))

  filteredStream
    .through("accesslog-" + testRun,
      stringSerializer,
      bcAccessLogSerializer,
      stringDeserializer,
      bcAccessLogDeserializer)
    .mapValues(value => {
      println("filteredSourceStream value: " + value)
      value
    })
    .process(new CdcProcessorSupplier)

  val stream: KafkaStreams = new KafkaStreams(builder, streamingConfig)
  println("\nstart filtering BcAccessLog test run: " + testRun + "\n")
  stream.start()
}
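The manual-configure step described in the message above boils down to handing the serde the same config map used elsewhere before wiring it into the topology. Below is a dependency-free sketch of that flow; SpecificReaderDeserializer and its field are made-up stand-ins for illustration, not the real Confluent API.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

// Hypothetical stand-in that mirrors only the configure(Map, boolean)
// shape of Kafka's Deserializer interface.
class SpecificReaderDeserializer {
    boolean specificReader = false;

    void configure(Map<String, ?> configs, boolean isKey) {
        // Pick up the flag discussed in this thread from the config map.
        specificReader = "true".equals(String.valueOf(configs.get("specific.avro.reader")));
    }
}

public class ManualConfigureDemo {
    public static void main(String[] args) {
        Properties settings = new Properties();
        settings.put("specific.avro.reader", "true");

        // Copy the Properties into a typed map, the Java analogue of the
        // asInstanceOf[java.util.Map[String, String]] cast in the Scala code.
        Map<String, Object> configs = new HashMap<>();
        for (String name : settings.stringPropertyNames()) {
            configs.put(name, settings.getProperty(name));
        }

        // The manual configure() call that lets a non-default serde
        // pick up its settings.
        SpecificReaderDeserializer deser = new SpecificReaderDeserializer();
        deser.configure(configs, false);

        System.out.println(deser.specificReader); // prints: true
    }
}
```

Without that explicit configure() call (or the constructor taking the props), the flag is never seen by the serde, which matches the behavior Fred reports.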
On Tue, May 17, 2016 at 5:29 PM, Guozhang Wang <wangg...@gmail.com> wrote:

Hello Fred,

Thanks for reporting this. I looked through your code and found this is due to a bug in io.confluent.examples.streams.utils.SpecificAvroDeserializer.

I will fix this bug in Confluent's repo. To work around it in the meantime, you can create your own SpecificAvroDeserializer with the correct logic by copy-pasting the Confluent class and making the following one-line change:

In the configure(Map<String, ?> configs, boolean isKey) method, add the following key-value pair to the configs before calling inner.configure:

    configs.put("specific.avro.reader", "true");

Guozhang

On Tue, May 17, 2016 at 11:35 AM, Fred Patton <thoughtp...@gmail.com> wrote:

Before switching the example from to() to through(), things seemed to work fine. Afterward, I get ClassCastExceptions from GenericRecord to SpecificRecord. Hopefully someone can point out something quick and dumb so I can get my demo wrapped up. I never get past the through() method. I am using the Kafka Streams tech preview, with Avro compiled by AvroTools. Additionally, when I initially had a second parallel application taking the output of this to() invocation and continuing with the same logic, there was no crash, but no code ever ran, not even initializing the context. Any illumination is most appreciated.

Here's the stack trace, followed by the offending bit of code...
[info] CdcProcessorSupplier::get
[info] CdcProcessor::init - ctx: org.apache.kafka.streams.processor.internals.ProcessorContextImpl@c5ff552
[info] CdcProcessor::close
[error] Exception in thread "StreamThread-1" java.lang.ClassCastException: org.apache.avro.generic.GenericData$Record cannot be cast to org.apache.avro.specific.SpecificRecord
[error]   at io.confluent.examples.streams.utils.SpecificAvroDeserializer.deserialize(SpecificAvroDeserializer.java:51)
[error]   at io.confluent.examples.streams.utils.SpecificAvroDeserializer.deserialize(SpecificAvroDeserializer.java:24)
[error]   at org.apache.kafka.streams.processor.internals.SourceNode.deserializeValue(SourceNode.java:41)
[error]   at org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:77)
[error]   at org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:113)
[error]   at org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:134)
[error]   at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:334)
[error]   at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:248)

def main(args: Array[String]) {
  val schemaRegistry = new CachedSchemaRegistryClient("http://localhost:8081",
    AbstractKafkaAvroSerDeConfig.MAX_SCHEMAS_PER_SUBJECT_DEFAULT)
  val builder: KStreamBuilder = new KStreamBuilder
  val streamingConfig = {
    val settings = new Properties
    settings.put(StreamsConfig.JOB_ID_CONFIG, "kstreams")
    settings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
    settings.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181")
    settings.put(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG,
      "io.confluent.kafka.serializers.KafkaAvroSerializer")
    settings.put(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG,
      "io.confluent.kafka.serializers.KafkaAvroDeserializer")
    settings.put(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG,
      "io.confluent.kafka.serializers.KafkaAvroSerializer")
    settings.put(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
      "io.confluent.kafka.serializers.KafkaAvroDeserializer")
    settings
  }

  import KeyValueImplicits._

  val stringSerializer = new StringSerializer
  val stringDeserializer = new StringDeserializer
  val bcAccessLogSerializer = new SpecificAvroSerializer[BcAccessLog](schemaRegistry)
  val bcAccessLogDeserializer = new SpecificAvroDeserializer[BcAccessLog](schemaRegistry)

  val rawSourceStream: KStream[String, String] =
    builder.stream(stringDeserializer, stringDeserializer, "raw-accesslog-" + testRun)

  val filteredStream: KStream[String, BcAccessLog] =
    rawSourceStream.mapValues(parseAsJson)
      .filter(isCdcEvent)
      .map((key, value) => (getAccessLogKey(value), toBcAccessLog(value)))

  filteredStream
    .through("accesslog-" + testRun,
      stringSerializer,
      bcAccessLogSerializer,
      stringDeserializer,
      bcAccessLogDeserializer)
    .mapValues(value => {
      println("filteredSourceStream value: " + value)
      value
    })
    .process(new CdcProcessorSupplier)

  val stream: KafkaStreams = new KafkaStreams(builder, streamingConfig)
  println("\nstart filtering BcAccessLog test run: " + testRun + "\n")
  stream.start()
}

Regards,
Fred Patton

--
-- Guozhang
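The one-line workaround Guozhang describes in this thread, injecting the flag before delegating to the inner deserializer's configure(), can be sketched without any Kafka or Confluent dependencies. InnerDeserializer below is a hypothetical stand-in for the wrapped KafkaAvroDeserializer; only the configure() delegation is modeled.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the wrapped KafkaAvroDeserializer: it only
// records whether the specific-reader flag was present in its configs.
class InnerDeserializer {
    boolean specificReader = false;

    void configure(Map<String, ?> configs, boolean isKey) {
        specificReader = "true".equals(String.valueOf(configs.get("specific.avro.reader")));
    }
}

public class PatchedSpecificAvroDeserializer {
    final InnerDeserializer inner = new InnerDeserializer();

    public void configure(Map<String, ?> configs, boolean isKey) {
        // The one-line fix from the thread: force specific-record decoding
        // before delegating, so the inner deserializer produces the generated
        // SpecificRecord class rather than a GenericData.Record.
        Map<String, Object> patched = new HashMap<>(configs);
        patched.put("specific.avro.reader", "true");
        inner.configure(patched, isKey);
    }

    public static void main(String[] args) {
        PatchedSpecificAvroDeserializer deser = new PatchedSpecificAvroDeserializer();
        // The caller never sets the flag, yet the inner deserializer sees it.
        deser.configure(new HashMap<String, Object>(), false);
        System.out.println(deser.inner.specificReader); // prints: true
    }
}
```

With the flag forced on, the inner deserializer decodes to the generated SpecificRecord subclass instead of GenericData.Record, which is precisely the cast that fails in the stack trace above.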