Thanks, but I already did that as mentioned.
It seems that Apache Beam does not support Schema Registry. I will post
about it once I have confirmed.

On Thu, 9 May 2019 at 12:52 AM, Lukasz Cwik <lc...@google.com> wrote:

> I replied to the SO question with more details.
>
> The issue is that you're trying to load a truststore (file) on the VM which
> doesn't exist. You need to make that file accessible in some way. The other
> SO question (
> https://stackoverflow.com/questions/42726011/truststore-and-google-cloud-dataflow?noredirect=1&lq=1)
> has a code snippet where the user copies the truststore from GCS to a local
> tmp file path and then configures the map with that temp file path.
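> For example, the factory function could copy the truststore from GCS to a
> local temp file before building the consumer. A rough sketch (the class name,
> bucket, and object path below are placeholders, not something Beam provides):
>
>   import java.nio.channels.Channels;
>   import java.nio.channels.ReadableByteChannel;
>   import java.nio.file.Files;
>   import java.nio.file.Path;
>   import java.nio.file.StandardCopyOption;
>   import java.util.Map;
>   import org.apache.beam.sdk.io.FileSystems;
>   import org.apache.beam.sdk.transforms.SerializableFunction;
>   import org.apache.kafka.clients.consumer.Consumer;
>   import org.apache.kafka.clients.consumer.KafkaConsumer;
>
>   // Hypothetical factory: stages the truststore locally, then builds the consumer.
>   class TruststoreStagingConsumerFactory
>       implements SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> {
>     @Override
>     public Consumer<byte[], byte[]> apply(Map<String, Object> config) {
>       try {
>         // Copy gs://my-bucket/kafka.truststore.jks (placeholder path) to a temp file on the worker.
>         Path local = Files.createTempFile("kafka-truststore", ".jks");
>         try (ReadableByteChannel in = FileSystems.open(
>             FileSystems.matchNewResource("gs://my-bucket/kafka.truststore.jks", false))) {
>           Files.copy(Channels.newInputStream(in), local, StandardCopyOption.REPLACE_EXISTING);
>         }
>         // Point the Kafka SSL config at the local copy.
>         config.put("ssl.truststore.location", local.toString());
>       } catch (java.io.IOException e) {
>         throw new RuntimeException("Failed to stage truststore locally", e);
>       }
>       return new KafkaConsumer<>(config);
>     }
>   }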
>
> On Wed, May 8, 2019 at 1:47 AM Yohei Onishi <vivre...@gmail.com> wrote:
>
>> Can anyone tell me how the Schema Registry client in Apache Beam is
>> configured?
>> I tried to find it in the GitHub repo but was not able to.
>> https://github.com/apache/beam
>>
>> I want to confirm whether Apache Beam supports Schema Registry and whether it
>> has the same issue that Confluent Schema Registry has.
>> https://github.com/confluentinc/schema-registry/issues/943
>>
>> Yohei Onishi
>>
>>
>> On Wed, May 8, 2019 at 3:27 PM Yohei Onishi <vivre...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> I am developing a GCP Cloud Dataflow job that uses a Kafka broker and
>>> Schema Registry. Our Kafka broker and Schema Registry require a TLS client
>>> certificate, and I am facing a connection issue with Schema Registry when
>>> deploying the job. Any suggestions are highly welcome.
>>>
>>> Here is what I do in the Dataflow job. First, I create consumer properties
>>> for the TLS configuration:
>>>
>>> props.put("security.protocol", "SSL");
>>>> props.put("ssl.truststore.password", "aaa");
>>>> props.put("ssl.keystore.password", "bbb");
>>>> props.put("ssl.key.password", "ccc"));
>>>> props.put("schema.registry.url", "https://host:port";)
>>>> props.put("specific.avro.reader", true);
>>>
>>>
>>> Then I apply these consumer properties with updateConsumerProperties:
>>>
>>> Pipeline p = Pipeline.create(options)
>>>> ...
>>>> .updateConsumerProperties(properties)
>>>> ...
>>>
>>>
>>> As this Stack Overflow answer suggests, I also download the keyStore and
>>> trustStore to a local directory and set the trustStore / keyStore locations
>>> in the consumer properties inside the ConsumerFactory.
>>>
>>>
>>> https://stackoverflow.com/questions/42726011/truststore-and-google-cloud-dataflow?noredirect=1&lq=1
>>>
>>> Pipeline p = Pipeline.create(options)
>>>>  ...
>>>>  .withConsumerFactoryFn(new MyConsumerFactory(...))
>>>>  ...
>>>
>>>
>>> In ConsumerFactory:
>>>
>>> public Consumer<byte[], byte[]> apply(Map<String, Object> config) {
>>>>   // download keyStore and trustStore from GCS bucket to local file paths
>>>>   config.put("ssl.truststore.location", (Object) localTrustStoreFilePath);
>>>>   config.put("ssl.keystore.location", (Object) localKeyStoreFilePath);
>>>>   return new KafkaConsumer<byte[], byte[]>(config);
>>>> }
>>>
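>>> For clarity, the updateConsumerProperties and withConsumerFactoryFn calls above
>>> are on KafkaIO.read(), not on the Pipeline itself. Roughly (broker, topic, and
>>> deserializers here are placeholders):
>>>
>>>> // Rough sketch of the wiring; "properties" is the map with the TLS / Schema
>>>> // Registry settings shown earlier. Needs org.apache.beam.sdk.io.kafka.KafkaIO
>>>> // and org.apache.kafka.common.serialization.ByteArrayDeserializer.
>>>> Pipeline p = Pipeline.create(options);
>>>> p.apply(KafkaIO.<byte[], byte[]>read()
>>>>     .withBootstrapServers("broker-host:9093")
>>>>     .withTopic("my-topic")
>>>>     .withKeyDeserializer(ByteArrayDeserializer.class)
>>>>     .withValueDeserializer(ByteArrayDeserializer.class)
>>>>     .updateConsumerProperties(properties)
>>>>     .withConsumerFactoryFn(new MyConsumerFactory(/* GCS paths, etc. */)));
>>>> // ... rest of the pipeline ...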
>>>
>>> With this setup the deployment succeeded, but the Dataflow job then hit a TLS
>>> server certificate verification error when calling Schema Registry:
>>>
>>> Caused by: sun.security.validator.ValidatorException: PKIX path building
>>>> failed: sun.security.provider.certpath.SunCertPathBuilderException: unable
>>>> to find valid certification path to requested target
>>>>
>>>> sun.security.validator.PKIXValidator.doBuild(PKIXValidator.java:387)
>>>>
>>>> sun.security.validator.PKIXValidator.engineValidate(PKIXValidator.java:292)
>>>>         sun.security.validator.Validator.validate(Validator.java:260)
>>>>
>>>> sun.net.www.protocol.https.HttpsClient.afterConnect(HttpsClient.java:559)
>>>>
>>>>  
>>>> sun.net.www.protocol.https.AbstractDelegateHttpsURLConnection.connect(AbstractDelegateHttpsURLConnection.java:185)
>>>>
>>>> sun.net.www.protocol.http.HttpURLConnection.getInputStream0(HttpURLConnection.java:1513)
>>>>
>>>> sun.net.www.protocol.http.HttpURLConnection.getInputStream(HttpURLConnection.java:1441)
>>>>
>>>> java.net.HttpURLConnection.getResponseCode(HttpURLConnection.java:480)
>>>>
>>>> sun.net.www.protocol.https.HttpsURLConnectionImpl.getResponseCode(HttpsURLConnectionImpl.java:338)
>>>>
>>>> io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:208)
>>>>
>>>> io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:252)
>>>>
>>>> io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:482)
>>>>
>>>> io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:475)
>>>>
>>>> io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getSchemaByIdFromRegistry(CachedSchemaRegistryClient.java:151)
>>>>
>>>> io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getBySubjectAndId(CachedSchemaRegistryClient.java:230)
>>>>
>>>> io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getById(CachedSchemaRegistryClient.java:209)
>>>>
>>>> io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:116)
>>>>
>>>> io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:88)
>>>>
>>>> org.fastretailing.rfid.store.siv.EPCTransactionKafkaAvroDeserializer.deserialize(EPCTransactionKafkaAvroDeserializer.scala:14)
>>>>
>>>> org.fastretailing.rfid.store.siv.EPCTransactionKafkaAvroDeserializer.deserialize(EPCTransactionKafkaAvroDeserializer.scala:7)
>>>>
>>>> org.apache.beam.sdk.io.kafka.KafkaUnboundedReader.advance(KafkaUnboundedReader.java:234)
>>>>
>>>> org.apache.beam.sdk.io.kafka.KafkaUnboundedReader.start(KafkaUnboundedReader.java:176)
>>>>
>>>> org.apache.beam.runners.dataflow.worker.WorkerCustomSources$UnboundedReaderIterator.start(WorkerCustomSources.java:779)
>>>>
>>>> org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation$SynchronizedReaderIterator.start(ReadOperation.java:361)
>>>>
>>>> org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:194)
>>>>
>>>> org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:159)
>>>>
>>>> org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:76)
>>>>
>>>> org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1228)
>>>>
>>>> org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:143)
>>>>
>>>> org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$6.run(StreamingDataflowWorker.java:967)
>>>>
>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>>>
>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>>>         java.lang.Thread.run(Thread.java:745)
>>>
>>>
>>> Then I found that the Schema Registry client loads its TLS configuration from
>>> JVM system properties (its HTTPS client uses the JVM-default SSL socket factory).
>>> https://github.com/confluentinc/schema-registry/issues/943
>>>
>>> I tested a plain Kafka consumer with the same configuration and confirmed
>>> that it works fine:
>>>
>>> props.put("schema.registry.url", "https://host:port";)
>>>> props.put("specific.avro.reader", true);
>>>> props.put("ssl.truststore.location",
>>>> System.getProperty("javax.net.ssl.trustStore"));
>>>> props.put("ssl.truststore.password",
>>>> System.getProperty("javax.net.ssl.keyStore"));
>>>> props.put("ssl.keystore.location",
>>>> System.getProperty("javax.net.ssl.keyStore"));
>>>> props.put("ssl.keystore.password",
>>>> System.getProperty("javax.net.ssl.keyStorePassword"));
>>>> props.put("ssl.key.password",
>>>> System.getProperty("javax.net.ssl.key.password"));
>>>
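>>> For reference, such a standalone consumer test looks roughly like this (broker
>>> address, topic, and group id are placeholders):
>>>
>>>> // Rough sketch of a standalone test consumer; values are placeholders.
>>>> // Needs java.time.Duration, java.util.Collections, java.util.Properties and
>>>> // the Kafka / Confluent consumer classes.
>>>> Properties props = new Properties();
>>>> props.put("bootstrap.servers", "broker-host:9093");
>>>> props.put("group.id", "tls-smoke-test");
>>>> props.put("security.protocol", "SSL");
>>>> // ... plus the same ssl.* / schema.registry.* settings shown above ...
>>>> props.put("key.deserializer",
>>>>     "org.apache.kafka.common.serialization.ByteArrayDeserializer");
>>>> props.put("value.deserializer",
>>>>     "io.confluent.kafka.serializers.KafkaAvroDeserializer");
>>>> try (KafkaConsumer<byte[], Object> consumer = new KafkaConsumer<>(props)) {
>>>>   consumer.subscribe(Collections.singletonList("my-topic"));
>>>>   ConsumerRecords<byte[], Object> records = consumer.poll(Duration.ofSeconds(10));
>>>>   records.forEach(record -> System.out.println(record.value()));
>>>> }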
>>>
>>> Next I applied the same approach to the Dataflow job code, i.e. I set the
>>> same TLS configuration both as system properties and in the consumer
>>> properties.
>>>
>>> I passed the passwords as system properties when launching the application:
>>>
>>> -Djavax.net.ssl.keyStorePassword=aaa \
>>>> -Djavax.net.ssl.key.password=bbb \
>>>> -Djavax.net.ssl.trustStorePassword=ccc \
>>>
>>>
>>> Note: I set the system properties for the trustStore and keyStore locations
>>> in the ConsumerFactory, since those files are downloaded to a local temp
>>> directory there.
>>>
>>> config.put("ssl.truststore.location", (Object)localTrustStoreFilePath)
>>>> config.put("ssl.keystore.location", (Object)localKeyStoreFilePath)
>>>> System.setProperty("javax.net.ssl.trustStore", localTrustStoreFilePath)
>>>> System.setProperty("javax.net.ssl.keyStore", localKeyStoreFilePath)
>>>
>>>
>>> But this time even the deployment failed, timing out with the following error:
>>>
>>> Exception in thread "main" java.lang.RuntimeException: Failed to
>>>> construct instance from factory method DataflowRunner#fromOptions(interface
>>>> org.apache.beam.sdk.options.PipelineOptions)
>>>>         at
>>>> org.apache.beam.sdk.util.InstanceBuilder.buildFromMethod(InstanceBuilder.java:224)
>>>> ...
>>>> Caused by: java.lang.reflect.InvocationTargetException
>>>>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>> ...
>>>> Caused by: java.lang.IllegalArgumentException: DataflowRunner requires
>>>> gcpTempLocation, but failed to retrieve a value from PipelineOptions
>>>>         at
>>>> org.apache.beam.runners.dataflow.DataflowRunner.fromOptions(DataflowRunner.java:246)
>>>> Caused by: java.lang.IllegalArgumentException: Error constructing
>>>> default value for gcpTempLocation: tempLocation is not a valid GCS path,
>>>> gs://dev-k8s-rfid-store-dataflow/rfid-store-siv-epc-transactions-to-bq/tmp.
>>>>         at
>>>> org.apache.beam.sdk.extensions.gcp.options.GcpOptions$GcpTempLocationFactory.create(GcpOptions.java:255)
>>>> ...
>>>> Caused by: java.lang.RuntimeException: Unable to verify that GCS bucket
>>>> gs://dev-k8s-rfid-store-dataflow exists.
>>>>         at
>>>> org.apache.beam.sdk.extensions.gcp.storage.GcsPathValidator.verifyPathIsAccessible(GcsPathValidator.java:86)
>>>> ...
>>>> Caused by: java.io.IOException: Error getting access token for service
>>>> account: java.security.NoSuchAlgorithmException: Error constructing
>>>> implementation (algorithm: Default, provider: SunJSSE, class:
>>>> sun.security.ssl.SSLContextImpl$DefaultSSLContext)
>>>>         at
>>>> com.google.auth.oauth2.ServiceAccountCredentials.refreshAccessToken(ServiceAccountCredentials.java:401)
>>>> ...
>>>> Caused by: java.net.SocketException:
>>>> java.security.NoSuchAlgorithmException: Error constructing implementation
>>>> (algorithm: Default, provider: SunJSSE, class:
>>>> sun.security.ssl.SSLContextImpl$DefaultSSLContext)
>>>>         at
>>>> javax.net.ssl.DefaultSSLSocketFactory.throwException(SSLSocketFactory.java:248)
>>>> ...
>>>> Caused by: java.security.NoSuchAlgorithmException: Error constructing
>>>> implementation (algorithm: Default, provider: SunJSSE, class:
>>>> sun.security.ssl.SSLContextImpl$DefaultSSLContext)
>>>>         at
>>>> java.security.Provider$Service.newInstance(Provider.java:1617)
>>>> ...
>>>
>>>
>>> Am I missing something?
>>>
>>> I posted the same question here
>>> https://stackoverflow.com/questions/56035121/how-to-configure-tls-connections-for-dataflow-job-that-use-kafka-and-schema-regi
>>>
>>> Thank you.
>>> Yohei Onishi
>>>
>> --
Yohei Onishi
