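To answer your first question: no, the schema registry is not mandatory. You
can use the Avro API directly. The exception comes from the Confluent
KafkaAvroSerializer, which requires "schema.registry.url" to be set.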
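
As a rough illustration of using the Avro API without a registry, here is a
minimal sketch. PlainAvroCodec and the LogLine schema are made-up names for
this example (they are not part of Kafka, Avro, or the Confluent libraries),
and it uses GenericRecord so that it does not depend on a generated class such
as LogRecordAvro.

```scala
import java.io.ByteArrayOutputStream

import org.apache.avro.Schema
import org.apache.avro.generic.{GenericData, GenericDatumReader, GenericDatumWriter, GenericRecord}
import org.apache.avro.io.{DecoderFactory, EncoderFactory}

// Hypothetical helper: encodes and decodes records with the plain Avro API,
// so no "schema.registry.url" setting is involved.
object PlainAvroCodec {

  // Illustrative schema (an assumption); a real application would use its own.
  val schema: Schema = new Schema.Parser().parse(
    """{"type":"record","name":"LogLine","fields":[
      |  {"name":"host","type":"string"},
      |  {"name":"bytes","type":"long"}
      |]}""".stripMargin)

  // Serialize a record to Avro binary (no schema id or header is written).
  def toBytes(record: GenericRecord): Array[Byte] = {
    val out = new ByteArrayOutputStream()
    val encoder = EncoderFactory.get().binaryEncoder(out, null)
    new GenericDatumWriter[GenericRecord](schema).write(record, encoder)
    encoder.flush()
    out.toByteArray
  }

  // Deserialize Avro binary using the same schema as the writer.
  def fromBytes(bytes: Array[Byte]): GenericRecord = {
    val decoder = DecoderFactory.get().binaryDecoder(bytes, null)
    new GenericDatumReader[GenericRecord](schema).read(null, decoder)
  }
}

object PlainAvroCodecExample extends App {
  val record = new GenericData.Record(PlainAvroCodec.schema)
  record.put("host", "example.org")
  record.put("bytes", java.lang.Long.valueOf(42L))

  val decoded = PlainAvroCodec.fromBytes(PlainAvroCodec.toBytes(record))
  println(decoded)
}
```

To plug something like this into Kafka Streams you would wrap the two functions
in a Serializer and Deserializer and combine them with Serdes.serdeFrom. Since
nothing stores the schema for you, the producer and consumer have to agree on
it by some other means.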

On Mon, Jul 17, 2017 at 2:29 PM Debasish Ghosh <ghosh.debas...@gmail.com>
wrote:

> Hi -
>
> I am using Avro Serialization in a Kafka Streams application through the
> following dependency ..
>
> "io.confluent"  % "kafka-avro-serializer" % "3.2.2"
>
> My question is: is the schema registry mandatory for using Avro
> serialization? When I run the application I get the following exception,
> which complains that there is no default value for "schema.registry.url".
> My current settings for StreamsConfig are the following ..
>
>    settings.put(StreamsConfig.APPLICATION_ID_CONFIG, "kstream-log-processing-avro")
>    settings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, config.brokers)
>    settings.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.ByteArray.getClass.getName)
>    settings.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, classOf[SpecificAvroSerde[LogRecordAvro]])
>
> .. and the exception ..
>
> 23:49:34.054 TKD [StreamThread-1] ERROR o.a.k.c.c.i.ConsumerCoordinator - User provided listener org.apache.kafka.streams.processor.internals.StreamThread$1 for group kstream-log-processing-avro failed on partition assignment
> org.apache.kafka.streams.errors.StreamsException: Failed to configure value serde class com.lightbend.fdp.sample.kstream.serializers.SpecificAvroSerde
>     at org.apache.kafka.streams.StreamsConfig.valueSerde(StreamsConfig.java:594)
>     at org.apache.kafka.streams.processor.internals.AbstractProcessorContext.<init>(AbstractProcessorContext.java:58)
>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.<init>(ProcessorContextImpl.java:41)
>     at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:137)
>     at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:864)
>     at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1237)
>     at org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1210)
>     at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:967)
>     at org.apache.kafka.streams.processor.internals.StreamThread.access$600(StreamThread.java:69)
>     at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:234)
>     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:259)
>     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:352)
>     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303)
>     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:290)
>     at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1029)
>     at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)
>     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:592)
>     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
> Caused by: io.confluent.common.config.ConfigException: Missing required configuration "schema.registry.url" which has no default value.
>     at io.confluent.common.config.ConfigDef.parse(ConfigDef.java:241)
>     at io.confluent.common.config.AbstractConfig.<init>(AbstractConfig.java:76)
>     at io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig.<init>(AbstractKafkaAvroSerDeConfig.java:51)
>     at io.confluent.kafka.serializers.KafkaAvroSerializerConfig.<init>(KafkaAvroSerializerConfig.java:33)
>     at io.confluent.kafka.serializers.KafkaAvroSerializer.configure(KafkaAvroSerializer.java:49)
>     at com.lightbend.fdp.sample.kstream.serializers.SpecificAvroSerializer.configure(SpecificAvroSerializer.scala:21)
>     at com.lightbend.fdp.sample.kstream.serializers.SpecificAvroSerde.configure(SpecificAvroSerde.scala:18)
>     at org.apache.kafka.streams.StreamsConfig.valueSerde(StreamsConfig.java:591)
>     ... 17 common frames omitted
>
> regards.
>
> --
> Debasish Ghosh
> http://manning.com/ghosh2
> http://manning.com/ghosh
>
> Twttr: @debasishg
> Blog: http://debasishg.blogspot.com
> Code: http://github.com/debasishg
>
