Hi,

I am developing a GCP Cloud Dataflow job that uses a Kafka broker and Schema
Registry. Our Kafka broker and Schema Registry require a TLS client
certificate, and I am facing a connection issue with the Schema Registry on
deployment. Any suggestions are highly welcome.

Here is what I do in the Dataflow job. First, I create consumer properties
with the TLS configuration:

props.put("security.protocol", "SSL");
> props.put("ssl.truststore.password", "aaa");
> props.put("ssl.keystore.password", "bbb");
> props.put("ssl.key.password", "ccc"));
> props.put("schema.registry.url", "https://host:port";)
> props.put("specific.avro.reader", true);


Then I apply them to the KafkaIO source via updateConsumerProperties:

Pipeline p = Pipeline.create(options)
...
.updateConsumerProperties(properties)
...
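
For completeness, here is roughly how this fits together in the pipeline.
The bootstrap servers, topic, and deserializer classes below are
placeholders, not my actual values:

import java.util.HashMap;
import java.util.Map;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

// Consumer properties carrying the TLS / Schema Registry settings shown above.
Map<String, Object> props = new HashMap<>();
props.put("security.protocol", "SSL");
// ...

Pipeline p = Pipeline.create(options);
p.apply(KafkaIO.<byte[], byte[]>read()
    .withBootstrapServers("broker-host:9093")          // placeholder
    .withTopic("my-topic")                             // placeholder
    .withKeyDeserializer(ByteArrayDeserializer.class)
    .withValueDeserializer(ByteArrayDeserializer.class)
    .updateConsumerProperties(props));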


As the Stack Overflow answer below suggests, I also download the keyStore
and trustStore to a local directory and set the trustStore / keyStore
locations on the consumer properties in a custom ConsumerFactory.

https://stackoverflow.com/questions/42726011/truststore-and-google-cloud-dataflow?noredirect=1&lq=1

Pipeline p = Pipeline.create(options)
 ...
 .withConsumerFactoryFn(new MyConsumerFactory(...))
 ...


In ConsumerFactory:

public Consumer<byte[], byte[]> apply(Map<String, Object> config) {
  // download keyStore and trustStore from GCS bucket
  config.put("ssl.truststore.location", localTrustStoreFilePath);
  config.put("ssl.keystore.location", localKeyStoreFilePath);
  return new KafkaConsumer<byte[], byte[]>(config);
}
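
Fleshed out, the factory looks roughly like this. The bucket and object
names are placeholders, and I am assuming the google-cloud-storage client
for the download:

import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Map;

import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageOptions;

public class MyConsumerFactory
    implements SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> {

  private static final String BUCKET = "my-certs-bucket";  // placeholder

  @Override
  public Consumer<byte[], byte[]> apply(Map<String, Object> config) {
    try {
      // Download keyStore and trustStore from the GCS bucket into a
      // worker-local temp directory.
      Storage storage = StorageOptions.getDefaultInstance().getService();
      Path dir = Files.createTempDirectory("kafka-certs");

      Path trustStore = dir.resolve("client.truststore.jks");
      storage.get(BUCKET, "client.truststore.jks").downloadTo(trustStore);
      Path keyStore = dir.resolve("client.keystore.jks");
      storage.get(BUCKET, "client.keystore.jks").downloadTo(keyStore);

      // Point the consumer at the local copies.
      config.put("ssl.truststore.location", trustStore.toString());
      config.put("ssl.keystore.location", keyStore.toString());
      return new KafkaConsumer<>(config);
    } catch (Exception e) {
      throw new RuntimeException("Failed to fetch TLS stores from GCS", e);
    }
  }
}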


With this code the deployment succeeded, but the Dataflow job failed with a
TLS server certificate verification error:

Caused by: sun.security.validator.ValidatorException: PKIX path building failed: sun.security.provider.certpath.SunCertPathBuilderException: unable to find valid certification path to requested target
        at sun.security.validator.PKIXValidator.doBuild(PKIXValidator.java:387)
        at sun.security.validator.PKIXValidator.engineValidate(PKIXValidator.java:292)
        at sun.security.validator.Validator.validate(Validator.java:260)
        at sun.net.www.protocol.https.HttpsClient.afterConnect(HttpsClient.java:559)
        at sun.net.www.protocol.https.AbstractDelegateHttpsURLConnection.connect(AbstractDelegateHttpsURLConnection.java:185)
        at sun.net.www.protocol.http.HttpURLConnection.getInputStream0(HttpURLConnection.java:1513)
        at sun.net.www.protocol.http.HttpURLConnection.getInputStream(HttpURLConnection.java:1441)
        at java.net.HttpURLConnection.getResponseCode(HttpURLConnection.java:480)
        at sun.net.www.protocol.https.HttpsURLConnectionImpl.getResponseCode(HttpsURLConnectionImpl.java:338)
        at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:208)
        at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:252)
        at io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:482)
        at io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:475)
        at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getSchemaByIdFromRegistry(CachedSchemaRegistryClient.java:151)
        at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getBySubjectAndId(CachedSchemaRegistryClient.java:230)
        at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getById(CachedSchemaRegistryClient.java:209)
        at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:116)
        at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:88)
        at org.fastretailing.rfid.store.siv.EPCTransactionKafkaAvroDeserializer.deserialize(EPCTransactionKafkaAvroDeserializer.scala:14)
        at org.fastretailing.rfid.store.siv.EPCTransactionKafkaAvroDeserializer.deserialize(EPCTransactionKafkaAvroDeserializer.scala:7)
        at org.apache.beam.sdk.io.kafka.KafkaUnboundedReader.advance(KafkaUnboundedReader.java:234)
        at org.apache.beam.sdk.io.kafka.KafkaUnboundedReader.start(KafkaUnboundedReader.java:176)
        at org.apache.beam.runners.dataflow.worker.WorkerCustomSources$UnboundedReaderIterator.start(WorkerCustomSources.java:779)
        at org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation$SynchronizedReaderIterator.start(ReadOperation.java:361)
        at org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:194)
        at org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:159)
        at org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:76)
        at org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1228)
        at org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:143)
        at org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$6.run(StreamingDataflowWorker.java:967)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)


Then I found that the Schema Registry client loads its TLS configuration
from JVM system properties:
https://github.com/confluentinc/schema-registry/issues/943
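
So, at least in the client version I use, the ssl.* consumer properties are
not enough for the registry client's HTTPS connection; it falls back to the
JVM-wide javax.net.ssl.* properties. A minimal illustration of what that
implies (paths and passwords are placeholders):

// The HTTPS connection opened by the Schema Registry client honors the
// JVM-wide javax.net.ssl.* properties, so they must be set separately.
System.setProperty("javax.net.ssl.trustStore", "/tmp/client.truststore.jks");
System.setProperty("javax.net.ssl.trustStorePassword", "ccc");
System.setProperty("javax.net.ssl.keyStore", "/tmp/client.keystore.jks");
System.setProperty("javax.net.ssl.keyStorePassword", "aaa");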

I tested a plain Kafka consumer with the same configuration and confirmed
that it works fine:

props.put("schema.registry.url", "https://host:port";)
> props.put("specific.avro.reader", true);
> props.put("ssl.truststore.location",
> System.getProperty("javax.net.ssl.trustStore"));
> props.put("ssl.truststore.password",
> System.getProperty("javax.net.ssl.keyStore"));
> props.put("ssl.keystore.location",
> System.getProperty("javax.net.ssl.keyStore"));
> props.put("ssl.keystore.password",
> System.getProperty("javax.net.ssl.keyStorePassword"));
> props.put("ssl.key.password",
> System.getProperty("javax.net.ssl.key.password"));
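
For reference, the test was essentially a plain consumer along these lines
(bootstrap servers, group id, and topic are placeholders):

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.KafkaConsumer;

Properties props = new Properties();
props.put("bootstrap.servers", "broker-host:9093");  // placeholder
props.put("group.id", "tls-smoke-test");             // placeholder
props.put("key.deserializer",
    "org.apache.kafka.common.serialization.ByteArrayDeserializer");
props.put("value.deserializer",
    "io.confluent.kafka.serializers.KafkaAvroDeserializer");
// ... plus the SSL and schema.registry.* properties above ...

try (KafkaConsumer<byte[], Object> consumer = new KafkaConsumer<>(props)) {
  consumer.subscribe(Collections.singletonList("my-topic"));  // placeholder
  consumer.poll(Duration.ofSeconds(10))
      .forEach(record -> System.out.println(record.value()));
}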


Next I applied the same approach to the Dataflow job code, i.e. setting the
same TLS configuration both as system properties and as consumer
properties.

I specified the passwords via system properties when launching the
application:

-Djavax.net.ssl.keyStorePassword=aaa \
-Djavax.net.ssl.key.password=bbb \
-Djavax.net.ssl.trustStorePassword=ccc \
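
(The full launch command looks roughly like this; the jar name, main class,
project, and temp bucket are placeholders:)

java -Djavax.net.ssl.keyStorePassword=aaa \
     -Djavax.net.ssl.key.password=bbb \
     -Djavax.net.ssl.trustStorePassword=ccc \
     -cp my-job-bundled.jar com.example.MyDataflowJob \
     --runner=DataflowRunner \
     --project=my-project \
     --tempLocation=gs://my-bucket/tmp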


Note: I set the system properties for the trustStore and keyStore locations
in the ConsumerFactory, since those files are downloaded to a local temp
directory on the worker.

config.put("ssl.truststore.location", (Object)localTrustStoreFilePath)
> config.put("ssl.keystore.location", (Object)localKeyStoreFilePath)
> System.setProperty("javax.net.ssl.trustStore", localTrustStoreFilePath)
> System.setProperty("javax.net.ssl.keyStore", localKeyStoreFilePath)


But this time even the deployment failed, with the following error:

Exception in thread "main" java.lang.RuntimeException: Failed to construct instance from factory method DataflowRunner#fromOptions(interface org.apache.beam.sdk.options.PipelineOptions)
        at org.apache.beam.sdk.util.InstanceBuilder.buildFromMethod(InstanceBuilder.java:224)
...
Caused by: java.lang.reflect.InvocationTargetException
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
...
Caused by: java.lang.IllegalArgumentException: DataflowRunner requires gcpTempLocation, but failed to retrieve a value from PipelineOptions
        at org.apache.beam.runners.dataflow.DataflowRunner.fromOptions(DataflowRunner.java:246)
Caused by: java.lang.IllegalArgumentException: Error constructing default value for gcpTempLocation: tempLocation is not a valid GCS path, gs://dev-k8s-rfid-store-dataflow/rfid-store-siv-epc-transactions-to-bq/tmp.
        at org.apache.beam.sdk.extensions.gcp.options.GcpOptions$GcpTempLocationFactory.create(GcpOptions.java:255)
...
Caused by: java.lang.RuntimeException: Unable to verify that GCS bucket gs://dev-k8s-rfid-store-dataflow exists.
        at org.apache.beam.sdk.extensions.gcp.storage.GcsPathValidator.verifyPathIsAccessible(GcsPathValidator.java:86)
...
Caused by: java.io.IOException: Error getting access token for service account: java.security.NoSuchAlgorithmException: Error constructing implementation (algorithm: Default, provider: SunJSSE, class: sun.security.ssl.SSLContextImpl$DefaultSSLContext)
        at com.google.auth.oauth2.ServiceAccountCredentials.refreshAccessToken(ServiceAccountCredentials.java:401)
...
Caused by: java.net.SocketException: java.security.NoSuchAlgorithmException: Error constructing implementation (algorithm: Default, provider: SunJSSE, class: sun.security.ssl.SSLContextImpl$DefaultSSLContext)
        at javax.net.ssl.DefaultSSLSocketFactory.throwException(SSLSocketFactory.java:248)
...
Caused by: java.security.NoSuchAlgorithmException: Error constructing implementation (algorithm: Default, provider: SunJSSE, class: sun.security.ssl.SSLContextImpl$DefaultSSLContext)
        at java.security.Provider$Service.newInstance(Provider.java:1617)
...


Am I missing something?

I posted the same question here
https://stackoverflow.com/questions/56035121/how-to-configure-tls-connections-for-dataflow-job-that-use-kafka-and-schema-regi

Thank you.
Yohei Onishi
