In short, Avro serializers/deserializers provided by Confluent always
integrate with (and thus require) Confluent Schema Registry.  That's why
you must set the `schema.registry.url` configuration for them.

If you want to use Avro without a schema registry, you need to work with
the Avro API directly.  For convenience, you can of course also wrap that
logic in your own "no schema registry" Avro serializers/deserializers.
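
To give you an idea, here is a rough sketch of such a registry-free serde
for a specific Avro record type, built directly on the plain Avro API.
Everything here is illustrative (the class name `RegistryFreeAvroSerde` is
made up, error handling is omitted), and it assumes the Kafka clients and
plain Avro jars are on the classpath:

```scala
// Hypothetical sketch: a Serde that (de)serializes a SpecificRecord with
// the plain Avro binary encoding and no schema registry. Producer and
// consumer must agree on the schema out of band.
import java.io.ByteArrayOutputStream
import java.util.{Map => JMap}

import org.apache.avro.io.{DecoderFactory, EncoderFactory}
import org.apache.avro.specific.{SpecificDatumReader, SpecificDatumWriter, SpecificRecordBase}
import org.apache.kafka.common.serialization.{Deserializer, Serde, Serializer}

import scala.reflect.ClassTag

class RegistryFreeAvroSerde[T <: SpecificRecordBase](implicit tag: ClassTag[T]) extends Serde[T] {

  override def serializer(): Serializer[T] = new Serializer[T] {
    override def configure(configs: JMap[String, _], isKey: Boolean): Unit = ()
    override def serialize(topic: String, data: T): Array[Byte] =
      if (data == null) null
      else {
        val out = new ByteArrayOutputStream()
        val encoder = EncoderFactory.get().binaryEncoder(out, null)
        // The writer schema comes from the record itself; nothing is
        // registered or looked up anywhere.
        new SpecificDatumWriter[T](data.getSchema).write(data, encoder)
        encoder.flush()
        out.toByteArray
      }
    override def close(): Unit = ()
  }

  override def deserializer(): Deserializer[T] = new Deserializer[T] {
    override def configure(configs: JMap[String, _], isKey: Boolean): Unit = ()
    override def deserialize(topic: String, bytes: Array[Byte]): T =
      if (bytes == null) null.asInstanceOf[T]
      else {
        val decoder = DecoderFactory.get().binaryDecoder(bytes, null)
        // The reader schema is taken from the generated record class.
        new SpecificDatumReader[T](tag.runtimeClass.asInstanceOf[Class[T]])
          .read(null.asInstanceOf[T], decoder)
      }
    override def close(): Unit = ()
  }

  override def configure(configs: JMap[String, _], isKey: Boolean): Unit = ()
  override def close(): Unit = ()
}
```

Note that, unlike the Confluent serializers, nothing in the byte payload
identifies the schema, so evolving the schema later is entirely your own
responsibility.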

Best wishes,
Michael



On Mon, Jul 17, 2017 at 8:51 PM, Debasish Ghosh <ghosh.debas...@gmail.com>
wrote:

> I am using the class io.confluent.kafka.serializers.KafkaAvroSerializer as
> one of the base abstractions for Avro serialization. From the stack trace I
> see that instantiating this class requires setting up
> KafkaAvroSerializerConfig, which needs a value for the schema registry URL
> ..
>
> regards.
>
> On Tue, Jul 18, 2017 at 12:02 AM, Richard L. Burton III <mrbur...@gmail.com>
> wrote:
>
> > For your first question: no, you can use the Avro API directly.
> >
> >
> >
> > On Mon, Jul 17, 2017 at 2:29 PM Debasish Ghosh <ghosh.debas...@gmail.com>
> > wrote:
> >
> >> Hi -
> >>
> >> I am using Avro Serialization in a Kafka Streams application through the
> >> following dependency ..
> >>
> >> "io.confluent"  % "kafka-avro-serializer" % "3.2.2"
> >>
> >> My question is: Is the schema registry mandatory for using Avro
> >> serialization? Because when I run the application I get the following
> >> exception, where it complains that there is no default value for
> >> "schema.registry.url". My current settings for StreamsConfig are the
> >> following ..
> >>
> >>    settings.put(StreamsConfig.APPLICATION_ID_CONFIG, "kstream-log-processing-avro")
> >>    settings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, config.brokers)
> >>    settings.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.ByteArray.getClass.getName)
> >>    settings.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, classOf[SpecificAvroSerde[LogRecordAvro]])
> >>
> >> .. and the exception ..
> >>
> >> 23:49:34.054 TKD [StreamThread-1] ERROR o.a.k.c.c.i.ConsumerCoordinator -
> >> User provided listener
> >> org.apache.kafka.streams.processor.internals.StreamThread$1 for group
> >> kstream-log-processing-avro failed on partition assignment
> >> org.apache.kafka.streams.errors.StreamsException: Failed to configure value
> >> serde class com.lightbend.fdp.sample.kstream.serializers.SpecificAvroSerde
> >> at org.apache.kafka.streams.StreamsConfig.valueSerde(StreamsConfig.java:594)
> >> at org.apache.kafka.streams.processor.internals.AbstractProcessorContext.<init>(AbstractProcessorContext.java:58)
> >> at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.<init>(ProcessorContextImpl.java:41)
> >> at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:137)
> >> at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:864)
> >> at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1237)
> >> at org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1210)
> >> at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:967)
> >> at org.apache.kafka.streams.processor.internals.StreamThread.access$600(StreamThread.java:69)
> >> at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:234)
> >> at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:259)
> >> at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:352)
> >> at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303)
> >> at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:290)
> >> at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1029)
> >> at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)
> >> at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:592)
> >> at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
> >> Caused by: io.confluent.common.config.ConfigException: Missing required
> >> configuration "schema.registry.url" which has no default value.
> >> at io.confluent.common.config.ConfigDef.parse(ConfigDef.java:241)
> >> at io.confluent.common.config.AbstractConfig.<init>(AbstractConfig.java:76)
> >> at io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig.<init>(AbstractKafkaAvroSerDeConfig.java:51)
> >> at io.confluent.kafka.serializers.KafkaAvroSerializerConfig.<init>(KafkaAvroSerializerConfig.java:33)
> >> at io.confluent.kafka.serializers.KafkaAvroSerializer.configure(KafkaAvroSerializer.java:49)
> >> at com.lightbend.fdp.sample.kstream.serializers.SpecificAvroSerializer.configure(SpecificAvroSerializer.scala:21)
> >> at com.lightbend.fdp.sample.kstream.serializers.SpecificAvroSerde.configure(SpecificAvroSerde.scala:18)
> >> at org.apache.kafka.streams.StreamsConfig.valueSerde(StreamsConfig.java:591)
> >> ... 17 common frames omitted
> >>
> >> regards.
> >>
> >> --
> >> Debasish Ghosh
> >> http://manning.com/ghosh2
> >> http://manning.com/ghosh
> >>
> >> Twttr: @debasishg
> >> Blog: http://debasishg.blogspot.com
> >> Code: http://github.com/debasishg
> >>
> >
>
>
> --
> Debasish Ghosh
> http://manning.com/ghosh2
> http://manning.com/ghosh
>
> Twttr: @debasishg
> Blog: http://debasishg.blogspot.com
> Code: http://github.com/debasishg
>
