Hi,

I have a PyFlink job that needs to read from a Kafka topic, and
communication with the Kafka broker requires SSL.
Using plain Python with confluent_kafka, I can connect to the Kafka cluster
with a configuration like this:

from confluent_kafka import Consumer, KafkaException, KafkaError


def get_config(bootstrap_servers, ca_file, cert_file, key_file):
    # librdkafka-style SSL settings: PEM files are referenced by path
    config = {
        'bootstrap.servers': bootstrap_servers,
        'security.protocol': 'SSL',
        'ssl.ca.location': ca_file,
        'ssl.certificate.location': cert_file,
        'ssl.key.location': key_file,
        'ssl.endpoint.identification.algorithm': 'none',
        'enable.ssl.certificate.verification': 'false',
        'group.id': 'my_group_id'
    }
    return config

With this configuration I was able to read messages from the Kafka topic.

I am trying to set up something similar with Flink SQL:

t_env.execute_sql(f"""
    CREATE TABLE logs (
        `user` ROW(`user_id` BIGINT),
        `timestamp` ROW(`secs` BIGINT)
    ) WITH (
        'connector' = '{CONNECTOR_TYPE}',
        'topic' = '{KAFKA_TOPIC}',
        'properties.bootstrap.servers' = '{BOOTSTRAP_SERVERS}',
        'properties.group.id' = '{CONSUMER_GROUP}',
        'scan.startup.mode' = '{STARTUP_MODE_LATEST}',
        'format' = '{MESSAGE_FORMAT}',
        'properties.security.protocol' = 'SSL',
        'properties.ssl.ca.location' = '{ca_file}',
        'properties.ssl.certificate.location' = '{cert_file}',
        'properties.ssl.key.location' = '{key_file}',
        'properties.ssl.endpoint.identification.algorithm' = ''
    )
""")


But when this runs I am getting this error:

Caused by: org.apache.flink.util.FlinkException: Global failure triggered by OperatorCoordinator for 'Source: kafka_action_logs[1] -> Calc[2]' (operator cbc357ccb763df2852fee8c4fc7d55f2).
...
Caused by: org.apache.flink.util.FlinkRuntimeException: Failed to list subscribed topic partitions due to
    at
...
Caused by: java.lang.RuntimeException: Failed to get metadata for topics [logs].
    at
...
    ... 3 more
Caused by: java.util.concurrent.ExecutionException: org.apache.flink.kafka.shaded.org.apache.kafka.common.errors.SslAuthenticationException: SSL handshake failed
    at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
    at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
    at org.apache.flink.kafka.shaded.org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:165)
    at org.apache.flink.connector.kafka.source.enumerator.subscriber.KafkaSubscriberUtils.getTopicMetadata(KafkaSubscriberUtils.java:44)
    ... 10 more
Caused by: org.apache.flink.kafka.shaded.org.apache.kafka.common.errors.SslAuthenticationException: SSL handshake failed
Caused by: javax.net.ssl.SSLHandshakeException: PKIX path building failed: sun.security.provider.certpath.SunCertPathBuilderException: unable to find valid certification path to requested target
...
    at org.apache.flink.kafka.shaded.org.apache.kafka.common.network.SslTransportLayer.runDelegatedTasks(SslTransportLayer.java:435)
    at org.apache.flink.kafka.shaded.org.apache.kafka.common.network.SslTransportLayer.handshakeUnwrap(SslTransportLayer.java:523)
    at org.apache.flink.kafka.shaded.org.apache.kafka.common.network.SslTransportLayer.doHandshake(SslTransportLayer.java:373)
    at org.apache.flink.kafka.shaded.org.apache.kafka.common.network.SslTransportLayer.handshake(SslTransportLayer.java:293)
    at org.apache.flink.kafka.shaded.org.apache.kafka.common.network.KafkaChannel.prepare(KafkaChannel.java:178)
    at org.apache.flink.kafka.shaded.org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:543)
    at org.apache.flink.kafka.shaded.org.apache.kafka.common.network.Selector.poll(Selector.java:481)
    at org.apache.flink.kafka.shaded.org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:560)
    at org.apache.flink.kafka.shaded.org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.processRequests(KafkaAdminClient.java:1413)
    at org.apache.flink.kafka.shaded.org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.run(KafkaAdminClient.java:1344)
    at java.lang.Thread.run(Thread.java:750)
Caused by: sun.security.validator.ValidatorException: PKIX path building failed: sun.security.provider.certpath.SunCertPathBuilderException: unable to find valid certification path to requested target
    at sun.security.validator.PKIXValidator.doBuild(PKIXValidator.java:456)
    at sun.security.validator.PKIXValidator.engineValidate(PKIXValidator.java:323)
    at sun.security.validator.Validator.validate(Validator.java:271)
    at sun.security.ssl.X509TrustManagerImpl.validate(X509TrustManagerImpl.java:315)
    at sun.security.ssl.X509TrustManagerImpl.checkTrusted(X509TrustManagerImpl.java:278)
    at sun.security.ssl.X509TrustManagerImpl.checkServerTrusted(X509TrustManagerImpl.java:141)
    at sun.security.ssl.CertificateMessage$T12CertificateConsumer.checkServerCerts(CertificateMessage.java:632)
    ... 19 more
Caused by: sun.security.provider.certpath.SunCertPathBuilderException: unable to find valid certification path to requested target
    at sun.security.provider.certpath.SunCertPathBuilder.build(SunCertPathBuilder.java:148)
    at sun.security.provider.certpath.SunCertPathBuilder.engineBuild(SunCertPathBuilder.java:129)
    at java.security.cert.CertPathBuilder.build(CertPathBuilder.java:280)
    at sun.security.validator.PKIXValidator.doBuild(PKIXValidator.java:451)
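
One thing I noticed while writing this up: the PKIX error suggests the JVM is validating the broker certificate against its default trust store, which would fit if the Java Kafka client inside the connector does not recognise the librdkafka-style keys (ssl.ca.location and friends) that confluent_kafka accepts. Here is a minimal, untested sketch of how the WITH options could be built with the Java client's own key names instead. The helper names and file paths are made up for illustration, and the PEM store type is an assumption: as far as I know it needs Kafka client 2.7 or newer, and older clients would need a JKS or PKCS12 store plus the matching password options.

```python
def java_ssl_options(truststore_path, keystore_path, store_type="PEM"):
    """Hypothetical helper: Java Kafka client SSL keys, with the
    'properties.' prefix the Flink SQL Kafka connector passes through."""
    return {
        "properties.security.protocol": "SSL",
        # Trust store: the CA certificate(s) used to verify the broker
        "properties.ssl.truststore.type": store_type,
        "properties.ssl.truststore.location": truststore_path,
        # Key store: the client certificate and private key
        "properties.ssl.keystore.type": store_type,
        "properties.ssl.keystore.location": keystore_path,
        # Empty string disables hostname verification, as in my SQL above
        "properties.ssl.endpoint.identification.algorithm": "",
    }


def render_with_options(options):
    """Render a dict as the comma-separated body of a WITH (...) clause."""
    lines = []
    for key, value in options.items():
        escaped = str(value).replace("'", "''")  # escape single quotes for SQL
        lines.append(f"    '{key}' = '{escaped}'")
    return ",\n".join(lines)


if __name__ == "__main__":
    # Placeholder paths, not real files
    opts = java_ssl_options("/path/to/ca.pem", "/path/to/client.pem")
    print(render_with_options(opts))
```

The rendered string could then be spliced into the WITH (...) clause in place of the ssl.*.location lines. With the PEM type, the keystore file is expected to hold both the private key and the certificate chain.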


Any idea how this is usually configured in PyFlink? I am running this on EMR if 
it matters.
Thanks

Kind regards
Phil