[ 
https://issues.apache.org/jira/browse/FLINK-39674?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Alex Rovner updated FLINK-39674:
--------------------------------
    Description: 
For more than two years now, `flink-avro-confluent-registry` module uses 
version 7.5.3 of `io.confluent:kafka-schema-registry-client`. This is deeply 
problematic for multiple reasons:
 * Confluent's official support for this version expired in August 2025
 * This version has several known high-severity vulnerabilities
 * This version is not aligned with the Kafka client used in the most recent 
version of flink-connector-kafka (4.0.1-2.0 at the time of writing). The 
included Kafka client has version 3.9.x, while the 7.5.x versions of the schema 
registry client are made for Kafka version 3.5.x (see [compatibility 
table|https://docs.confluent.io/platform/current/installation/versions-interoperability.html#cp-and-apache-ak-compatibility])

The last point is especially painful, because the schema registry client 
depends on the Kafka client for some of its functionalities. As the versions of 
the two clients drift apart, we begin seeing runtime errors due to methods not 
existing any more. For example, it is no longer possible to configure the 
schema registry to use OAuth authentication for this reason:

 
{code:java}
Caused by: java.lang.NoSuchMethodError: 'void 
org.apache.kafka.common.security.oauthbearer.internals.secured.HttpAccessTokenRetriever.<init>(java.lang.String,
 java.lang.String, java.lang.String, javax.net.ssl.SSLSocketFactory, 
java.lang.String, long, long, java.lang.Integer, java.lang.Integer)'
    at 
io.confluent.kafka.schemaregistry.client.security.bearerauth.oauth.OauthCredentialProvider.getTokenRetriever(OauthCredentialProvider.java:106)
 {code}
 

To trigger the above error, it is sufficient to configure a Kafka Sink with the 
aforementioned most recent versions of `flink-avro-confluent-registry` and 
`flink-connector-kafka`:
{code:java}
Map<String, String> registryConfig = Map.of(
            "bearer.auth.credentials.source","OAUTHBEARER",
            "bearer.auth.issuer.endpoint.url", "...",
            "bearer.auth.client.id","registry-api",
            "bearer.auth.client.secret","...",
    );
KafkaSink.<...>builder()
        .setBootstrapServers("localhost:9092")
        .setRecordSerializer(KafkaRecordSerializationSchema.<...>builder()
                
.setValueSerializationSchema(ConfluentRegistryAvroSerializationSchema.forSpecific(....class,
 subject, schemaRegistryUrl, registryConfig))
                .setTopic(topicName)
                .setKeySerializationSchema(...)
                .build())
        .build(); {code}
 

  was:
For more than two years now, `flink-avro-confluent-registry` module uses 
version 7.5.3 of `io.confluent:kafka-schema-registry-client`. This is deeply 
problematic for multiple reasons:
 * Confluent's official support for this version expired in August 2025
 * This version has several known high-severity vulnerabilities
 * This version is not aligned with the Kafka client used in the most recent 
version of flink-connector-kafka (4.0.1-2.0 at the time of writing). The 
included Kafka client has version 3.9.x, while the 7.5.x versions of the schema 
registry client are made for Kafka version 3.5.x (see [compatibility 
table|https://docs.confluent.io/platform/current/installation/versions-interoperability.html#cp-and-apache-ak-compatibility])

The last point is especially painful, because the schema registry client 
depends on the Kafka client for some of its functionalities. As the versions of 
the two clients drift apart, we begin seeing runtime errors due to methods not 
existing any more. For example, it is no longer possible to configure the 
schema registry to use OAuth authentication for this reason:

 
{code:java}
Caused by: java.lang.NoSuchMethodError: 'void 
org.apache.kafka.common.security.oauthbearer.internals.secured.HttpAccessTokenRetriever.<init>(java.lang.String,
 java.lang.String, java.lang.String, javax.net.ssl.SSLSocketFactory, 
java.lang.String, long, long, java.lang.Integer, java.lang.Integer)'
    at 
io.confluent.kafka.schemaregistry.client.security.bearerauth.oauth.OauthCredentialProvider.getTokenRetriever(OauthCredentialProvider.java:106)
 {code}
 

 


> flink-avro-confluent-registry uses an obsolete schema registry client
> ---------------------------------------------------------------------
>
>                 Key: FLINK-39674
>                 URL: https://issues.apache.org/jira/browse/FLINK-39674
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kafka, Formats (JSON, Avro, Parquet, ORC, 
> SequenceFile)
>    Affects Versions: 2.2.0, 2.2.1
>         Environment: this issue is environment-independent
>            Reporter: Alex Rovner
>            Priority: Major
>              Labels: avro, kafka, oauth, oauth2
>
> For more than two years now, `flink-avro-confluent-registry` module uses 
> version 7.5.3 of `io.confluent:kafka-schema-registry-client`. This is deeply 
> problematic for multiple reasons:
>  * Confluent's official support for this version expired in August 2025
>  * This version has several known high-severity vulnerabilities
>  * This version is not aligned with the Kafka client used in the most recent 
> version of flink-connector-kafka (4.0.1-2.0 at the time of writing). The 
> included Kafka client has version 3.9.x, while the 7.5.x versions of the 
> schema registry client are made for Kafka version 3.5.x (see [compatibility 
> table|https://docs.confluent.io/platform/current/installation/versions-interoperability.html#cp-and-apache-ak-compatibility])
> The last point is especially painful, because the schema registry client 
> depends on the Kafka client for some of its functionalities. As the versions 
> of the two clients drift apart, we begin seeing runtime errors due to methods 
> not existing any more. For example, it is no longer possible to configure the 
> schema registry to use OAuth authentication for this reason:
>  
> {code:java}
> Caused by: java.lang.NoSuchMethodError: 'void 
> org.apache.kafka.common.security.oauthbearer.internals.secured.HttpAccessTokenRetriever.<init>(java.lang.String,
>  java.lang.String, java.lang.String, javax.net.ssl.SSLSocketFactory, 
> java.lang.String, long, long, java.lang.Integer, java.lang.Integer)'
>     at 
> io.confluent.kafka.schemaregistry.client.security.bearerauth.oauth.OauthCredentialProvider.getTokenRetriever(OauthCredentialProvider.java:106)
>  {code}
>  
> To trigger the above error, it is sufficient to configure a Kafka Sink with 
> the aforementioned most recent versions of `flink-avro-confluent-registry` 
> and `flink-connector-kafka`:
> {code:java}
> Map<String, String> registryConfig = Map.of(
>             "bearer.auth.credentials.source","OAUTHBEARER",
>             "bearer.auth.issuer.endpoint.url", "...",
>             "bearer.auth.client.id","registry-api",
>             "bearer.auth.client.secret","...",
>     );
> KafkaSink.<...>builder()
>         .setBootstrapServers("localhost:9092")
>         .setRecordSerializer(KafkaRecordSerializationSchema.<...>builder()
>                 
> .setValueSerializationSchema(ConfluentRegistryAvroSerializationSchema.forSpecific(....class,
>  subject, schemaRegistryUrl, registryConfig))
>                 .setTopic(topicName)
>                 .setKeySerializationSchema(...)
>                 .build())
>         .build(); {code}
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to