I am using the class io.confluent.kafka.serializers.KafkaAvroSerializer as
one of the base abstractions for Avro serialization. From the stack trace I
see that the instantiation of this class needs set up of
KafkaAvroSerializerConfig which needs a value for the schema registry url ..

regards.

On Tue, Jul 18, 2017 at 12:02 AM, Richard L. Burton III <mrbur...@gmail.com>
wrote:

> For your first question, no you can use the avro API.
>
>
>
> On Mon, Jul 17, 2017 at 2:29 PM Debasish Ghosh <ghosh.debas...@gmail.com>
> wrote:
>
>> Hi -
>>
>> I am using Avro Serialization in a Kafka Streams application through the
>> following dependency ..
>>
>> "io.confluent"  % "kafka-avro-serializer" % "3.2.2"
>>
>> My question is : Is schema registry mandatory for using Avro Serialization
>> ? Because when I run the application I get the following exception where
>> it
>> complains that there is no default value for "schema.registry.url". My
>> current settings for StreamsConfig are the following ..
>>
>>       settings.put(StreamsConfig.APPLICATION_ID_CONFIG,
>> "kstream-log-processing-avro")
>>    settings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, config.brokers)
>>    settings.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG,
>> Serdes.ByteArray.getClass.getName)
>>    settings.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,
>> classOf[SpecificAvroSerde[LogRecordAvro]])
>>
>> .. and the exception ..
>>
>> 23:49:34.054 TKD [StreamThread-1] ERROR o.a.k.c.c.i.ConsumerCoordinator -
>> User provided listener
>> org.apache.kafka.streams.processor.internals.StreamThread$1 for group
>> kstream-log-processing-avro failed on partition assignment
>> org.apache.kafka.streams.errors.StreamsException: Failed to configure
>> value
>> serde class com.lightbend.fdp.sample.kstream.serializers.
>> SpecificAvroSerde
>> at org.apache.kafka.streams.StreamsConfig.valueSerde(
>> StreamsConfig.java:594)
>> at
>> org.apache.kafka.streams.processor.internals.AbstractProcessorContext.<
>> init>(AbstractProcessorContext.java:58)
>> at
>> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.<init>(
>> ProcessorContextImpl.java:41)
>> at
>> org.apache.kafka.streams.processor.internals.
>> StreamTask.<init>(StreamTask.java:137)
>> at
>> org.apache.kafka.streams.processor.internals.
>> StreamThread.createStreamTask(StreamThread.java:864)
>> at
>> org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.
>> createTask(StreamThread.java:1237)
>> at
>> org.apache.kafka.streams.processor.internals.StreamThread$
>> AbstractTaskCreator.retryWithBackoff(StreamThread.java:1210)
>> at
>> org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(
>> StreamThread.java:967)
>> at
>> org.apache.kafka.streams.processor.internals.StreamThread.access$600(
>> StreamThread.java:69)
>> at
>> org.apache.kafka.streams.processor.internals.StreamThread$1.
>> onPartitionsAssigned(StreamThread.java:234)
>> at
>> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.
>> onJoinComplete(ConsumerCoordinator.java:259)
>> at
>> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.
>> joinGroupIfNeeded(AbstractCoordinator.java:352)
>> at
>> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.
>> ensureActiveGroup(AbstractCoordinator.java:303)
>> at
>> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(
>> ConsumerCoordinator.java:290)
>> at
>> org.apache.kafka.clients.consumer.KafkaConsumer.
>> pollOnce(KafkaConsumer.java:1029)
>> at
>> org.apache.kafka.clients.consumer.KafkaConsumer.poll(
>> KafkaConsumer.java:995)
>> at
>> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(
>> StreamThread.java:592)
>> at
>> org.apache.kafka.streams.processor.internals.
>> StreamThread.run(StreamThread.java:361)
>> Caused by: io.confluent.common.config.ConfigException: Missing required
>> configuration "schema.registry.url" which has no default value.
>> at io.confluent.common.config.ConfigDef.parse(ConfigDef.java:241)
>> at io.confluent.common.config.AbstractConfig.<init>(
>> AbstractConfig.java:76)
>> at
>> io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig.<init>(
>> AbstractKafkaAvroSerDeConfig.java:51)
>> at
>> io.confluent.kafka.serializers.KafkaAvroSerializerConfig.<init>(
>> KafkaAvroSerializerConfig.java:33)
>> at
>> io.confluent.kafka.serializers.KafkaAvroSerializer.configure(
>> KafkaAvroSerializer.java:49)
>> at
>> com.lightbend.fdp.sample.kstream.serializers.SpecificAvroSerializer.
>> configure(SpecificAvroSerializer.scala:21)
>> at
>> com.lightbend.fdp.sample.kstream.serializers.SpecificAvroSerde.configure(
>> SpecificAvroSerde.scala:18)
>> at org.apache.kafka.streams.StreamsConfig.valueSerde(
>> StreamsConfig.java:591)
>> ... 17 common frames omitted
>>
>> regards.
>>
>> --
>> Debasish Ghosh
>> http://manning.com/ghosh2
>> http://manning.com/ghosh
>>
>> Twttr: @debasishg
>> Blog: http://debasishg.blogspot.com
>> Code: http://github.com/debasishg
>>
>


-- 
Debasish Ghosh
http://manning.com/ghosh2
http://manning.com/ghosh

Twttr: @debasishg
Blog: http://debasishg.blogspot.com
Code: http://github.com/debasishg

Reply via email to