Thanks!

On Wed, May 18, 2016 at 12:38 PM, Fred Patton <thoughtp...@gmail.com> wrote:

> Thanks, Guozhang, I'll file a JIRA this afternoon. --Fred
>
> On Wed, May 18, 2016 at 11:51 AM, Guozhang Wang <wangg...@gmail.com>
> wrote:
>
> > Fred,
> >
> > You are right, only default serdes provided through configs are
> > auto-configured today. But we could auto-configure other serdes passed
> > alongside the topology builder as well. Do you want to file a JIRA to
> > keep track of this?
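> >
> > To make that concrete (a rough sketch, reusing the names from your code
> > below, not a real API yet): today only the default serdes named in
> > StreamsConfig are configured by the library, so explicit instances need
> > a manual call, e.g.
> >
> >   // today: required, because this instance is passed to the builder directly
> >   bcAccessLogDeserializer.configure(
> >     streamingConfig.asInstanceOf[java.util.Map[String, String]], false)
> >   // with the proposed change, Streams would invoke configure() on such
> >   // instances itself, and this call could go away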
> >
> >
> > Guozhang
> >
> > On Wed, May 18, 2016 at 11:36 AM, Fred Patton <thoughtp...@gmail.com>
> > wrote:
> >
> > > Thanks so much, Guozhang. It didn't work immediately when I tried at the
> > > end of the day, so I deferred until this morning. It seems I also needed
> > > to invoke configure manually, or pass the props to the deserializer
> > > constructor (I sketch that alternative after the example). I am providing
> > > the revised example in case it helps anyone. Thank you, Liquan, for
> > > pointing out the need for the cast, which was also the case.
> > >
> > > Regards,
> > > Fred
> > >
> > >   def main(args: Array[String]) {
> > >     val schemaRegistry = new CachedSchemaRegistryClient(
> > >       "http://localhost:8081",
> > >       AbstractKafkaAvroSerDeConfig.MAX_SCHEMAS_PER_SUBJECT_DEFAULT)
> > >     val builder: KStreamBuilder = new KStreamBuilder
> > >     val streamingConfig = {
> > >       val settings = new Properties
> > >       settings.put(StreamsConfig.JOB_ID_CONFIG, "kstreams")
> > >       settings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
> > >       settings.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181")
> > >       settings.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG,
> > >         "http://localhost:8081")
> > >       settings.put(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG,
> > >         "io.confluent.kafka.serializers.KafkaAvroSerializer")
> > >       settings.put(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG,
> > >         "io.confluent.kafka.serializers.KafkaAvroDeserializer")
> > >       settings.put(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG,
> > >         "io.confluent.kafka.serializers.KafkaAvroSerializer")
> > >       settings.put(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
> > >         "io.confluent.kafka.serializers.KafkaAvroDeserializer")
> > >       settings.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG,
> > >         "true")
> > >       settings
> > >     }
> > >
> > >     import KeyValueImplicits._
> > >
> > >     val stringSerializer = new StringSerializer
> > >     val stringDeserializer = new StringDeserializer
> > >     val bcAccessLogSerializer =
> > >       new SpecificAvroSerializer[BcAccessLog](schemaRegistry)
> > >     val bcAccessLogDeserializer =
> > >       new SpecificAvroDeserializer[BcAccessLog](schemaRegistry)
> > >
> > >     // These serdes are passed to the topology explicitly, so they are not
> > >     // auto-configured; configure them by hand (note the cast from Properties).
> > >     bcAccessLogSerializer.configure(
> > >       streamingConfig.asInstanceOf[java.util.Map[String, String]], false)
> > >     bcAccessLogDeserializer.configure(
> > >       streamingConfig.asInstanceOf[java.util.Map[String, String]], false)
> > >
> > >     val rawSourceStream: KStream[String, String] = builder.stream(
> > >       stringDeserializer,
> > >       stringDeserializer,
> > >       "raw-accesslog-" + testRun)
> > >
> > >     val filteredStream: KStream[String, BcAccessLog] =
> > >       rawSourceStream.mapValues(parseAsJson)
> > >         .filter(isCdcEvent)
> > >         .map((key, value) => (getAccessLogKey(value), toBcAccessLog(value)))
> > >
> > >     filteredStream
> > >       .through("accesslog-" + testRun,
> > >         stringSerializer,
> > >         bcAccessLogSerializer,
> > >         stringDeserializer,
> > >         bcAccessLogDeserializer)
> > >       .mapValues(value => {
> > >         println("filteredSourceStream value: " + value)
> > >         value
> > >       })
> > >       .process(new CdcProcessorSupplier)
> > >
> > >     val stream: KafkaStreams = new KafkaStreams(builder, streamingConfig)
> > >     println("\nstart filtering BcAccessLog test run: " + testRun + "\n")
> > >     stream.start()
> > >   }
> > >
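> > > For reference, the constructor alternative I mentioned would look
> > > roughly like the following. This is only a sketch: I am treating a
> > > (schemaRegistry, props) constructor on the example serde classes as an
> > > assumption, not a checked signature.
> > >
> > >   // Hypothetical: hand the props to the deserializer up front instead
> > >   // of calling configure() on it afterwards.
> > >   val props = streamingConfig.asInstanceOf[java.util.Map[String, String]]
> > >   val bcAccessLogDeserializer =
> > >     new SpecificAvroDeserializer[BcAccessLog](schemaRegistry, props)
> > >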
> > > On Tue, May 17, 2016 at 5:29 PM, Guozhang Wang <wangg...@gmail.com>
> > > wrote:
> > >
> > > > Hello Fred,
> > > >
> > > > Thanks for reporting this. I looked through your code and found that
> > > > this is due to a bug in
> > > > io.confluent.examples.streams.utils.SpecificAvroDeserializer.
> > > >
> > > > I will fix this bug in Confluent's repo. To work around it in the
> > > > meantime, you can create your own SpecificAvroDeserializer with the
> > > > correct logic by copy-pasting the Confluent class with the following
> > > > one-line change:
> > > >
> > > > In the configure(Map<String, ?> configs, boolean isKey) function, add
> > > > the following key-value pair to the configs before calling
> > > > inner.configure:
> > > >
> > > > configs.put("specific.avro.reader", "true");
> > > >
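> > > > If you would rather not copy the whole file, a subclass along these
> > > > lines should amount to the same change. This is a sketch only: it
> > > > assumes the examples class and its configure() are non-final, and that
> > > > configure() simply delegates to the inner KafkaAvroDeserializer.
> > > >
> > > >   import java.util.{HashMap => JHashMap, Map => JMap}
> > > >   import io.confluent.examples.streams.utils.SpecificAvroDeserializer
> > > >   import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient
> > > >   import org.apache.avro.specific.SpecificRecord
> > > >
> > > >   class PatchedSpecificAvroDeserializer[T <: SpecificRecord](
> > > >       client: SchemaRegistryClient)
> > > >     extends SpecificAvroDeserializer[T](client) {
> > > >
> > > >     override def configure(configs: JMap[String, _], isKey: Boolean): Unit = {
> > > >       // Copy the configs and force specific-record decoding before
> > > >       // delegating, i.e. the one-line change described above.
> > > >       val patched = new JHashMap[String, Any](configs.asInstanceOf[JMap[String, Any]])
> > > >       patched.put("specific.avro.reader", "true")
> > > >       super.configure(patched, isKey)
> > > >     }
> > > >   }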
> > > >
> > > > Guozhang
> > > >
> > > > On Tue, May 17, 2016 at 11:35 AM, Fred Patton <thoughtp...@gmail.com>
> > > > wrote:
> > > >
> > > > > Before switching the example from to() to through(), things seemed
> > > > > to work fine. Afterward, I get ClassCastExceptions from GenericRecord
> > > > > to SpecificRecord. Hopefully, someone can point out something quick
> > > > > and dumb so I can get my demo wrapped up. I never get past the
> > > > > through() method. I am using the Kafka Streams tech preview, with
> > > > > Avro compiled by AvroTools. Additionally, I initially had a second
> > > > > parallel application taking the output of this to() invocation and
> > > > > continuing with the same logic; there was no crash, but no code ever
> > > > > ran, not even the context initialization. Any illumination is most
> > > > > appreciated.
> > > > >
> > > > > Here's the stack trace, followed by the offending bit of code...
> > > > >
> > > > > [info] CdcProcessorSupplier::get
> > > > > [info] CdcProcessor::init - ctx:
> > > > > org.apache.kafka.streams.processor.internals.ProcessorContextImpl@c5ff552
> > > > > [info] CdcProcessor::close
> > > > > [error] Exception in thread "StreamThread-1" java.lang.ClassCastException:
> > > > > org.apache.avro.generic.GenericData$Record cannot be cast to
> > > > > org.apache.avro.specific.SpecificRecord
> > > > > [error] at io.confluent.examples.streams.utils.SpecificAvroDeserializer.deserialize(SpecificAvroDeserializer.java:51)
> > > > > [error] at io.confluent.examples.streams.utils.SpecificAvroDeserializer.deserialize(SpecificAvroDeserializer.java:24)
> > > > > [error] at org.apache.kafka.streams.processor.internals.SourceNode.deserializeValue(SourceNode.java:41)
> > > > > [error] at org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:77)
> > > > > [error] at org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:113)
> > > > > [error] at org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:134)
> > > > > [error] at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:334)
> > > > > [error] at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:248)
> > > > >
> > > > >   def main(args: Array[String]) {
> > > > >     val schemaRegistry = new CachedSchemaRegistryClient(
> > > > >       "http://localhost:8081",
> > > > >       AbstractKafkaAvroSerDeConfig.MAX_SCHEMAS_PER_SUBJECT_DEFAULT)
> > > > >     val builder: KStreamBuilder = new KStreamBuilder
> > > > >     val streamingConfig = {
> > > > >       val settings = new Properties
> > > > >       settings.put(StreamsConfig.JOB_ID_CONFIG, "kstreams")
> > > > >       settings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
> > > > >       settings.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181")
> > > > >       settings.put(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG,
> > > > >         "io.confluent.kafka.serializers.KafkaAvroSerializer")
> > > > >       settings.put(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG,
> > > > >         "io.confluent.kafka.serializers.KafkaAvroDeserializer")
> > > > >       settings.put(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG,
> > > > >         "io.confluent.kafka.serializers.KafkaAvroSerializer")
> > > > >       settings.put(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
> > > > >         "io.confluent.kafka.serializers.KafkaAvroDeserializer")
> > > > >       settings
> > > > >     }
> > > > >
> > > > >     import KeyValueImplicits._
> > > > >
> > > > >     val stringSerializer = new StringSerializer
> > > > >     val stringDeserializer = new StringDeserializer
> > > > >     val bcAccessLogSerializer =
> > > > >       new SpecificAvroSerializer[BcAccessLog](schemaRegistry)
> > > > >     val bcAccessLogDeserializer =
> > > > >       new SpecificAvroDeserializer[BcAccessLog](schemaRegistry)
> > > > >
> > > > >     val rawSourceStream: KStream[String, String] = builder.stream(
> > > > >       stringDeserializer,
> > > > >       stringDeserializer,
> > > > >       "raw-accesslog-" + testRun)
> > > > >
> > > > >     val filteredStream: KStream[String, BcAccessLog] =
> > > > >       rawSourceStream.mapValues(parseAsJson)
> > > > >         .filter(isCdcEvent)
> > > > >         .map((key, value) => (getAccessLogKey(value), toBcAccessLog(value)))
> > > > >
> > > > >     filteredStream
> > > > >       .through("accesslog-" + testRun,
> > > > >                stringSerializer,
> > > > >                bcAccessLogSerializer,
> > > > >                stringDeserializer,
> > > > >                bcAccessLogDeserializer)
> > > > >       .mapValues(value => {
> > > > >                    println("filteredSourceStream value: " + value)
> > > > >                    value
> > > > >                  })
> > > > >       .process(new CdcProcessorSupplier)
> > > > >
> > > > >     val stream: KafkaStreams = new KafkaStreams(builder, streamingConfig)
> > > > >     println("\nstart filtering BcAccessLog test run: " + testRun + "\n")
> > > > >     stream.start()
> > > > >   }
> > > > >
> > > > > Regards,
> > > > > Fred Patton
> > > > >
> > > >
> > > >
> > > >
> > > > --
> > > > -- Guozhang
> > > >
> > >
> >
> >
> >
> > --
> > -- Guozhang
> >
>



-- 
-- Guozhang
