Hello,

I'm trying to use the avro-confluent-registry format with the Confluent Cloud 
Schema Registry in our company.
Our schemas are managed via Terraform, and global write access is denied for all 
Kafka clients in our environments (or at least in production).
Therefore, when using the avro-confluent-registry format, I get an error when 
Flink tries to serialize a row:

java.lang.RuntimeException: Failed to serialize row.
    at org.apache.flink.formats.avro.AvroRowDataSerializationSchema.serialize(AvroRowDataSerializationSchema.java:90) ~[?:?]
    at org.apache.flink.formats.avro.AvroRowDataSerializationSchema.serialize(AvroRowDataSerializationSchema.java:40) ~[?:?]
    at org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaRecordSerializationSchema.serialize(DynamicKafkaRecordSerializationSchema.java:95) ~[?:?]
    at org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaRecordSerializationSchema.serialize(DynamicKafkaRecordSerializationSchema.java:36) ~[?:?]
    at org.apache.flink.connector.kafka.sink.KafkaWriter.write(KafkaWriter.java:196) ~[?:?]
    at org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.processElement(SinkWriterOperator.java:158) ~[flink-dist-1.17.0.jar:1.17.0]
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:75) ~[flink-dist-1.17.0.jar:1.17.0]
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:50) ~[flink-dist-1.17.0.jar:1.17.0]
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29) ~[flink-dist-1.17.0.jar:1.17.0]
    at org.apache.flink.table.runtime.util.StreamRecordCollector.collect(StreamRecordCollector.java:44) ~[flink-table-runtime-1.17.0.jar:1.17.0]
    at org.apache.flink.table.runtime.operators.sink.ConstraintEnforcer.processElement(ConstraintEnforcer.java:247) ~[flink-table-runtime-1.17.0.jar:1.17.0]
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:75) ~[flink-dist-1.17.0.jar:1.17.0]
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:50) ~[flink-dist-1.17.0.jar:1.17.0]
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29) ~[flink-dist-1.17.0.jar:1.17.0]
    at StreamExecCalc$2221.processElement_0_0(Unknown Source) ~[?:?]
    at StreamExecCalc$2221.processElement_0_0_rewriteGroup22_split310(Unknown Source) ~[?:?]
    at StreamExecCalc$2221.processElement_0_0_rewriteGroup22(Unknown Source) ~[?:?]
    at StreamExecCalc$2221.processElement_split308(Unknown Source) ~[?:?]
    at StreamExecCalc$2221.processElement(Unknown Source) ~[?:?]
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:75) ~[flink-dist-1.17.0.jar:1.17.0]
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:50) ~[flink-dist-1.17.0.jar:1.17.0]
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29) ~[flink-dist-1.17.0.jar:1.17.0]
    at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51) ~[flink-dist-1.17.0.jar:1.17.0]
    at org.apache.flink.table.runtime.operators.over.RowTimeRangeUnboundedPrecedingFunction.processElementsWithSameTimestamp(RowTimeRangeUnboundedPrecedingFunction.java:74) ~[flink-table-runtime-1.17.0.jar:1.17.0]
    at org.apache.flink.table.runtime.operators.over.AbstractRowTimeUnboundedPrecedingOver.onTimer(AbstractRowTimeUnboundedPrecedingOver.java:228) ~[flink-table-runtime-1.17.0.jar:1.17.0]
    at org.apache.flink.streaming.api.operators.KeyedProcessOperator.invokeUserFunction(KeyedProcessOperator.java:91) ~[flink-dist-1.17.0.jar:1.17.0]
    at org.apache.flink.streaming.api.operators.KeyedProcessOperator.onEventTime(KeyedProcessOperator.java:70) ~[flink-dist-1.17.0.jar:1.17.0]
    at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.advanceWatermark(InternalTimerServiceImpl.java:302) ~[flink-dist-1.17.0.jar:1.17.0]
    at org.apache.flink.streaming.api.operators.InternalTimeServiceManagerImpl.advanceWatermark(InternalTimeServiceManagerImpl.java:180) ~[flink-dist-1.17.0.jar:1.17.0]
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:602) ~[flink-dist-1.17.0.jar:1.17.0]
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitWatermark(OneInputStreamTask.java:243) ~[flink-dist-1.17.0.jar:1.17.0]
    at org.apache.flink.streaming.runtime.watermarkstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:199) ~[flink-dist-1.17.0.jar:1.17.0]
    at org.apache.flink.streaming.runtime.watermarkstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:114) ~[flink-dist-1.17.0.jar:1.17.0]
    at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:148) ~[flink-dist-1.17.0.jar:1.17.0]
    at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:110) ~[flink-dist-1.17.0.jar:1.17.0]
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65) ~[flink-dist-1.17.0.jar:1.17.0]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:550) ~[flink-dist-1.17.0.jar:1.17.0]
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231) ~[flink-dist-1.17.0.jar:1.17.0]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:839) ~[flink-dist-1.17.0.jar:1.17.0]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:788) ~[flink-dist-1.17.0.jar:1.17.0]
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:952) ~[flink-dist-1.17.0.jar:1.17.0]
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:931) ~[flink-dist-1.17.0.jar:1.17.0]
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:745) ~[flink-dist-1.17.0.jar:1.17.0]
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562) ~[flink-dist-1.17.0.jar:1.17.0]
    at java.lang.Thread.run(Unknown Source) ~[?:?]
Caused by: org.apache.flink.util.WrappingRuntimeException: Failed to serialize schema registry.
    at org.apache.flink.formats.avro.RegistryAvroSerializationSchema.serialize(RegistryAvroSerializationSchema.java:90) ~[?:?]
    at org.apache.flink.formats.avro.AvroRowDataSerializationSchema.serialize(AvroRowDataSerializationSchema.java:88) ~[?:?]
    ... 44 more
Caused by: java.io.IOException: Could not register schema in registry
    at org.apache.flink.formats.avro.registry.confluent.ConfluentSchemaRegistryCoder.writeSchema(ConfluentSchemaRegistryCoder.java:90) ~[?:?]
    at org.apache.flink.formats.avro.RegistryAvroSerializationSchema.serialize(RegistryAvroSerializationSchema.java:85) ~[?:?]
    at org.apache.flink.formats.avro.AvroRowDataSerializationSchema.serialize(AvroRowDataSerializationSchema.java:88) ~[?:?]
    ... 44 more
Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: User is denied operation Write on Subject: my-topic-key; error code: 40301
    at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:294) ~[?:?]
    at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:364) ~[?:?]
    at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:507) ~[?:?]
    at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:498) ~[?:?]
    at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:471) ~[?:?]
    at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.registerAndGetId(CachedSchemaRegistryClient.java:221) ~[?:?]
    at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.register(CachedSchemaRegistryClient.java:283) ~[?:?]
    at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.register(CachedSchemaRegistryClient.java:259) ~[?:?]
    at io.confluent.kafka.schemaregistry.client.SchemaRegistryClient.register(SchemaRegistryClient.java:42) ~[?:?]
    at org.apache.flink.formats.avro.registry.confluent.ConfluentSchemaRegistryCoder.writeSchema(ConfluentSchemaRegistryCoder.java:85) ~[?:?]
    at org.apache.flink.formats.avro.RegistryAvroSerializationSchema.serialize(RegistryAvroSerializationSchema.java:85) ~[?:?]
    at org.apache.flink.formats.avro.AvroRowDataSerializationSchema.serialize(AvroRowDataSerializationSchema.java:88) ~[?:?]
    ... 44 more
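
For reference, the sink is defined roughly like this (a minimal, anonymized 
sketch: table name, columns, endpoint, and credentials are placeholders, and I'm 
assuming the standard Kafka connector and avro-confluent format options here):

```sql
CREATE TABLE my_topic_sink (
  id STRING,
  payload STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'my-topic',
  'properties.bootstrap.servers' = '<bootstrap-servers>',
  'key.format' = 'avro-confluent',
  'key.fields' = 'id',
  'key.avro-confluent.url' = 'https://<schema-registry-endpoint>',
  'value.format' = 'avro-confluent',
  'value.avro-confluent.url' = 'https://<schema-registry-endpoint>'
);
```

The error above is raised for the key subject (my-topic-key), but the same 
should apply to the value subject as well.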


I've inspected the code of the avro-confluent-registry format, and it seems 
there is no way to disable this behavior: the format always tries to register 
the schema when serializing a row:

https://github.com/apache/flink/blob/release-1.17.1/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/RegistryAvroSerializationSchema.java#L85
https://github.com/apache/flink/blob/release-1.17.1/flink-formats/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/ConfluentSchemaRegistryCoder.java#L85
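
If I read ConfluentSchemaRegistryCoder correctly, the register call is only 
there to obtain the schema id that goes into the standard 5-byte Confluent 
wire-format header written before the Avro payload. So a lookup-only mode 
(resolving the id of an already-registered schema with read access instead of 
registering it) should produce byte-identical output. A quick sketch of that 
header for illustration (ConfluentWireFormat and header are my own illustrative 
names, not part of Flink or the Confluent client):

```java
import java.nio.ByteBuffer;

public class ConfluentWireFormat {
    // Confluent wire format: one zero "magic" byte, then the schema id as a
    // 4-byte big-endian int, followed by the Avro-encoded payload.
    private static final byte MAGIC_BYTE = 0x0;

    // Builds the 5-byte header for a given schema id.
    public static byte[] header(int schemaId) {
        return ByteBuffer.allocate(5)
                .put(MAGIC_BYTE)
                .putInt(schemaId) // ByteBuffer writes big-endian by default
                .array();
    }
}
```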

Is there a particular reason for this, or would you be interested in adding a 
configuration option to disable this behavior?

Best regards,
Jannik
