Hello Thias,

thank you for your answer.

We've tested registering an existing (byte-equal) schema a second time, but 
unfortunately the schema registry still denies the request.

Your last suggestion sounds promising, but I think there are some edge cases 
with this approach that will still cause an error. For example, when writing to 
a new, empty topic, querying the topic beforehand won't return any records, so 
the schema would not be put into the schemaRegistryClient cache.

I would still prefer a flag for the "avro-confluent-registry" format that 
disables schema registration, so that the format only tries to look up the ID 
for a schema string in the registry. If there is an ID for that schema, Flink 
uses it. If there is none, an exception should be thrown.
What do you think of that?
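
To make the idea concrete, here is a rough sketch of the lookup-only behaviour. It is not Flink code: LookupOnlyCoderSketch, SchemaIdLookup and LookupOnlySchemaCoder are hypothetical names, and the lookup would have to be backed by a read-only call against the registry. The only part taken from the existing format is the header layout (a zero magic byte followed by the 4-byte schema ID).

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Map;
import java.util.Optional;

/** Sketch only: all names are hypothetical, not part of Flink or the Confluent client. */
final class LookupOnlyCoderSketch {

    /** Hypothetical read-only registry view: returns the ID of an already registered schema, if any. */
    interface SchemaIdLookup {
        Optional<Integer> findId(String subject, String schemaJson);
    }

    /** Writes the schema header without ever trying to register a schema. */
    static final class LookupOnlySchemaCoder {
        private static final byte MAGIC_BYTE = 0;

        private final SchemaIdLookup lookup;
        private final String subject;

        LookupOnlySchemaCoder(SchemaIdLookup lookup, String subject) {
            this.lookup = lookup;
            this.subject = subject;
        }

        void writeSchema(String schemaJson, ByteArrayOutputStream out) throws IOException {
            // Fail fast if the schema was not pre-registered (e.g. via Terraform).
            int id = lookup.findId(subject, schemaJson)
                    .orElseThrow(() -> new IOException(
                            "No registered schema found for subject " + subject));
            out.write(MAGIC_BYTE);
            // ByteBuffer is big-endian by default, so the ID is written as 4 big-endian bytes.
            out.write(ByteBuffer.allocate(4).putInt(id).array());
        }
    }

    public static void main(String[] args) throws IOException {
        // Stand-in for the registry: one pre-registered schema with a made-up ID.
        Map<String, Integer> registered = Map.of("{\"type\":\"string\"}", 42);
        SchemaIdLookup lookup = (subject, schema) -> Optional.ofNullable(registered.get(schema));
        LookupOnlySchemaCoder coder = new LookupOnlySchemaCoder(lookup, "my-topic-key");

        ByteArrayOutputStream out = new ByteArrayOutputStream();
        coder.writeSchema("{\"type\":\"string\"}", out);
        System.out.println("header bytes written: " + out.size());
    }
}
```

With a flag like this enabled, a missing schema would surface as an exception on the first record instead of a denied write request.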

Best regards,
Jannik


From: Schwalbe Matthias <matthias.schwa...@viseca.ch>
Sent: Wednesday, 31 May 2023 13:33
To: Schmeier, Jannik <j.schme...@fraport.de>; user@flink.apache.org
Subject: RE: Using pre-registered schemas with avro-confluent-registry format 
is not possible


Hello Jannik,

Some things to consider (I had a similar problem a couple of years ago):

  *   The schemaRegistryClient actually caches schema IDs, so it will hit the 
schema registry only once.
  *   The schema registered in the schema registry needs to be byte-equal, 
otherwise the schema registry considers it to be a new schema (version).
  *   To the best of my knowledge, writing an existing schema to the schema 
registry does not fail, because it is not actually written.
     *   This might not be entirely true, as we had to replace the whole 
schemaRegistryClient with our own implementation because the existing one could 
not be reconfigured to accept compressed responses from our read-only proxy.
  *   If you manage to fill the cache of your schemaRegistryClient with the 
exact schema (e.g. by querying it beforehand), you might never run into the 
problem.
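
To illustrate the caching point: a minimal sketch, assuming the client keys its cache on the exact schema string. The class and method names are made up for illustration and this is not the real schemaRegistryClient; it only shows why a pre-filled cache would avoid the write request.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.ToIntFunction;

/** Sketch only: hypothetical names, not the actual Confluent client implementation. */
final class SchemaIdCacheSketch {

    static final class CachingSchemaIdClient {
        // Keyed by the exact schema string, mirroring the byte-equality requirement.
        private final Map<String, Integer> idCache = new HashMap<>();
        private final ToIntFunction<String> remoteRegister;
        private int remoteCalls = 0;

        CachingSchemaIdClient(ToIntFunction<String> remoteRegister) {
            this.remoteRegister = remoteRegister;
        }

        /** Pre-fills the cache, e.g. with an ID obtained by querying the schema beforehand. */
        void prime(String schemaJson, int id) {
            idCache.put(schemaJson, id);
        }

        /** Returns the cached ID, or calls the remote registry once and caches the result. */
        int register(String schemaJson) {
            Integer cached = idCache.get(schemaJson);
            if (cached != null) {
                return cached;
            }
            remoteCalls++;
            int id = remoteRegister.applyAsInt(schemaJson);
            idCache.put(schemaJson, id);
            return id;
        }

        int remoteCalls() {
            return remoteCalls;
        }
    }

    public static void main(String[] args) {
        // The remote side denies every write, as in a read-only environment.
        CachingSchemaIdClient client = new CachingSchemaIdClient(schema -> {
            throw new IllegalStateException("write denied");
        });
        client.prime("{\"type\":\"string\"}", 42);
        System.out.println("id from cache: " + client.register("{\"type\":\"string\"}"));
        System.out.println("remote calls: " + client.remoteCalls());
    }
}
```

If the cache is not primed, the first register call still goes to the remote side and fails there.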

Hope this helps … keep us posted 😊

Thias




From: Schmeier, Jannik <j.schme...@fraport.de>
Sent: Wednesday, May 31, 2023 12:44 PM
To: user@flink.apache.org
Subject: Using pre-registered schemas with avro-confluent-registry format is 
not possible

Hello,

I'm trying to use the avro-confluent-registry format with the Confluent Cloud 
Schema Registry in our company.
Our schemas are managed via Terraform and global write access is denied for all 
Kafka clients in our environments (or at least in production).
Therefore, when using the avro-confluent-registry format I'm getting an error 
when Flink is trying to serialize a row:

java.lang.RuntimeException: Failed to serialize row.
                at org.apache.flink.formats.avro.AvroRowDataSerializationSchema.serialize(AvroRowDataSerializationSchema.java:90) ~[?:?]
                at org.apache.flink.formats.avro.AvroRowDataSerializationSchema.serialize(AvroRowDataSerializationSchema.java:40) ~[?:?]
                at org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaRecordSerializationSchema.serialize(DynamicKafkaRecordSerializationSchema.java:95) ~[?:?]
                at org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaRecordSerializationSchema.serialize(DynamicKafkaRecordSerializationSchema.java:36) ~[?:?]
                at org.apache.flink.connector.kafka.sink.KafkaWriter.write(KafkaWriter.java:196) ~[?:?]
                at org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.processElement(SinkWriterOperator.java:158) ~[flink-dist-1.17.0.jar:1.17.0]
                at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:75) ~[flink-dist-1.17.0.jar:1.17.0]
                at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:50) ~[flink-dist-1.17.0.jar:1.17.0]
                at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29) ~[flink-dist-1.17.0.jar:1.17.0]
                at org.apache.flink.table.runtime.util.StreamRecordCollector.collect(StreamRecordCollector.java:44) ~[flink-table-runtime-1.17.0.jar:1.17.0]
                at org.apache.flink.table.runtime.operators.sink.ConstraintEnforcer.processElement(ConstraintEnforcer.java:247) ~[flink-table-runtime-1.17.0.jar:1.17.0]
                at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:75) ~[flink-dist-1.17.0.jar:1.17.0]
                at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:50) ~[flink-dist-1.17.0.jar:1.17.0]
                at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29) ~[flink-dist-1.17.0.jar:1.17.0]
                at StreamExecCalc$2221.processElement_0_0(Unknown Source) ~[?:?]
                at StreamExecCalc$2221.processElement_0_0_rewriteGroup22_split310(Unknown Source) ~[?:?]
                at StreamExecCalc$2221.processElement_0_0_rewriteGroup22(Unknown Source) ~[?:?]
                at StreamExecCalc$2221.processElement_split308(Unknown Source) ~[?:?]
                at StreamExecCalc$2221.processElement(Unknown Source) ~[?:?]
                at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:75) ~[flink-dist-1.17.0.jar:1.17.0]
                at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:50) ~[flink-dist-1.17.0.jar:1.17.0]
                at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29) ~[flink-dist-1.17.0.jar:1.17.0]
                at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51) ~[flink-dist-1.17.0.jar:1.17.0]
                at org.apache.flink.table.runtime.operators.over.RowTimeRangeUnboundedPrecedingFunction.processElementsWithSameTimestamp(RowTimeRangeUnboundedPrecedingFunction.java:74) ~[flink-table-runtime-1.17.0.jar:1.17.0]
                at org.apache.flink.table.runtime.operators.over.AbstractRowTimeUnboundedPrecedingOver.onTimer(AbstractRowTimeUnboundedPrecedingOver.java:228) ~[flink-table-runtime-1.17.0.jar:1.17.0]
                at org.apache.flink.streaming.api.operators.KeyedProcessOperator.invokeUserFunction(KeyedProcessOperator.java:91) ~[flink-dist-1.17.0.jar:1.17.0]
                at org.apache.flink.streaming.api.operators.KeyedProcessOperator.onEventTime(KeyedProcessOperator.java:70) ~[flink-dist-1.17.0.jar:1.17.0]
                at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.advanceWatermark(InternalTimerServiceImpl.java:302) ~[flink-dist-1.17.0.jar:1.17.0]
                at org.apache.flink.streaming.api.operators.InternalTimeServiceManagerImpl.advanceWatermark(InternalTimeServiceManagerImpl.java:180) ~[flink-dist-1.17.0.jar:1.17.0]
                at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:602) ~[flink-dist-1.17.0.jar:1.17.0]
                at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitWatermark(OneInputStreamTask.java:243) ~[flink-dist-1.17.0.jar:1.17.0]
                at org.apache.flink.streaming.runtime.watermarkstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:199) ~[flink-dist-1.17.0.jar:1.17.0]
                at org.apache.flink.streaming.runtime.watermarkstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:114) ~[flink-dist-1.17.0.jar:1.17.0]
                at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:148) ~[flink-dist-1.17.0.jar:1.17.0]
                at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:110) ~[flink-dist-1.17.0.jar:1.17.0]
                at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65) ~[flink-dist-1.17.0.jar:1.17.0]
                at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:550) ~[flink-dist-1.17.0.jar:1.17.0]
                at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231) ~[flink-dist-1.17.0.jar:1.17.0]
                at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:839) ~[flink-dist-1.17.0.jar:1.17.0]
                at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:788) ~[flink-dist-1.17.0.jar:1.17.0]
                at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:952) ~[flink-dist-1.17.0.jar:1.17.0]
                at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:931) ~[flink-dist-1.17.0.jar:1.17.0]
                at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:745) ~[flink-dist-1.17.0.jar:1.17.0]
                at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562) ~[flink-dist-1.17.0.jar:1.17.0]
                at java.lang.Thread.run(Unknown Source) ~[?:?]
Caused by: org.apache.flink.util.WrappingRuntimeException: Failed to serialize schema registry.
                at org.apache.flink.formats.avro.RegistryAvroSerializationSchema.serialize(RegistryAvroSerializationSchema.java:90) ~[?:?]
                at org.apache.flink.formats.avro.AvroRowDataSerializationSchema.serialize(AvroRowDataSerializationSchema.java:88) ~[?:?]
                ... 44 more
Caused by: java.io.IOException: Could not register schema in registry
                at org.apache.flink.formats.avro.registry.confluent.ConfluentSchemaRegistryCoder.writeSchema(ConfluentSchemaRegistryCoder.java:90) ~[?:?]
                at org.apache.flink.formats.avro.RegistryAvroSerializationSchema.serialize(RegistryAvroSerializationSchema.java:85) ~[?:?]
                at org.apache.flink.formats.avro.AvroRowDataSerializationSchema.serialize(AvroRowDataSerializationSchema.java:88) ~[?:?]
                ... 44 more
Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: User is denied operation Write on Subject: my-topic-key; error code: 40301
                at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:294) ~[?:?]
                at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:364) ~[?:?]
                at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:507) ~[?:?]
                at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:498) ~[?:?]
                at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:471) ~[?:?]
                at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.registerAndGetId(CachedSchemaRegistryClient.java:221) ~[?:?]
                at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.register(CachedSchemaRegistryClient.java:283) ~[?:?]
                at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.register(CachedSchemaRegistryClient.java:259) ~[?:?]
                at io.confluent.kafka.schemaregistry.client.SchemaRegistryClient.register(SchemaRegistryClient.java:42) ~[?:?]
                at org.apache.flink.formats.avro.registry.confluent.ConfluentSchemaRegistryCoder.writeSchema(ConfluentSchemaRegistryCoder.java:85) ~[?:?]
                at org.apache.flink.formats.avro.RegistryAvroSerializationSchema.serialize(RegistryAvroSerializationSchema.java:85) ~[?:?]
                at org.apache.flink.formats.avro.AvroRowDataSerializationSchema.serialize(AvroRowDataSerializationSchema.java:88) ~[?:?]
                ... 44 more

I've inspected the code of the avro-confluent-registry format and it seems like 
there is no way to disable this behavior. The format will always try to 
register a schema when serializing a row:

https://github.com/apache/flink/blob/release-1.17.1/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/RegistryAvroSerializationSchema.java#L85
https://github.com/apache/flink/blob/release-1.17.1/flink-formats/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/ConfluentSchemaRegistryCoder.java#L85
Is there a particular reason for this or would you be interested in adding a 
configuration option to disable this behavior?

Best regards,
Jannik