Thanks so much, Guozhang. It didn't work immediately when I tried at the end
of the day, so I deferred till this morning. It seems I also needed to invoke
configure manually, or pass the props to the deserializer constructor (a
sketch of the constructor route follows the example below). I am providing
the revised example in case it helps anyone. Thank you, Liquan, for pointing
out the need for the cast, which was indeed also required.

Regards,
Fred

  def main(args: Array[String]) {

    val schemaRegistry = new CachedSchemaRegistryClient("http://localhost:8081",
      AbstractKafkaAvroSerDeConfig.MAX_SCHEMAS_PER_SUBJECT_DEFAULT)

    val builder: KStreamBuilder = new KStreamBuilder

    val streamingConfig = {
      val settings = new Properties
      settings.put(StreamsConfig.JOB_ID_CONFIG, "kstreams")
      settings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
      settings.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181")
      settings.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG,
        "http://localhost:8081")
      settings.put(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG,
        "io.confluent.kafka.serializers.KafkaAvroSerializer")
      settings.put(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG,
        "io.confluent.kafka.serializers.KafkaAvroDeserializer")
      settings.put(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG,
        "io.confluent.kafka.serializers.KafkaAvroSerializer")
      settings.put(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
        "io.confluent.kafka.serializers.KafkaAvroDeserializer")
      settings.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, "true")
      settings
    }

    // Supplies an implicit (K, V) tuple -> KeyValue[K, V] conversion used by map() below.
    import KeyValueImplicits._

    val stringSerializer = new StringSerializer
    val stringDeserializer = new StringDeserializer
    val bcAccessLogSerializer = new SpecificAvroSerializer[BcAccessLog](schemaRegistry)
    val bcAccessLogDeserializer = new SpecificAvroDeserializer[BcAccessLog](schemaRegistry)

    // The serdes are not configured automatically when instantiated directly,
    // so invoke configure() by hand. The cast is needed because configure()
    // expects a java.util.Map while streamingConfig is a Properties.
    bcAccessLogSerializer.configure(
      streamingConfig.asInstanceOf[java.util.Map[String, String]], false)
    bcAccessLogDeserializer.configure(
      streamingConfig.asInstanceOf[java.util.Map[String, String]], false)

    val rawSourceStream: KStream[String, String] =
      builder.stream(stringDeserializer,
        stringDeserializer,
        "raw-accesslog-" + testRun)

    val filteredStream: KStream[String, BcAccessLog] =
      rawSourceStream.mapValues(parseAsJson)
        .filter(isCdcEvent)
        .map((key, value) => (getAccessLogKey(value), toBcAccessLog(value)))

    filteredStream
      .through("accesslog-" + testRun,
        stringSerializer,
        bcAccessLogSerializer,
        stringDeserializer,
        bcAccessLogDeserializer)
      .mapValues(value => {
        println("filteredSourceStream value: " + value)
        value
      })
      .process(new CdcProcessorSupplier)

    val stream: KafkaStreams = new KafkaStreams(builder, streamingConfig)
    println("\nstart filtering BcAccessLog test run: " + testRun + "\n")
    stream.start()
  }
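
As an aside, the constructor route mentioned above would avoid the manual
configure() calls. Here is a minimal, untested sketch of it, assuming the
example SpecificAvroDeserializer forwards props the way the underlying
KafkaAvroDeserializer's (client, props) constructor does:

    val serdeProps = new java.util.HashMap[String, AnyRef]()
    serdeProps.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG,
      "http://localhost:8081")
    serdeProps.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, "true")
    // Hypothetical: relies on the wrapper exposing a (client, props) constructor
    val configuredDeserializer =
      new SpecificAvroDeserializer[BcAccessLog](schemaRegistry, serdeProps)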

On Tue, May 17, 2016 at 5:29 PM, Guozhang Wang <wangg...@gmail.com> wrote:

> Hello Fred,
>
> Thanks for reporting this. I looked through your code and found that this is
> due to a bug in io.confluent.examples.streams.utils.SpecificAvroDeserializer.
>
> I will fix this bug in Confluent's repo. In the meantime, to work around it,
> you can create your own SpecificAvroDeserializer with the correct logic by
> copy-pasting the Confluent class with the following one-line change:
>
> In the configure(Map<String, ?> configs, boolean isKey) function, add the
> following key-value pair to the configs before calling inner.configure:
>
> configs.put("specific.avro.reader", "true");
>
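> For completeness, a minimal Scala sketch of that copy-pasted class with the
> one-line fix applied might look like the following (the class name here is
> hypothetical and the structure is assumed to mirror the Confluent example,
> so treat it as untested):
>
>     import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient
>     import io.confluent.kafka.serializers.KafkaAvroDeserializer
>     import org.apache.avro.specific.SpecificRecord
>     import org.apache.kafka.common.serialization.Deserializer
>
>     class PatchedSpecificAvroDeserializer[T <: SpecificRecord](client: SchemaRegistryClient)
>         extends Deserializer[T] {
>
>       // Delegate all real work to the stock Avro deserializer
>       private val inner = new KafkaAvroDeserializer(client)
>
>       override def configure(configs: java.util.Map[String, _], isKey: Boolean): Unit = {
>         // The one-line fix: force specific-record decoding before
>         // delegating to inner.configure
>         val patched = new java.util.HashMap[String, Any](configs)
>         patched.put("specific.avro.reader", "true")
>         inner.configure(patched, isKey)
>       }
>
>       override def deserialize(topic: String, bytes: Array[Byte]): T =
>         inner.deserialize(topic, bytes).asInstanceOf[T]
>
>       override def close(): Unit = inner.close()
>     }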
>
> Guozhang
>
> On Tue, May 17, 2016 at 11:35 AM, Fred Patton <thoughtp...@gmail.com>
> wrote:
>
> > Before switching the example from to() to through(), things seemed to work
> > fine. Afterward, I get ClassCastExceptions from GenericRecord to
> > SpecificRecord. Hopefully, someone can point out something quick and dumb
> > so I can get my demo wrapped up. I never get past the through() method. I
> > am using the Kafka Streams tech preview, with Avro classes compiled from
> > AvroTools. Additionally, I initially had a second parallel application
> > taking the output of this to() invocation and continuing with the same
> > logic; there was no crash, but no code ever ran, not even context
> > initialization. Any illumination is most appreciated.
> >
> > Here's the stack trace, followed by the offending bit of code...
> >
> > [info] CdcProcessorSupplier::get
> > [info] CdcProcessor::init - ctx:
> > org.apache.kafka.streams.processor.internals.ProcessorContextImpl@c5ff552
> > [info] CdcProcessor::close
> > [error] Exception in thread "StreamThread-1" java.lang.ClassCastException:
> > org.apache.avro.generic.GenericData$Record cannot be cast to
> > org.apache.avro.specific.SpecificRecord
> > [error] at io.confluent.examples.streams.utils.SpecificAvroDeserializer.deserialize(SpecificAvroDeserializer.java:51)
> > [error] at io.confluent.examples.streams.utils.SpecificAvroDeserializer.deserialize(SpecificAvroDeserializer.java:24)
> > [error] at org.apache.kafka.streams.processor.internals.SourceNode.deserializeValue(SourceNode.java:41)
> > [error] at org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:77)
> > [error] at org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:113)
> > [error] at org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:134)
> > [error] at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:334)
> > [error] at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:248)
> >
> >   def main(args: Array[String]) {
> >     val schemaRegistry = new CachedSchemaRegistryClient("http://localhost:8081",
> > AbstractKafkaAvroSerDeConfig.MAX_SCHEMAS_PER_SUBJECT_DEFAULT)
> >     val builder: KStreamBuilder = new KStreamBuilder
> >     val streamingConfig = {
> >       val settings = new Properties
> >       settings.put(StreamsConfig.JOB_ID_CONFIG, "kstreams")
> >       settings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
> > "localhost:9092")
> >       settings.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG,
> > "localhost:2181")
> >       settings.put(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG,
> > "io.confluent.kafka.serializers.KafkaAvroSerializer")
> >       settings.put(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG,
> > "io.confluent.kafka.serializers.KafkaAvroDeserializer")
> >       settings.put(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG,
> > "io.confluent.kafka.serializers.KafkaAvroSerializer")
> >       settings.put(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
> > "io.confluent.kafka.serializers.KafkaAvroDeserializer")
> >       settings
> >     }
> >
> >     import KeyValueImplicits._
> >
> >     val stringSerializer = new StringSerializer
> >     val stringDeserializer = new StringDeserializer
> >     val bcAccessLogSerializer = new
> > SpecificAvroSerializer[BcAccessLog](schemaRegistry)
> >     val bcAccessLogDeserializer = new
> > SpecificAvroDeserializer[BcAccessLog](schemaRegistry)
> >
> >     val rawSourceStream: KStream[String, String] =
> > builder.stream(stringDeserializer,
> >       stringDeserializer,
> >       "raw-accesslog-" + testRun)
> >
> >     val filteredStream: KStream[String, BcAccessLog] =
> > rawSourceStream.mapValues(parseAsJson)
> >       .filter(isCdcEvent)
> >       .map((key, value) => (getAccessLogKey(value),
> toBcAccessLog(value)))
> >
> >     filteredStream
> >       .through("accesslog-" + testRun,
> >                stringSerializer,
> >                bcAccessLogSerializer,
> >                stringDeserializer,
> >                bcAccessLogDeserializer)
> >       .mapValues(value => {
> >                    println("filteredSourceStream value: " + value)
> >                    value
> >                  })
> >       .process(new CdcProcessorSupplier)
> >
> >     val stream: KafkaStreams = new KafkaStreams(builder, streamingConfig)
> >     println("\nstart filtering BcAccessLog test run: " + testRun + "\n")
> >     stream.start()
> >   }
> >
> > Regards,
> > Fred Patton
> >
>
>
>
> --
> -- Guozhang
>
