Hi,
I have a PyFlink job that needs to read from a Kafka topic, and the
Kafka broker only accepts SSL connections.
Using plain Python (confluent_kafka), I have connected to the Kafka cluster
with a config like this:
from confluent_kafka import Consumer, KafkaException, KafkaError


def get_config(bootstrap_servers, ca_file, cert_file, key_file):
    # librdkafka (confluent_kafka) configuration for an SSL connection
    config = {
        'bootstrap.servers': bootstrap_servers,
        'security.protocol': 'SSL',
        'ssl.ca.location': ca_file,             # CA certificate(s) for the broker
        'ssl.certificate.location': cert_file,  # client certificate
        'ssl.key.location': key_file,           # client private key
        # Hostname verification and broker certificate verification are both off
        'ssl.endpoint.identification.algorithm': 'none',
        'enable.ssl.certificate.verification': 'false',
        'group.id': 'my_group_id'
    }
    return config
With this config I have been able to read messages from the Kafka topic.
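For reference, a minimal sketch of a polling loop that goes with a config like this. The function name `read_messages`, its parameters and the idle-poll limit are hypothetical. The consumer is passed in (in practice `Consumer(get_config(...))` from confluent_kafka), so the loop itself only relies on the `subscribe`, `poll` and `close` methods of the consumer and the `error` and `value` methods of each message.

```python
def read_messages(consumer, topic, max_messages=10, timeout=1.0, max_empty_polls=30):
    """Poll `consumer` until `max_messages` values are read or too many polls come back empty.

    `consumer` is assumed to behave like confluent_kafka.Consumer.
    """
    consumer.subscribe([topic])
    values, empty_polls = [], 0
    try:
        while len(values) < max_messages and empty_polls < max_empty_polls:
            msg = consumer.poll(timeout)
            if msg is None:
                # Nothing arrived within `timeout` seconds (common right after
                # subscribing, while partitions are still being assigned).
                empty_polls += 1
                continue
            if msg.error():
                # With the real client, KafkaException(msg.error()) is the usual choice;
                # a plain RuntimeError keeps this sketch free of the import.
                raise RuntimeError(f"Kafka error: {msg.error()}")
            values.append(msg.value())
    finally:
        consumer.close()  # always leave the consumer group cleanly
    return values
```

With the real client this would be called as `read_messages(Consumer(get_config(bootstrap_servers, ca_file, cert_file, key_file)), "logs")`.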
I am trying to set up something similar with Flink SQL:
# Flink strips the "properties." prefix and passes the remaining keys to the Kafka client.
t_env.execute_sql(f"""
    CREATE TABLE logs (
        `user` ROW(`user_id` BIGINT),
        `timestamp` ROW(`secs` BIGINT)
    ) WITH (
        'connector' = '{CONNECTOR_TYPE}',
        'topic' = '{KAFKA_TOPIC}',
        'properties.bootstrap.servers' = '{BOOTSTRAP_SERVERS}',
        'properties.group.id' = '{CONSUMER_GROUP}',
        'scan.startup.mode' = '{STARTUP_MODE_LATEST}',
        'format' = '{MESSAGE_FORMAT}',
        'properties.security.protocol' = 'SSL',
        'properties.ssl.ca.location' = '{ca_file}',
        'properties.ssl.certificate.location' = '{cert_file}',
        'properties.ssl.key.location' = '{key_file}',
        'properties.ssl.endpoint.identification.algorithm' = ''
    )
""")
But when this runs, I get the following error:
Caused by: org.apache.flink.util.FlinkException: Global failure triggered by OperatorCoordinator for 'Source: kafka_action_logs[1] -> Calc[2]' (operator cbc357ccb763df2852fee8c4fc7d55f2).
...
Caused by: org.apache.flink.util.FlinkRuntimeException: Failed to list subscribed topic partitions due to
    at ...
Caused by: java.lang.RuntimeException: Failed to get metadata for topics [logs].
    at ...
    ... 3 more
Caused by: java.util.concurrent.ExecutionException: org.apache.flink.kafka.shaded.org.apache.kafka.common.errors.SslAuthenticationException: SSL handshake failed
    at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
    at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
    at org.apache.flink.kafka.shaded.org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:165)
    at org.apache.flink.connector.kafka.source.enumerator.subscriber.KafkaSubscriberUtils.getTopicMetadata(KafkaSubscriberUtils.java:44)
    ... 10 more
Caused by: org.apache.flink.kafka.shaded.org.apache.kafka.common.errors.SslAuthenticationException: SSL handshake failed
Caused by: javax.net.ssl.SSLHandshakeException: PKIX path building failed: sun.security.provider.certpath.SunCertPathBuilderException: unable to find valid certification path to requested target
    ...
    at org.apache.flink.kafka.shaded.org.apache.kafka.common.network.SslTransportLayer.runDelegatedTasks(SslTransportLayer.java:435)
    at org.apache.flink.kafka.shaded.org.apache.kafka.common.network.SslTransportLayer.handshakeUnwrap(SslTransportLayer.java:523)
    at org.apache.flink.kafka.shaded.org.apache.kafka.common.network.SslTransportLayer.doHandshake(SslTransportLayer.java:373)
    at org.apache.flink.kafka.shaded.org.apache.kafka.common.network.SslTransportLayer.handshake(SslTransportLayer.java:293)
    at org.apache.flink.kafka.shaded.org.apache.kafka.common.network.KafkaChannel.prepare(KafkaChannel.java:178)
    at org.apache.flink.kafka.shaded.org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:543)
    at org.apache.flink.kafka.shaded.org.apache.kafka.common.network.Selector.poll(Selector.java:481)
    at org.apache.flink.kafka.shaded.org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:560)
    at org.apache.flink.kafka.shaded.org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.processRequests(KafkaAdminClient.java:1413)
    at org.apache.flink.kafka.shaded.org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.run(KafkaAdminClient.java:1344)
    at java.lang.Thread.run(Thread.java:750)
Caused by: sun.security.validator.ValidatorException: PKIX path building failed: sun.security.provider.certpath.SunCertPathBuilderException: unable to find valid certification path to requested target
    at sun.security.validator.PKIXValidator.doBuild(PKIXValidator.java:456)
    at sun.security.validator.PKIXValidator.engineValidate(PKIXValidator.java:323)
    at sun.security.validator.Validator.validate(Validator.java:271)
    at sun.security.ssl.X509TrustManagerImpl.validate(X509TrustManagerImpl.java:315)
    at sun.security.ssl.X509TrustManagerImpl.checkTrusted(X509TrustManagerImpl.java:278)
    at sun.security.ssl.X509TrustManagerImpl.checkServerTrusted(X509TrustManagerImpl.java:141)
    at sun.security.ssl.CertificateMessage$T12CertificateConsumer.checkServerCerts(CertificateMessage.java:632)
    ... 19 more
Caused by: sun.security.provider.certpath.SunCertPathBuilderException: unable to find valid certification path to requested target
    at sun.security.provider.certpath.SunCertPathBuilder.build(SunCertPathBuilder.java:148)
    at sun.security.provider.certpath.SunCertPathBuilder.engineBuild(SunCertPathBuilder.java:129)
    at java.security.cert.CertPathBuilder.build(CertPathBuilder.java:280)
    at sun.security.validator.PKIXValidator.doBuild(PKIXValidator.java:451)
Any idea how SSL for the Kafka connector is usually configured in PyFlink?
I am running this on EMR, if it matters.
Thanks
Kind regards
Phil