Fred,

You are right: only the default serdes provided through configs are
auto-configured today. But we could auto-configure other serdes passed
alongside the topology builder as well. Do you want to file a JIRA to keep
track of this?
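
To illustrate what that means today, here is a minimal sketch (it reuses the
SpecificAvroDeserializer from the Confluent streams examples; BcAccessLog
stands in for any Avro-generated SpecificRecord class from your code below).
Serdes named in the configs get configure() called on them by Streams itself,
while serdes built by hand alongside the builder need an explicit call:

  import java.util.Properties
  import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient
  import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig
  import io.confluent.examples.streams.utils.SpecificAvroDeserializer

  val settings = new Properties
  settings.put("schema.registry.url", "http://localhost:8081")
  val schemaRegistry = new CachedSchemaRegistryClient("http://localhost:8081",
    AbstractKafkaAvroSerDeConfig.MAX_SCHEMAS_PER_SUBJECT_DEFAULT)

  // Built alongside the topology, so not auto-configured: configure() must be
  // invoked manually, with isKey = false since this is a value deserializer.
  val deserializer = new SpecificAvroDeserializer[BcAccessLog](schemaRegistry)
  deserializer.configure(settings.asInstanceOf[java.util.Map[String, String]], false)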


Guozhang

On Wed, May 18, 2016 at 11:36 AM, Fred Patton <thoughtp...@gmail.com> wrote:

> Thanks so much, Guozhang. It didn't work immediately when I tried it at the
> end of the day, so I deferred until this morning. It turns out I also needed
> to invoke configure manually, or pass the props to the deserializer
> constructor. I am providing the revised example in case it helps anyone.
> Thank you, Liquan, for pointing out the need for the cast, which was also
> required.
>
> Regards,
> Fred
>
>   def main(args: Array[String]) {
>     val schemaRegistry = new CachedSchemaRegistryClient("http://localhost:8081",
>       AbstractKafkaAvroSerDeConfig.MAX_SCHEMAS_PER_SUBJECT_DEFAULT)
>     val builder: KStreamBuilder = new KStreamBuilder
>
>     val streamingConfig = {
>       val settings = new Properties
>       settings.put(StreamsConfig.JOB_ID_CONFIG, "kstreams")
>       settings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
>       settings.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181")
>       settings.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG,
>         "http://localhost:8081")
>       settings.put(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG,
>         "io.confluent.kafka.serializers.KafkaAvroSerializer")
>       settings.put(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG,
>         "io.confluent.kafka.serializers.KafkaAvroDeserializer")
>       settings.put(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG,
>         "io.confluent.kafka.serializers.KafkaAvroSerializer")
>       settings.put(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
>         "io.confluent.kafka.serializers.KafkaAvroDeserializer")
>       settings.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, "true")
>       settings
>     }
>
>     import KeyValueImplicits._
>
>     val stringSerializer = new StringSerializer
>     val stringDeserializer = new StringDeserializer
>     val bcAccessLogSerializer = new SpecificAvroSerializer[BcAccessLog](schemaRegistry)
>     val bcAccessLogDeserializer = new SpecificAvroDeserializer[BcAccessLog](schemaRegistry)
>
>     // These serdes are built alongside the topology rather than named in the
>     // configs, so they are not auto-configured: configure() must be invoked
>     // manually (with the cast Liquan pointed out), isKey = false for values.
>     bcAccessLogSerializer.configure(
>       streamingConfig.asInstanceOf[java.util.Map[String, String]], false)
>     bcAccessLogDeserializer.configure(
>       streamingConfig.asInstanceOf[java.util.Map[String, String]], false)
>
>     val rawSourceStream: KStream[String, String] = builder.stream(stringDeserializer,
>       stringDeserializer,
>       "raw-accesslog-" + testRun)
>
>     val filteredStream: KStream[String, BcAccessLog] = rawSourceStream
>       .mapValues(parseAsJson)
>       .filter(isCdcEvent)
>       .map((key, value) => (getAccessLogKey(value), toBcAccessLog(value)))
>
>     filteredStream
>       .through("accesslog-" + testRun,
>         stringSerializer,
>         bcAccessLogSerializer,
>         stringDeserializer,
>         bcAccessLogDeserializer)
>       .mapValues(value => {
>         println("filteredSourceStream value: " + value)
>         value
>       })
>       .process(new CdcProcessorSupplier)
>
>     val stream: KafkaStreams = new KafkaStreams(builder, streamingConfig)
>     println("\nstart filtering BcAccessLog test run: " + testRun + "\n")
>     stream.start()
>   }
>
> On Tue, May 17, 2016 at 5:29 PM, Guozhang Wang <wangg...@gmail.com> wrote:
>
> > Hello Fred,
> >
> > Thanks for reporting this. I looked through your code and found this is
> > due to a bug in io.confluent.examples.streams.utils.SpecificAvroDeserializer.
> >
> > I will fix this bug in Confluent's repo. To work around it in the meantime,
> > you can create your own SpecificAvroDeserializer with the correct logic by
> > copy-pasting Confluent's class with the following one-line change:
> >
> > In the configure(Map<String, ?> configs, boolean isKey) method, add the
> > following key-value pair to the configs before calling inner.configure:
> >
> > configs.put("specific.avro.reader", "true");
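> >
> > For reference, a rough sketch of what such a patched class could look like,
> > written in Scala to match Fred's code below; the class name and structure
> > are illustrative stand-ins, not the exact Confluent source:
> >
> >   import java.util.{HashMap => JHashMap, Map => JMap}
> >   import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient
> >   import io.confluent.kafka.serializers.KafkaAvroDeserializer
> >   import org.apache.avro.specific.SpecificRecord
> >   import org.apache.kafka.common.serialization.Deserializer
> >
> >   // Illustrative stand-in for the copied Confluent example class.
> >   class PatchedSpecificAvroDeserializer[T <: SpecificRecord](client: SchemaRegistryClient)
> >       extends Deserializer[T] {
> >     private val inner = new KafkaAvroDeserializer(client)
> >
> >     override def configure(configs: JMap[String, _], isKey: Boolean): Unit = {
> >       // Copy the map first: the caller's map may be unmodifiable.
> >       val patched = new JHashMap[String, Any](configs.asInstanceOf[JMap[String, Any]])
> >       patched.put("specific.avro.reader", "true") // the one-line fix
> >       inner.configure(patched, isKey)
> >     }
> >
> >     override def deserialize(topic: String, bytes: Array[Byte]): T =
> >       inner.deserialize(topic, bytes).asInstanceOf[T]
> >
> >     override def close(): Unit = inner.close()
> >   }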
> >
> >
> > Guozhang
> >
> > On Tue, May 17, 2016 at 11:35 AM, Fred Patton <thoughtp...@gmail.com> wrote:
> >
> > > Before switching the example from to() to through(), things seemed to
> > > work fine. Afterward, I get ClassCastExceptions from GenericRecord to
> > > SpecificRecord. Hopefully someone can point out something quick and dumb
> > > so I can get my demo wrapped up. I never get past the through() method.
> > > I am using the Kafka Streams tech preview, and Avro classes compiled
> > > from AvroTools. Additionally, I initially had a second parallel
> > > application taking the output of this to() invocation and continuing
> > > with the same logic; there was no crash, but no code ever ran, not even
> > > initializing the context. Any illumination is most appreciated.
> > >
> > > Here's the stack trace, followed by the offending bit of code...
> > >
> > > [info] CdcProcessorSupplier::get
> > > [info] CdcProcessor::init - ctx:
> > > org.apache.kafka.streams.processor.internals.ProcessorContextImpl@c5ff552
> > > [info] CdcProcessor::close
> > > [error] Exception in thread "StreamThread-1" java.lang.ClassCastException:
> > > org.apache.avro.generic.GenericData$Record cannot be cast to
> > > org.apache.avro.specific.SpecificRecord
> > > [error] at io.confluent.examples.streams.utils.SpecificAvroDeserializer.deserialize(SpecificAvroDeserializer.java:51)
> > > [error] at io.confluent.examples.streams.utils.SpecificAvroDeserializer.deserialize(SpecificAvroDeserializer.java:24)
> > > [error] at org.apache.kafka.streams.processor.internals.SourceNode.deserializeValue(SourceNode.java:41)
> > > [error] at org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:77)
> > > [error] at org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:113)
> > > [error] at org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:134)
> > > [error] at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:334)
> > > [error] at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:248)
> > >
> > >   def main(args: Array[String]) {
> > >     val schemaRegistry = new CachedSchemaRegistryClient("http://localhost:8081",
> > >       AbstractKafkaAvroSerDeConfig.MAX_SCHEMAS_PER_SUBJECT_DEFAULT)
> > >     val builder: KStreamBuilder = new KStreamBuilder
> > >     val streamingConfig = {
> > >       val settings = new Properties
> > >       settings.put(StreamsConfig.JOB_ID_CONFIG, "kstreams")
> > >       settings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
> > >       settings.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181")
> > >       settings.put(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG,
> > >         "io.confluent.kafka.serializers.KafkaAvroSerializer")
> > >       settings.put(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG,
> > >         "io.confluent.kafka.serializers.KafkaAvroDeserializer")
> > >       settings.put(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG,
> > >         "io.confluent.kafka.serializers.KafkaAvroSerializer")
> > >       settings.put(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
> > >         "io.confluent.kafka.serializers.KafkaAvroDeserializer")
> > >       settings
> > >     }
> > >
> > >     import KeyValueImplicits._
> > >
> > >     val stringSerializer = new StringSerializer
> > >     val stringDeserializer = new StringDeserializer
> > >     val bcAccessLogSerializer = new SpecificAvroSerializer[BcAccessLog](schemaRegistry)
> > >     val bcAccessLogDeserializer = new SpecificAvroDeserializer[BcAccessLog](schemaRegistry)
> > >
> > >     val rawSourceStream: KStream[String, String] = builder.stream(stringDeserializer,
> > >       stringDeserializer,
> > >       "raw-accesslog-" + testRun)
> > >
> > >     val filteredStream: KStream[String, BcAccessLog] = rawSourceStream
> > >       .mapValues(parseAsJson)
> > >       .filter(isCdcEvent)
> > >       .map((key, value) => (getAccessLogKey(value), toBcAccessLog(value)))
> > >
> > >     filteredStream
> > >       .through("accesslog-" + testRun,
> > >                stringSerializer,
> > >                bcAccessLogSerializer,
> > >                stringDeserializer,
> > >                bcAccessLogDeserializer)
> > >       .mapValues(value => {
> > >                    println("filteredSourceStream value: " + value)
> > >                    value
> > >                  })
> > >       .process(new CdcProcessorSupplier)
> > >
> > >     val stream: KafkaStreams = new KafkaStreams(builder, streamingConfig)
> > >     println("\nstart filtering BcAccessLog test run: " + testRun + "\n")
> > >     stream.start()
> > >   }
> > >
> > > Regards,
> > > Fred Patton
> > >
> >
> >
> >
> > --
> > -- Guozhang
> >
>



-- 
-- Guozhang
