Hi Brain,
We’re using consumerFactoryFn that reads certs from GCP and copying those to
local FS on each Dataflow worker.
Exception raised after consumerFactoryFn when Kafka tries to read certs from
local fs using KeyStore.load(InputStream is, String pass).
This code we using in consumerFactoryFn to read from GCP and writing to local
fs :
try (ReadableByteChannel readerChannel =
FileSystems.open(FileSystems.matchSingleFileSpec(gcsFilePath).resourceId())) {
try (FileChannel writeChannel =
FileChannel.open(Paths.get(outputFilePath), options)) {
writeChannel.transferFrom(readerChannel, 0, Long.MAX_VALUE);
}
}
Thank you,
Ilya
From: Brian Hulette <[email protected]>
Reply to: "[email protected]" <[email protected]>
Date: Wednesday, 26 May 2021, 21:32
To: dev <[email protected]>
Cc: Artur Khanin <[email protected]>
Subject: Re: KafkaIO SSL issue
I came across this relevant StackOverflow question:
https://stackoverflow.com/questions/7399154/pkcs12-derinputstream-getlength-exception
They say the error is from a call to `KeyStore.load(InputStream is, String
pass);` (consistent with your stacktrace), and can occur whenever there's an
issue with the InputStream passed to it. Who created the InputStream used in
this case, is it Kafka code, Beam code, or your consumerFactoryFn?
Brian
On Mon, May 24, 2021 at 4:01 AM Ilya Kozyrev
<[email protected]<mailto:[email protected]>> wrote:
Hi community,
We have an issue with KafkaIO in the case of using a secure connection SASL SSL
to the Confluent Kafka 5.5.1. When we trying to configure the Kafka consumer
using consumerFactoryFn, we have an irregular issue related to certificate
reads from the file system. Irregular means, that different Dataflow jobs with
the same parameters and certs might be failed and succeeded. Store cert types
for Keystore and Truststore are specified explicitly in consumer config. In our
case, it's JKS for both certs.
Stacktrase:
Caused by: org.apache.kafka.common.KafkaException: Failed to load SSL keystore
/tmp/kafka.truststore.jks of type JKS
at
org.apache.kafka.common.security.ssl.SslEngineBuilder$SecurityStore.load(SslEngineBuilder.java:289)
at
org.apache.kafka.common.security.ssl.SslEngineBuilder.createSSLContext(SslEngineBuilder.java:153)
... 23 more
Caused by: java.security.cert.CertificateException: Unable to initialize,
java.io.IOException: DerInputStream.getLength(): lengthTag=65, too big.
at sun.security.x509.X509CertImpl.<init>(X509CertImpl.java:198)
at
sun.security.provider.X509Factory.engineGenerateCertificate(X509Factory.java:102)
at
java.security.cert.CertificateFactory.generateCertificate(CertificateFactory.java:339)
at sun.security.provider.JavaKeyStore.engineLoad(JavaKeyStore.java:755)
at sun.security.provider.JavaKeyStore$JKS.engineLoad(JavaKeyStore.java:56)
at
sun.security.provider.KeyStoreDelegator.engineLoad(KeyStoreDelegator.java:224)
at
sun.security.provider.JavaKeyStore$DualFormatJKS.engineLoad(JavaKeyStore.java:70)
at java.security.KeyStore.load(KeyStore.java:1445)
at
org.apache.kafka.common.security.ssl.SslEngineBuilder$SecurityStore.load(SslEngineBuilder.java:286)
... 24 more
/tmp/kafka.truststore.jks is a path that’s used in consumerFactoryFn to load
certs from GCP to the worker's local file system.
Does anyone have any ideas on how to fix this issue?
Thank you,
Ilya