[ 
https://issues.apache.org/jira/browse/FLINK-39674?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-39674:
-----------------------------------
    Labels: avro kafka oauth oauth2 pull-request-available  (was: avro kafka oauth oauth2)

> flink-avro-confluent-registry uses an obsolete schema registry client
> ---------------------------------------------------------------------
>
>                 Key: FLINK-39674
>                 URL: https://issues.apache.org/jira/browse/FLINK-39674
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kafka, Formats (JSON, Avro, Parquet, ORC, 
> SequenceFile)
>    Affects Versions: 2.2.0, 2.2.1
>         Environment: this issue is environment-independent
>            Reporter: Alex Rovner
>            Priority: Major
>              Labels: avro, kafka, oauth, oauth2, pull-request-available
>
> For more than two years now, the `flink-avro-confluent-registry` module has 
> used version 7.5.3 of `io.confluent:kafka-schema-registry-client`. This is 
> deeply problematic for multiple reasons:
>  * Confluent's official support for this version expired in August 2025
>  * This version has several known high-severity vulnerabilities
>  * This version is not aligned with the Kafka client used in the most recent 
> version of flink-connector-kafka (4.0.1-2.0 at the time of writing). The 
> included Kafka client has version 3.9.x, while the 7.5.x versions of the 
> schema registry client are made for Kafka version 3.5.x (see the [compatibility 
> table|https://docs.confluent.io/platform/current/installation/versions-interoperability.html#cp-and-apache-ak-compatibility]).
> The last point is especially painful, because the schema registry client 
> depends on the Kafka client for some of its functionality. As the versions 
> of the two clients drift apart, we begin seeing runtime errors because methods 
> that the schema registry client calls no longer exist in the Kafka client. For 
> example, it is no longer possible to configure the schema registry client to 
> use OAuth authentication; doing so fails with:
>  
> {code:java}
> Caused by: java.lang.NoSuchMethodError: 'void org.apache.kafka.common.security.oauthbearer.internals.secured.HttpAccessTokenRetriever.<init>(java.lang.String, java.lang.String, java.lang.String, javax.net.ssl.SSLSocketFactory, java.lang.String, long, long, java.lang.Integer, java.lang.Integer)'
>     at io.confluent.kafka.schemaregistry.client.security.bearerauth.oauth.OauthCredentialProvider.getTokenRetriever(OauthCredentialProvider.java:106)
>  {code}
>  
> To trigger the above error, it is sufficient to configure a Kafka Sink with 
> the aforementioned most recent versions of `flink-avro-confluent-registry` 
> and `flink-connector-kafka`:
> {code:java}
> // OAuth settings passed through to the schema registry client
> Map<String, String> registryConfig = Map.of(
>         "bearer.auth.credentials.source", "OAUTHBEARER",
>         "bearer.auth.issuer.endpoint.url", "...",
>         "bearer.auth.client.id", "registry-api",
>         "bearer.auth.client.secret", "...");
> KafkaSink.<...>builder()
>         .setBootstrapServers("localhost:9092")
>         .setRecordSerializer(KafkaRecordSerializationSchema.<...>builder()
>                 .setValueSerializationSchema(
>                         ConfluentRegistryAvroSerializationSchema.forSpecific(
>                                 ....class, subject, schemaRegistryUrl, registryConfig))
>                 .setTopic(topicName)
>                 .setKeySerializationSchema(...)
>                 .build())
>         .build();
> {code}
>  
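The NoSuchMethodError quoted above can be detected before a job is submitted by probing the classpath for the constructor that the 7.5.x schema registry client expects. The following is a minimal sketch under stated assumptions, not part of Flink or of either client library: `ConstructorProbe` and `hasConstructor` are hypothetical names, and the class name and parameter list in `main` are copied from the stack trace in the issue.

```java
import java.util.Map;

/** Hypothetical diagnostic helper; not part of Flink, Kafka, or the Confluent client. */
public final class ConstructorProbe {

    // Class.forName cannot resolve primitive type names, so map the ones we need by hand.
    private static final Map<String, Class<?>> PRIMITIVES = Map.of(
            "long", long.class,
            "int", int.class,
            "boolean", boolean.class);

    private ConstructorProbe() {}

    /**
     * Returns true if the named class can be loaded and declares a constructor
     * with exactly the given parameter types; returns false otherwise.
     */
    public static boolean hasConstructor(String className, String... parameterTypeNames) {
        try {
            ClassLoader loader = ConstructorProbe.class.getClassLoader();
            // 'false' avoids running static initializers of the probed classes.
            Class<?> target = Class.forName(className, false, loader);
            Class<?>[] parameterTypes = new Class<?>[parameterTypeNames.length];
            for (int i = 0; i < parameterTypeNames.length; i++) {
                Class<?> primitive = PRIMITIVES.get(parameterTypeNames[i]);
                parameterTypes[i] = primitive != null
                        ? primitive
                        : Class.forName(parameterTypeNames[i], false, loader);
            }
            target.getDeclaredConstructor(parameterTypes);
            return true;
        } catch (ReflectiveOperationException | LinkageError e) {
            // Class missing, constructor missing, or class could not be linked.
            return false;
        }
    }

    public static void main(String[] args) {
        // Signature copied from the NoSuchMethodError in the issue description.
        boolean present = hasConstructor(
                "org.apache.kafka.common.security.oauthbearer.internals.secured.HttpAccessTokenRetriever",
                "java.lang.String", "java.lang.String", "java.lang.String",
                "javax.net.ssl.SSLSocketFactory", "java.lang.String",
                "long", "long", "java.lang.Integer", "java.lang.Integer");
        System.out.println("9-argument HttpAccessTokenRetriever constructor present: " + present);
    }
}
```

Run with the job's dependencies on the classpath. The probe reports `false` both when the constructor is absent and when the class is not on the classpath at all, so a `false` result only indicates that the OAuth path of the registry client would fail, not why.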



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
