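Hi Phil,

The Kafka SSL configuration keys may not be correct. Keys such as ssl.ca.location, ssl.certificate.location and ssl.key.location are librdkafka (confluent_kafka) settings. Flink's Kafka connector uses the Java Kafka client, which does not recognize them and instead reads ssl.truststore.* and ssl.keystore.* settings. Without a truststore configured, the client most likely falls back to the JVM's default truststore, which does not contain your CA. That matches the "PKIX path building failed" error. You can refer to the Kafka documentation[1] for the client SSL configurations.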
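For example, here is a minimal sketch of your DDL using Java client keys. It assumes the Kafka client bundled with your Flink Kafka connector is 2.7 or newer (PEM truststores and keystores were added in Kafka 2.7 by KIP-651), that CONNECTOR_TYPE is 'kafka' and MESSAGE_FORMAT is 'json', and that the private key is PKCS#8. The helper names (ssl_properties, sql_literal, build_logs_ddl) and the combined key+certificate file are hypothetical. If your client is older, import the CA into a JKS or PKCS12 truststore with keytool and set ssl.truststore.location and ssl.truststore.password instead.

```python
# Minimal sketch. Assumptions: Kafka client >= 2.7 (PEM support), connector
# 'kafka', format 'json'. All helper names here are hypothetical.


def ssl_properties(ca_file, keystore_pem=None, key_password=None):
    """Java Kafka client SSL settings replacing the librdkafka-style keys."""
    props = {
        "security.protocol": "SSL",
        # PEM truststore holding the CA that signed the broker certificates.
        "ssl.truststore.type": "PEM",
        "ssl.truststore.location": ca_file,
        # Empty value disables hostname verification, as in the original DDL.
        "ssl.endpoint.identification.algorithm": "",
    }
    if keystore_pem:
        # Only needed if the brokers require client (mutual TLS) authentication.
        # Assumption: this single file holds the PKCS#8 private key and the
        # client certificate chain.
        props["ssl.keystore.type"] = "PEM"
        props["ssl.keystore.location"] = keystore_pem
        if key_password:
            props["ssl.key.password"] = key_password
    return props


def sql_literal(value):
    """Quote a value as a Flink SQL string literal (single quotes doubled)."""
    return "'" + str(value).replace("'", "''") + "'"


def build_logs_ddl(topic, bootstrap_servers, group_id, ca_file,
                   keystore_pem=None, key_password=None):
    options = {
        "connector": "kafka",
        "topic": topic,
        "properties.bootstrap.servers": bootstrap_servers,
        "properties.group.id": group_id,
        "scan.startup.mode": "latest-offset",
        "format": "json",
    }
    # Kafka client settings are passed through with the 'properties.' prefix.
    for key, value in ssl_properties(ca_file, keystore_pem, key_password).items():
        options["properties." + key] = value
    with_clause = ",\n".join(
        f"  {sql_literal(k)} = {sql_literal(v)}" for k, v in options.items()
    )
    return (
        "CREATE TABLE logs (\n"
        "  `user` ROW(`user_id` BIGINT),\n"
        "  `timestamp` ROW(`secs` BIGINT)\n"
        ") WITH (\n"
        f"{with_clause}\n"
        ")"
    )


if __name__ == "__main__":
    print(build_logs_ddl("logs", "broker1:9093", "my_group_id",
                         ca_file="/etc/kafka/ssl/ca.pem"))
```

You would then pass the result to t_env.execute_sql(...) as before. Since you are on EMR: the failing call in your trace comes from the source enumerator (the OperatorCoordinator), which runs on the JobManager, so the CA and keystore files must exist at the same path on the JobManager as well as on every TaskManager.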
[1] https://kafka.apache.org/documentation/#security_configclients

Best,
Zhongqiang Gong

On Fri, May 17, 2024 at 01:44, Phil Stavridis <phi...@gmail.com> wrote:
> Hi,
>
> I have a PyFlink job that needs to read from a Kafka topic, and the
> communication with the Kafka broker requires SSL.
> I have connected to the Kafka cluster with something like this using just
> Python:
>
> from confluent_kafka import Consumer, KafkaException, KafkaError
>
>
> def get_config(bootstrap_servers, ca_file, cert_file, key_file):
>     config = {
>         'bootstrap.servers': bootstrap_servers,
>         'security.protocol': 'SSL',
>         'ssl.ca.location': ca_file,
>         'ssl.certificate.location': cert_file,
>         'ssl.key.location': key_file,
>         'ssl.endpoint.identification.algorithm': 'none',
>         'enable.ssl.certificate.verification': 'false',
>         'group.id': 'my_group_id'
>     }
>     return config
>
> and have read messages from the Kafka topic.
>
> I am trying to set up something similar with Flink SQL:
>
> t_env.execute_sql(f"""
>     CREATE TABLE logs (
>         `user` ROW(`user_id` BIGINT),
>         `timestamp` ROW(`secs` BIGINT)
>     ) WITH (
>         'connector' = '{CONNECTOR_TYPE}',
>         'topic' = '{KAFKA_TOPIC}',
>         'properties.bootstrap.servers' = '{BOOTSTRAP_SERVERS}',
>         'properties.group.id' = '{CONSUMER_GROUP}',
>         'scan.startup.mode' = '{STARTUP_MODE_LATEST}',
>         'format' = '{MESSAGE_FORMAT}',
>         'properties.security.protocol' = 'SSL',
>         'properties.ssl.ca.location' = '{ca_file}',
>         'properties.ssl.certificate.location' = '{cert_file}',
>         'properties.ssl.key.location' = '{key_file}',
>         'properties.ssl.endpoint.identification.algorithm' = ''
>     )
> """)
>
> But when this runs I am getting this error:
>
> Caused by: org.apache.flink.util.FlinkException: Global failure triggered by OperatorCoordinator for 'Source: kafka_action_logs[1] -> Calc[2]' (operator cbc357ccb763df2852fee8c4fc7d55f2).
> ...
> Caused by: org.apache.flink.util.FlinkRuntimeException: Failed to list subscribed topic partitions due to
>     at ...
> Caused by: java.lang.RuntimeException: Failed to get metadata for topics [logs].
>     at ...
>     ... 3 more
> Caused by: java.util.concurrent.ExecutionException: org.apache.flink.kafka.shaded.org.apache.kafka.common.errors.SslAuthenticationException: SSL handshake failed
>     at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
>     at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
>     at org.apache.flink.kafka.shaded.org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:165)
>     at org.apache.flink.connector.kafka.source.enumerator.subscriber.KafkaSubscriberUtils.getTopicMetadata(KafkaSubscriberUtils.java:44)
>     ... 10 more
> Caused by: org.apache.flink.kafka.shaded.org.apache.kafka.common.errors.SslAuthenticationException: SSL handshake failed
> Caused by: javax.net.ssl.SSLHandshakeException: PKIX path building failed: sun.security.provider.certpath.SunCertPathBuilderException: unable to find valid certification path to requested target
>     ...
>     at org.apache.flink.kafka.shaded.org.apache.kafka.common.network.SslTransportLayer.runDelegatedTasks(SslTransportLayer.java:435)
>     at org.apache.flink.kafka.shaded.org.apache.kafka.common.network.SslTransportLayer.handshakeUnwrap(SslTransportLayer.java:523)
>     at org.apache.flink.kafka.shaded.org.apache.kafka.common.network.SslTransportLayer.doHandshake(SslTransportLayer.java:373)
>     at org.apache.flink.kafka.shaded.org.apache.kafka.common.network.SslTransportLayer.handshake(SslTransportLayer.java:293)
>     at org.apache.flink.kafka.shaded.org.apache.kafka.common.network.KafkaChannel.prepare(KafkaChannel.java:178)
>     at org.apache.flink.kafka.shaded.org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:543)
>     at org.apache.flink.kafka.shaded.org.apache.kafka.common.network.Selector.poll(Selector.java:481)
>     at org.apache.flink.kafka.shaded.org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:560)
>     at org.apache.flink.kafka.shaded.org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.processRequests(KafkaAdminClient.java:1413)
>     at org.apache.flink.kafka.shaded.org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.run(KafkaAdminClient.java:1344)
>     at java.lang.Thread.run(Thread.java:750)
> Caused by: sun.security.validator.ValidatorException: PKIX path building failed: sun.security.provider.certpath.SunCertPathBuilderException: unable to find valid certification path to requested target
>     at sun.security.validator.PKIXValidator.doBuild(PKIXValidator.java:456)
>     at sun.security.validator.PKIXValidator.engineValidate(PKIXValidator.java:323)
>     at sun.security.validator.Validator.validate(Validator.java:271)
>     at sun.security.ssl.X509TrustManagerImpl.validate(X509TrustManagerImpl.java:315)
>     at sun.security.ssl.X509TrustManagerImpl.checkTrusted(X509TrustManagerImpl.java:278)
>     at sun.security.ssl.X509TrustManagerImpl.checkServerTrusted(X509TrustManagerImpl.java:141)
>     at sun.security.ssl.CertificateMessage$T12CertificateConsumer.checkServerCerts(CertificateMessage.java:632)
>     ... 19 more
> Caused by: sun.security.provider.certpath.SunCertPathBuilderException: unable to find valid certification path to requested target
>     at sun.security.provider.certpath.SunCertPathBuilder.build(SunCertPathBuilder.java:148)
>     at sun.security.provider.certpath.SunCertPathBuilder.engineBuild(SunCertPathBuilder.java:129)
>     at java.security.cert.CertPathBuilder.build(CertPathBuilder.java:280)
>     at sun.security.validator.PKIXValidator.doBuild(PKIXValidator.java:451)
>
> Any idea how this is usually configured in PyFlink? I am running this on
> EMR, if it matters.
> Thanks
>
> Kind regards
> Phil