Hi team, we ran into a "schema forbidden" error while deploying a Flink
version upgrade (1.16.1 -> 1.17.1), and then into an incorrect Kafka offset
reset when restoring our Flink job after rolling back that change. We hope
to get some help here.

*1 - schema forbidden issue*
As part of upgrading Flink from 1.16.1 to 1.17.1, we also switched from our
customized schema-registry dependency (which uses kafka-clients version
6.2.2-ccs) to org.apache.flink:flink-avro-confluent-registry:1.17.1. We never
encountered this issue with our customized schema-registry, but after the
change the job failed with the following error:

97112e6_4e578e017158b6c826cf089a3d845171_10_5) switched from RUNNING to FAILED on sessionizer-job-taskmanager-1-1 @ 192.168.71.13 (dataPort=40879).
java.io.IOException: Could not register schema in registry
    at org.apache.flink.formats.avro.registry.confluent.ConfluentSchemaRegistryCoder.writeSchema(ConfluentSchemaRegistryCoder.java:90) ~[?:?]
    at org.apache.flink.formats.avro.RegistryAvroSerializationSchema.serialize(RegistryAvroSerializationSchema.java:85) ~[?:?]
    ...
Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Forbidden; error code: 403
    at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:301) ~[?:?]
    at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:371) ~[?:?]
    ...
    at org.apache.flink.formats.avro.registry.confluent.ConfluentSchemaRegistryCoder.writeSchema(ConfluentSchemaRegistryCoder.java:85) ~[?:?]
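For context, the failing frame is ConfluentSchemaRegistryCoder.writeSchema.
As we understand it, the coder needs a schema ID from the registry because
each record is prefixed with the Confluent wire-format header: a 0 magic byte
followed by the 4-byte big-endian schema ID, then the Avro payload. The
stdlib-only sketch below only illustrates that header; the class and method
names are our own (hypothetical), and it is not Flink's implementation.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.Arrays;

// Hypothetical illustration of the Confluent wire-format header, not Flink code.
class ConfluentHeaderSketch {
    // The first byte of every Confluent-framed record.
    static final byte MAGIC_BYTE = 0;

    // Writes the 5-byte header: magic byte + schema ID as a big-endian int.
    // The schema ID is what has to be obtained from the registry first.
    static void writeHeader(int schemaId, OutputStream out) throws IOException {
        out.write(MAGIC_BYTE);
        // ByteBuffer is big-endian by default.
        out.write(ByteBuffer.allocate(4).putInt(schemaId).array());
    }

    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        writeHeader(42, out);
        System.out.println(Arrays.toString(out.toByteArray())); // [0, 0, 0, 0, 42]
    }
}
```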


The job then kept cycling from RUNNING to CANCELING to CANCELED.
We checked the schema of the Kafka topic we produce to: it is owned by
another team, not us, which is why the registry returns Forbidden.
We tried adding auto.register.schemas=false to our schemaRegistryConfig,
but it did not help. We also cannot change the owner of that schema or add
ourselves as an owner.
Given this limitation, is there any way to avoid this error after switching
to 1.17.1 and the Flink schema-registry dependency?
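To make the question concrete, here is a stdlib-only model of how we read
the failure. Registering a schema is presumably a write operation on the
subject, so a principal with read-only access gets 403 even when the schema
already exists, whereas looking up an existing schema's ID only needs read
access. As far as we can tell, auto.register.schemas is an option read by
Confluent's own KafkaAvroSerializer rather than by the registry client, which
might explain why passing it in the registry config had no visible effect; we
have not confirmed this against the Flink 1.17.1 source. ReadOnlyRegistry,
SchemaIdResolver and the subject "orders-value" are hypothetical stand-ins,
not Flink or Confluent APIs.

```java
import java.util.HashMap;
import java.util.Map;

// Entry point is the first top-level class so `java SchemaIdResolver.java` runs it.
class SchemaIdResolver {
    // autoRegister=true models calling register() unconditionally;
    // autoRegister=false models a lookup-only path that needs read access only.
    static int resolveId(ReadOnlyRegistry registry, String subject, String schema, boolean autoRegister) {
        return autoRegister
                ? registry.register(subject, schema)
                : registry.lookupId(subject, schema);
    }

    public static void main(String[] args) {
        String schema = "{\"type\":\"string\"}";
        ReadOnlyRegistry registry = new ReadOnlyRegistry("orders-value", schema, 7);
        try {
            resolveId(registry, "orders-value", schema, true);
        } catch (IllegalStateException e) {
            System.out.println("register path: " + e.getMessage());
        }
        System.out.println("lookup path: id=" + resolveId(registry, "orders-value", schema, false));
    }
}

// Hypothetical registry in which our principal only has read access to a
// subject owned by another team.
class ReadOnlyRegistry {
    private final Map<String, Integer> ids = new HashMap<>();

    ReadOnlyRegistry(String subject, String schema, int id) {
        ids.put(subject + "\n" + schema, id);
    }

    // Registration is a write on the subject: rejected for a read-only
    // principal, even though the schema is already registered.
    int register(String subject, String schema) {
        throw new IllegalStateException("Forbidden; error code: 403");
    }

    // Looking up the ID of an existing schema is a read operation.
    int lookupId(String subject, String schema) {
        Integer id = ids.get(subject + "\n" + schema);
        if (id == null) {
            throw new IllegalStateException("Schema not found; error code: 404");
        }
        return id;
    }
}
```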

*2 - incorrect Kafka offset reset issue*
We reverted the changes and the job started running again, but the Kafka
consumer lag was 50M, which was unexpectedly high. We found that the offset
had been reset automatically to 15 days earlier, instead of to the point
where our job went down.
We had not added
.setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST))
to our KafkaSource, and have just found that without it the source starts
from the earliest offset by default, rather than from the committed or
latest offset. This is the reason.
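To reason about this, we wrote a simplified, stdlib-only model of how we
understand the starting offset of a partition to be chosen. It assumes
(unverified) that an offset restored from checkpoint or savepoint state takes
precedence, and that the OffsetsInitializer (earliest() by default, as found
above) only applies when a partition has no restored offset. StartingOffsetSketch
and its Initializer enum are hypothetical names, not Flink classes.

```java
import java.util.OptionalLong;

class StartingOffsetSketch {
    // Hypothetical stand-ins for the two Flink initializers discussed above:
    // EARLIEST              ~ OffsetsInitializer.earliest() (KafkaSource's default)
    // COMMITTED_ELSE_LATEST ~ OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST)
    enum Initializer { EARLIEST, COMMITTED_ELSE_LATEST }

    // Simplified model: an offset restored from a checkpoint/savepoint wins;
    // the initializer is only consulted when there is no restored offset.
    static long startingOffset(OptionalLong restoredFromState,
                               OptionalLong committedByGroup,
                               Initializer initializer,
                               long earliestAvailable,
                               long latest) {
        if (restoredFromState.isPresent()) {
            return restoredFromState.getAsLong();
        }
        switch (initializer) {
            case EARLIEST:
                // Ignores the group's committed offset; starts from the oldest retained record.
                return earliestAvailable;
            case COMMITTED_ELSE_LATEST:
                // Uses the committed offset if there is one, otherwise the latest offset.
                return committedByGroup.orElse(latest);
            default:
                throw new IllegalArgumentException("unknown initializer: " + initializer);
        }
    }

    public static void main(String[] args) {
        long earliest = 1_000L, committed = 50_000_000L, latest = 50_000_500L;
        OptionalLong noState = OptionalLong.empty();
        // No restored state + default initializer: jumps back to the oldest retained offset.
        System.out.println(startingOffset(noState, OptionalLong.of(committed), Initializer.EARLIEST, earliest, latest)); // 1000
        // No restored state + committedOffsets(LATEST): resumes from the group's committed offset.
        System.out.println(startingOffset(noState, OptionalLong.of(committed), Initializer.COMMITTED_ELSE_LATEST, earliest, latest)); // 50000000
        // Restored from a savepoint: the initializer does not matter.
        System.out.println(startingOffset(OptionalLong.of(49_999_000L), OptionalLong.of(committed), Initializer.EARLIEST, earliest, latest)); // 49999000
    }
}
```

If that assumption holds, the difference between our earlier restorations and
this one would come down to whether restored offsets were available for the
partitions, which is one of the things we would like to confirm.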
What confuses us is that we never encountered this problem during any
previous restoration: the consumer lag was never this high, and the Kafka
offset always seemed to be reset correctly.
Why did the issue only appear when restoring the job after the change that
upgraded Flink (1.16.1 -> 1.17.1), removed our customized schema-registry
dependency (kafka-clients 6.2.2-ccs), and switched to
org.apache.flink:flink-connector-kafka:1.17.1, which uses kafka-clients
3.2.3? Is it related to the kafka-clients version change, or is there
another reason?

Thank you!

Best,
Lijuan
