Hi Phil,

The Kafka SSL configuration keys in your table definition may not be
correct. Please refer to the Kafka documentation [1] for the client-side SSL
configuration keys.
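
To illustrate: ssl.ca.location, ssl.certificate.location and ssl.key.location
are key names used by confluent_kafka (librdkafka). The Flink Kafka connector
forwards 'properties.*' options to the Java Kafka client, which instead reads
its trust and key material from a truststore and keystore. That would be
consistent with the "PKIX path building failed" error, since the JVM likely
never saw your CA. Below is a minimal sketch, assuming you have already
converted your PEM files into truststore/keystore files. The function names,
paths and passwords are hypothetical; only the option keys come from [1].

```python
def kafka_ssl_options(truststore_path, truststore_password,
                      keystore_path, keystore_password, key_password):
    """Build Flink SQL connector options for a Kafka source over SSL.

    The keys are the Java client SSL settings described in [1], each
    prefixed with 'properties.' so the connector passes them through.
    All argument values are placeholders supplied by the caller.
    """
    return {
        'properties.security.protocol': 'SSL',
        'properties.ssl.truststore.location': truststore_path,
        'properties.ssl.truststore.password': truststore_password,
        'properties.ssl.keystore.location': keystore_path,
        'properties.ssl.keystore.password': keystore_password,
        'properties.ssl.key.password': key_password,
        # An empty value disables hostname verification, as in the original DDL.
        'properties.ssl.endpoint.identification.algorithm': '',
    }


def render_with_clause(options):
    """Render an options dict as the body of a Flink SQL WITH (...) clause."""
    return ",\n".join(f"  '{key}' = '{value}'" for key, value in options.items())
```

The rendered string can replace the SSL-related lines in the WITH clause of
your CREATE TABLE statement. The store files must be readable at the same
path on every node that runs the job.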


[1] https://kafka.apache.org/documentation/#security_configclients


Best,
Zhongqiang Gong

Phil Stavridis <phi...@gmail.com> wrote on Fri, May 17, 2024 at 01:44:

> Hi,
>
> I have a PyFlink job that needs to read from a Kafka topic and the
> communication with the Kafka broker requires SSL.
> I have connected to the Kafka cluster with something like this using just
> Python.
>
> from confluent_kafka import Consumer, KafkaException, KafkaError
>
>
>
> def get_config(bootstrap_servers, ca_file, cert_file, key_file):
>     config = {
>         'bootstrap.servers': bootstrap_servers,
>         'security.protocol': 'SSL',
>         'ssl.ca.location': ca_file,
>         'ssl.certificate.location': cert_file,
>         'ssl.key.location': key_file,
>         'ssl.endpoint.identification.algorithm': 'none',
>         'enable.ssl.certificate.verification': 'false',
>         'group.id': 'my_group_id'
>     }
>     return config
>
>
>
> And have read messages from the Kafka topic.
>
> I am trying to set up something similar with Flink SQL:
>
> t_env.execute_sql(f"""
> CREATE TABLE logs (
> `user` ROW(`user_id` BIGINT),
> `timestamp` ROW(`secs` BIGINT)
> ) WITH (
> 'connector' = '{CONNECTOR_TYPE}',
> 'topic' = '{KAFKA_TOPIC}',
> 'properties.bootstrap.servers' = '{BOOTSTRAP_SERVERS}',
> 'properties.group.id' = '{CONSUMER_GROUP}',
> 'scan.startup.mode' = '{STARTUP_MODE_LATEST}',
> 'format' = '{MESSAGE_FORMAT}',
> 'properties.security.protocol' = 'SSL',
> 'properties.ssl.ca.location' = '{ca_file}',
> 'properties.ssl.certificate.location' = '{cert_file}',
> 'properties.ssl.key.location' = '{key_file}',
> 'properties.ssl.endpoint.identification.algorithm' = ''
> )
> """)
>
>
> But when this runs I am getting this error:
>
> Caused by: org.apache.flink.util.FlinkException: Global failure triggered
> by OperatorCoordinator for 'Source: kafka_action_logs[1] -> Calc[2]'
> (operator cbc357ccb763df2852fee8c4fc7d55f2).
> ...
> Caused by: org.apache.flink.util.FlinkRuntimeException: Failed to list
> subscribed topic partitions due to
> at
> ...
> Caused by: java.lang.RuntimeException: Failed to get metadata for topics
> [logs].
> at
> ...
> ... 3 more
> Caused by: java.util.concurrent.ExecutionException:
> org.apache.flink.kafka.shaded.org.apache.kafka.common.errors.SslAuthenticationException:
> SSL handshake failed
> at
> java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
> at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
> at
> org.apache.flink.kafka.shaded.org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:165)
> at
> org.apache.flink.connector.kafka.source.enumerator.subscriber.KafkaSubscriberUtils.getTopicMetadata(KafkaSubscriberUtils.java:44)
> ... 10 more
> Caused by:
> org.apache.flink.kafka.shaded.org.apache.kafka.common.errors.SslAuthenticationException:
> SSL handshake failed
> Caused by: javax.net.ssl.SSLHandshakeException: PKIX path building failed:
> sun.security.provider.certpath.SunCertPathBuilderException: unable to find
> valid certification path to requested target
> ...
> at
> org.apache.flink.kafka.shaded.org.apache.kafka.common.network.SslTransportLayer.runDelegatedTasks(SslTransportLayer.java:435)
> at
> org.apache.flink.kafka.shaded.org.apache.kafka.common.network.SslTransportLayer.handshakeUnwrap(SslTransportLayer.java:523)
> at
> org.apache.flink.kafka.shaded.org.apache.kafka.common.network.SslTransportLayer.doHandshake(SslTransportLayer.java:373)
> at
> org.apache.flink.kafka.shaded.org.apache.kafka.common.network.SslTransportLayer.handshake(SslTransportLayer.java:293)
> at
> org.apache.flink.kafka.shaded.org.apache.kafka.common.network.KafkaChannel.prepare(KafkaChannel.java:178)
> at
> org.apache.flink.kafka.shaded.org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:543)
> at
> org.apache.flink.kafka.shaded.org.apache.kafka.common.network.Selector.poll(Selector.java:481)
> at
> org.apache.flink.kafka.shaded.org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:560)
> at
> org.apache.flink.kafka.shaded.org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.processRequests(KafkaAdminClient.java:1413)
> at
> org.apache.flink.kafka.shaded.org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.run(KafkaAdminClient.java:1344)
> at java.lang.Thread.run(Thread.java:750)
> Caused by: sun.security.validator.ValidatorException: PKIX path building
> failed: sun.security.provider.certpath.SunCertPathBuilderException: unable
> to find valid certification path to requested target
> at sun.security.validator.PKIXValidator.doBuild(PKIXValidator.java:456)
> at
> sun.security.validator.PKIXValidator.engineValidate(PKIXValidator.java:323)
> at sun.security.validator.Validator.validate(Validator.java:271)
> at
> sun.security.ssl.X509TrustManagerImpl.validate(X509TrustManagerImpl.java:315)
> at
> sun.security.ssl.X509TrustManagerImpl.checkTrusted(X509TrustManagerImpl.java:278)
> at
> sun.security.ssl.X509TrustManagerImpl.checkServerTrusted(X509TrustManagerImpl.java:141)
> at
> sun.security.ssl.CertificateMessage$T12CertificateConsumer.checkServerCerts(CertificateMessage.java:632)
> ... 19 more
> Caused by: sun.security.provider.certpath.SunCertPathBuilderException:
> unable to find valid certification path to requested target
> at
> sun.security.provider.certpath.SunCertPathBuilder.build(SunCertPathBuilder.java:148)
> at
> sun.security.provider.certpath.SunCertPathBuilder.engineBuild(SunCertPathBuilder.java:129)
> at java.security.cert.CertPathBuilder.build(CertPathBuilder.java:280)
> at sun.security.validator.PKIXValidator.doBuild(PKIXValidator.java:451)
>
>
> Any idea how this is usually configured in PyFlink? I am running this on
> EMR if it matters.
> Thanks
>
> Kind regards
> Phil