[
https://issues.apache.org/jira/browse/FLINK-39674?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
ASF GitHub Bot updated FLINK-39674:
-----------------------------------
Labels: avro kafka oauth oauth2 pull-request-available (was: avro kafka
oauth oauth2)
> flink-avro-confluent-registry uses an obsolete schema registry client
> ---------------------------------------------------------------------
>
> Key: FLINK-39674
> URL: https://issues.apache.org/jira/browse/FLINK-39674
> Project: Flink
> Issue Type: Bug
> Components: Connectors / Kafka, Formats (JSON, Avro, Parquet, ORC,
> SequenceFile)
> Affects Versions: 2.2.0, 2.2.1
> Environment: this issue is environment-independent
> Reporter: Alex Rovner
> Priority: Major
> Labels: avro, kafka, oauth, oauth2, pull-request-available
>
> For more than two years now, the `flink-avro-confluent-registry` module has
> used version 7.5.3 of `io.confluent:kafka-schema-registry-client`. This is
> deeply problematic for multiple reasons:
> * Confluent's official support for this version expired in August 2025
> * This version has several known high-severity vulnerabilities
> * This version is not aligned with the Kafka client used in the most recent
> version of flink-connector-kafka (4.0.1-2.0 at the time of writing). The
> included Kafka client has version 3.9.x, while the 7.5.x versions of the
> schema registry client are made for Kafka version 3.5.x (see [compatibility
> table|https://docs.confluent.io/platform/current/installation/versions-interoperability.html#cp-and-apache-ak-compatibility])
> The last point is especially painful because the schema registry client
> depends on the Kafka client for some of its functionality. As the versions
> of the two clients drift apart, runtime errors appear because methods that
> the schema registry client calls no longer exist in the Kafka client. For
> example, it is no longer possible to configure OAuth authentication for the
> schema registry; doing so fails with:
>
> {code:java}
> Caused by: java.lang.NoSuchMethodError: 'void org.apache.kafka.common.security.oauthbearer.internals.secured.HttpAccessTokenRetriever.<init>(java.lang.String, java.lang.String, java.lang.String, javax.net.ssl.SSLSocketFactory, java.lang.String, long, long, java.lang.Integer, java.lang.Integer)'
>     at io.confluent.kafka.schemaregistry.client.security.bearerauth.oauth.OauthCredentialProvider.getTokenRetriever(OauthCredentialProvider.java:106)
> {code}
>
> To reproduce the error, it is sufficient to configure a `KafkaSink` with the
> most recent versions of `flink-avro-confluent-registry` and
> `flink-connector-kafka` mentioned above:
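> {code:java}
> import java.util.Map;
> import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
> import org.apache.flink.connector.kafka.sink.KafkaSink;
> import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroSerializationSchema;
>
> // Schema registry client settings that select OAuth bearer authentication.
> Map<String, String> registryConfig = Map.of(
>     "bearer.auth.credentials.source", "OAUTHBEARER",
>     "bearer.auth.issuer.endpoint.url", "...",
>     "bearer.auth.client.id", "registry-api",
>     "bearer.auth.client.secret", "..."
> );
> KafkaSink.<...>builder()
>     .setBootstrapServers("localhost:9092")
>     .setRecordSerializer(KafkaRecordSerializationSchema.<...>builder()
>         // The registry config is passed through to the schema registry client.
>         .setValueSerializationSchema(
>             ConfluentRegistryAvroSerializationSchema.forSpecific(
>                 ....class, subject, schemaRegistryUrl, registryConfig))
>         .setTopic(topicName)
>         .setKeySerializationSchema(...)
>         .build())
>     .build();
> {code}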
>
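
The version mismatch described above can be checked without running a full job. The following is a minimal diagnostic sketch, not part of Flink or Confluent: the class name `TokenRetrieverCheck` and the method `check` are hypothetical, and the constructor signature is copied from the `NoSuchMethodError` quoted in the report. It only uses reflection to ask whether the Kafka client on the classpath still declares the constructor that the schema registry client tries to call.

```java
public class TokenRetrieverCheck {
    // Class name and parameter types are taken from the NoSuchMethodError in the report.
    static final String CLASS_NAME =
        "org.apache.kafka.common.security.oauthbearer.internals.secured.HttpAccessTokenRetriever";
    static final Class<?>[] EXPECTED_PARAMS = {
        String.class, String.class, String.class, javax.net.ssl.SSLSocketFactory.class,
        String.class, long.class, long.class, Integer.class, Integer.class
    };

    /**
     * Hypothetical helper: returns "ok" if the class declares a constructor with
     * exactly these parameter types, "missing-constructor" if the class exists
     * without it, "missing-class" if the class is not on the classpath, and
     * "unloadable" if the class cannot be linked.
     */
    static String check(String className, Class<?>... paramTypes) {
        try {
            // Load without running static initializers; only the declared shape matters.
            Class<?> cls = Class.forName(
                className, false, TokenRetrieverCheck.class.getClassLoader());
            cls.getDeclaredConstructor(paramTypes);
            return "ok";
        } catch (ClassNotFoundException e) {
            return "missing-class";
        } catch (NoSuchMethodException e) {
            return "missing-constructor";
        } catch (LinkageError e) {
            return "unloadable";
        }
    }

    public static void main(String[] args) {
        System.out.println(CLASS_NAME + ": " + check(CLASS_NAME, EXPECTED_PARAMS));
    }
}
```

Run with the same classpath as the failing job, a result of `missing-constructor` would be consistent with the error in the report, while `missing-class` would suggest that the Kafka client is not on the classpath or that the class has been moved or removed.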