Lukasz, thanks. I will test it later. Did you actually test it yourself?
On Mon, 13 May 2019 at 11:57 PM, Lukasz Cwik <lc...@google.com> wrote:

> Can you access Schema Registry from a DoFn?
> It seemed to me that the issue is about setting up the JVM environment in
> such a way that it works with Schema Registry. If you can't access it from
> KafkaIO, then I don't see how a different DoFn would be able to access it
> from within the same JVM.
>
> *From: *Yohei Onishi <vivre...@gmail.com>
> *Date: *Mon, May 13, 2019 at 2:41 AM
> *To: * <user@beam.apache.org>
>
> I got this answer from GCP support. Lukasz and Vishwas, any comments?
>>
>>> Hello,
>>>
>>> The Dataflow specialist has gotten back to me. I will now relay what they
>>> have told me. The answer to your question is no, Apache Beam does not
>>> support Schema Registry. However, they have told me that you could
>>> implement the calls to Schema Registry yourself, as Beam only consumes
>>> raw messages and it is the user's responsibility to do whatever they need
>>> with the data. This is based on our understanding of the case: that you
>>> want to publish messages to Kafka and have Dataflow consume those
>>> messages, parsing them based on the schema from the registry. I hope this
>>> information is useful to you; let me know if I can be of further help.
>>>
>>
>> Yohei Onishi
>>
>>
>> On Thu, May 9, 2019 at 8:34 PM Yohei Onishi <vivre...@gmail.com> wrote:
>>
>>> Thanks,
>>>
>>> I explicitly set the file location and password in JAVA_TOOL_OPTIONS and
>>> in the Consumer Factory.
>>> The Consumer Factory downloads the keyStore and trustStore from GCS to
>>> local disk as follows.
>>>
>>> https://stackoverflow.com/questions/42726011/truststore-and-google-cloud-dataflow?noredirect=1&lq=1
>>>
>>> Then I got a FileNotFoundException like the one below. It seems the
>>> Consumer Factory does not work well. When I create a temporary directory
>>> for those files using Files.createTempDirectory(prefix), job deployment
>>> works but access to Schema Registry still fails.
>>>
>>> Caused by: java.security.PrivilegedActionException: java.io.FileNotFoundException: /tmp/keyStore/rfid-store-siv-data-consumer.jks (No such file or directory)
>>>     at java.security.AccessController.doPrivileged(Native Method)
>>>     at sun.security.ssl.SSLContextImpl$DefaultManagersHolder.getKeyManagers(SSLContextImpl.java:928)
>>>     at sun.security.ssl.SSLContextImpl$DefaultManagersHolder.<clinit>(SSLContextImpl.java:864)
>>>     at sun.security.ssl.SSLContextImpl$DefaultSSLContext.<init>(SSLContextImpl.java:1019)
>>>     at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
>>>     at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
>>>     at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>>>     at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
>>>     at java.security.Provider$Service.newInstance(Provider.java:1595)
>>>     ... 42 more
>>> Caused by: java.io.FileNotFoundException: /tmp/keyStore/rfid-store-siv-data-consumer.jks (No such file or directory)
>>>     at java.io.FileInputStream.open0(Native Method)
>>>     at java.io.FileInputStream.open(FileInputStream.java:195)
>>>     at java.io.FileInputStream.<init>(FileInputStream.java:138)
>>>     at java.io.FileInputStream.<init>(FileInputStream.java:93)
>>>     at sun.security.ssl.SSLContextImpl$DefaultManagersHolder$2.run(SSLContextImpl.java:932)
>>>     at sun.security.ssl.SSLContextImpl$DefaultManagersHolder$2.run(SSLContextImpl.java:929)
>>>
>>> So I am also implementing a solution without Schema Registry.
>>>
>>> Yohei Onishi
>>>
>>>
>>> On Thu, May 9, 2019 at 5:12 PM Vishwas Bm <bmvish...@gmail.com> wrote:
>>>
>>>> Hi Yohei,
>>>>
>>>> The configuration exposed as an environment variable was picked up by the
>>>> Beam Java program.
>>>>
>>>> If you check the backtrace of the error, it has a method call to the
>>>> Confluent Schema Registry client during deserialization of records:
>>>>
>>>> io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:208)
>>>> io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:252)
>>>> io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:482)
>>>> io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:475)
>>>> io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getSchemaByIdFromRegistry(CachedSchemaRegistryClient.java:151)
>>>> io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getBySubjectAndId(CachedSchemaRegistryClient.java:230)
>>>> io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getById(CachedSchemaRegistryClient.java:209)
>>>> io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:116)
>>>> io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:88)
>>>> org.fastretailing.rfid.store.siv.EPCTransactionKafkaAvroDeserializer.deserialize(EPCTransactionKafkaAvroDeserializer.scala:14)
>>>> org.fastretailing.rfid.store.siv.EPCTransactionKafkaAvroDeserializer.deserialize(EPCTransactionKafkaAvroDeserializer.scala:7)
>>>> org.apache.beam.sdk.io.kafka.KafkaUnboundedReader.advance(KafkaUnboundedReader.java:234)
>>>> org.apache.beam.sdk.io.kafka.KafkaUnboundedReader.start(KafkaUnboundedReader.java:176)
>>>>
>>>>
>>>> *Thanks & Regards,*
>>>>
>>>> *Vishwas *
>>>>
>>>>
>>>> On Thu, May 9, 2019 at 12:32 PM Yohei Onishi <vivre...@gmail.com> wrote:
>>>>
>>>>> Thanks. Did you use these configurations in your Apache Beam
>>>>> application?
>>>>> Does Apache Beam use the Confluent Schema Registry client internally?
>>>>>
>>>>> Yohei Onishi
>>>>>
>>>>>
>>>>> On Thu, May 9, 2019 at 1:12 PM Vishwas Bm <bmvish...@gmail.com> wrote:
>>>>>
>>>>>> Hi Yohei,
>>>>>>
>>>>>> I tried this some time back with the direct runner and faced the same
>>>>>> issue as in
>>>>>> https://github.com/confluentinc/schema-registry/issues/943 when
>>>>>> interacting with TLS-enabled Kafka and Schema Registry.
>>>>>>
>>>>>> So I set the environment variable JAVA_TOOL_OPTIONS with the
>>>>>> required properties, and then it worked.
>>>>>>
>>>>>> export JAVA_TOOL_OPTIONS="-Djavax.net.ssl.trustStore=client-truststore.jks -Djavax.net.ssl.trustStorePassword=client-password -Djavax.net.ssl.keyStore=client-keystore.jks -Djavax.net.ssl.keyStorePassword=client-password"
>>>>>>
>>>>>>
>>>>>> *Thanks & Regards,*
>>>>>>
>>>>>> *Vishwas *
>>>>>>
>>>>>>
>>>>>> On Thu, May 9, 2019 at 5:13 AM Yohei Onishi <vivre...@gmail.com> wrote:
>>>>>>
>>>>>>> Thanks, but I already did that, as mentioned.
>>>>>>> It seems that Apache Beam does not support Schema Registry. I will
>>>>>>> post about it once I have confirmed it.
>>>>>>>
>>>>>>> On Thu, 9 May 2019 at 12:52 AM, Lukasz Cwik <lc...@google.com> wrote:
>>>>>>>
>>>>>>>> I replied to the SO question with more details.
>>>>>>>>
>>>>>>>> The issue is that you're trying to load a truststore file that
>>>>>>>> doesn't exist on the VM. You need to make that file accessible in
>>>>>>>> some way. The other SO question (
>>>>>>>> https://stackoverflow.com/questions/42726011/truststore-and-google-cloud-dataflow?noredirect=1&lq=1)
>>>>>>>> has a code snippet where the user copies the truststore from GCS to a
>>>>>>>> local tmp file path and then configures the map with that temp file path.
>>>>>>>>
>>>>>>>> On Wed, May 8, 2019 at 1:47 AM Yohei Onishi <vivre...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Can anyone tell me how the Schema Registry client in Apache Beam is
>>>>>>>>> configured?
>>>>>>>>> I tried to find it in the GitHub repo but was not able to.
>>>>>>>>> https://github.com/apache/beam
>>>>>>>>>
>>>>>>>>> I want to make sure that Apache Beam supports Schema Registry and
>>>>>>>>> does not have the same issue that Confluent Schema Registry has.
>>>>>>>>> https://github.com/confluentinc/schema-registry/issues/943
>>>>>>>>>
>>>>>>>>> Yohei Onishi
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Wed, May 8, 2019 at 3:27 PM Yohei Onishi <vivre...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Hi,
>>>>>>>>>>
>>>>>>>>>> I am developing a GCP Cloud Dataflow job that uses a Kafka broker
>>>>>>>>>> and Schema Registry. Our Kafka broker and Schema Registry require a
>>>>>>>>>> TLS client certificate, and I am facing a connection issue with
>>>>>>>>>> Schema Registry on deployment. Any suggestions are welcome.
>>>>>>>>>>
>>>>>>>>>> Here is what I do for the Dataflow job. I create Consumer
>>>>>>>>>> Properties for the TLS configuration.
>>>>>>>>>>
>>>>>>>>>>> props.put("security.protocol", "SSL");
>>>>>>>>>>> props.put("ssl.truststore.password", "aaa");
>>>>>>>>>>> props.put("ssl.keystore.password", "bbb");
>>>>>>>>>>> props.put("ssl.key.password", "ccc");
>>>>>>>>>>> props.put("schema.registry.url", "https://host:port");
>>>>>>>>>>> props.put("specific.avro.reader", true);
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> Then I update the Consumer Properties with updateConsumerProperties.
>>>>>>>>>>
>>>>>>>>>>> Pipeline p = Pipeline.create(options)
>>>>>>>>>>> ...
>>>>>>>>>>> .updateConsumerProperties(properties)
>>>>>>>>>>> ...
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> As this Stack Overflow answer suggests, I also download the keyStore
>>>>>>>>>> and trustStore to a local directory and specify the trustStore / keyStore
>>>>>>>>>> locations on the Consumer Properties in ConsumerFactory.
>>>>>>>>>>
>>>>>>>>>> https://stackoverflow.com/questions/42726011/truststore-and-google-cloud-dataflow?noredirect=1&lq=1
>>>>>>>>>>
>>>>>>>>>>> Pipeline p = Pipeline.create(options)
>>>>>>>>>>> ...
>>>>>>>>>>> .withConsumerFactoryFn(new MyConsumerFactory(...))
>>>>>>>>>>> ...
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> In ConsumerFactory:
>>>>>>>>>>
>>>>>>>>>>> public Consumer<byte[], byte[]> apply(Map<String, Object> config) {
>>>>>>>>>>>   // download keyStore and trustStore from GCS bucket
>>>>>>>>>>>   config.put("ssl.truststore.location", (Object) localTrustStoreFilePath);
>>>>>>>>>>>   config.put("ssl.keystore.location", (Object) localKeyStoreFilePath);
>>>>>>>>>>>   return new KafkaConsumer<byte[], byte[]>(config);
>>>>>>>>>>> }
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> With this code deployment succeeded, but the Dataflow job got
>>>>>>>>>> a TLS server certificate verification error.
>>>>>>>>>>
>>>>>>>>>>> Caused by: sun.security.validator.ValidatorException: PKIX path building failed: sun.security.provider.certpath.SunCertPathBuilderException: unable to find valid certification path to requested target
>>>>>>>>>>> sun.security.validator.PKIXValidator.doBuild(PKIXValidator.java:387)
>>>>>>>>>>> sun.security.validator.PKIXValidator.engineValidate(PKIXValidator.java:292)
>>>>>>>>>>> sun.security.validator.Validator.validate(Validator.java:260)
>>>>>>>>>>> sun.net.www.protocol.https.HttpsClient.afterConnect(HttpsClient.java:559)
>>>>>>>>>>> sun.net.www.protocol.https.AbstractDelegateHttpsURLConnection.connect(AbstractDelegateHttpsURLConnection.java:185)
>>>>>>>>>>> sun.net.www.protocol.http.HttpURLConnection.getInputStream0(HttpURLConnection.java:1513)
>>>>>>>>>>> sun.net.www.protocol.http.HttpURLConnection.getInputStream(HttpURLConnection.java:1441)
>>>>>>>>>>> java.net.HttpURLConnection.getResponseCode(HttpURLConnection.java:480)
>>>>>>>>>>> sun.net.www.protocol.https.HttpsURLConnectionImpl.getResponseCode(HttpsURLConnectionImpl.java:338)
>>>>>>>>>>> io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:208)
>>>>>>>>>>> io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:252)
>>>>>>>>>>> io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:482)
>>>>>>>>>>> io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:475)
>>>>>>>>>>> io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getSchemaByIdFromRegistry(CachedSchemaRegistryClient.java:151)
>>>>>>>>>>> io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getBySubjectAndId(CachedSchemaRegistryClient.java:230)
>>>>>>>>>>> io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getById(CachedSchemaRegistryClient.java:209)
>>>>>>>>>>> io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:116)
>>>>>>>>>>> io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:88)
>>>>>>>>>>> org.fastretailing.rfid.store.siv.EPCTransactionKafkaAvroDeserializer.deserialize(EPCTransactionKafkaAvroDeserializer.scala:14)
>>>>>>>>>>> org.fastretailing.rfid.store.siv.EPCTransactionKafkaAvroDeserializer.deserialize(EPCTransactionKafkaAvroDeserializer.scala:7)
>>>>>>>>>>> org.apache.beam.sdk.io.kafka.KafkaUnboundedReader.advance(KafkaUnboundedReader.java:234)
>>>>>>>>>>> org.apache.beam.sdk.io.kafka.KafkaUnboundedReader.start(KafkaUnboundedReader.java:176)
>>>>>>>>>>> org.apache.beam.runners.dataflow.worker.WorkerCustomSources$UnboundedReaderIterator.start(WorkerCustomSources.java:779)
>>>>>>>>>>> org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation$SynchronizedReaderIterator.start(ReadOperation.java:361)
>>>>>>>>>>> org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:194)
>>>>>>>>>>> org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:159)
>>>>>>>>>>> org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:76)
>>>>>>>>>>> org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1228)
>>>>>>>>>>> org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:143)
>>>>>>>>>>> org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$6.run(StreamingDataflowWorker.java:967)
>>>>>>>>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>>>>>>>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>>>>>>>>>> java.lang.Thread.run(Thread.java:745)
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> Then I found that the Schema Registry client loads its TLS configuration
>>>>>>>>>> from system properties.
>>>>>>>>>> https://github.com/confluentinc/schema-registry/issues/943
>>>>>>>>>>
>>>>>>>>>> I tested a Kafka Consumer with the same configuration and
>>>>>>>>>> confirmed that it works fine.
>>>>>>>>>>
>>>>>>>>>>> props.put("schema.registry.url", "https://host:port");
>>>>>>>>>>> props.put("specific.avro.reader", true);
>>>>>>>>>>> props.put("ssl.truststore.location", System.getProperty("javax.net.ssl.trustStore"));
>>>>>>>>>>> props.put("ssl.truststore.password", System.getProperty("javax.net.ssl.trustStorePassword"));
>>>>>>>>>>> props.put("ssl.keystore.location", System.getProperty("javax.net.ssl.keyStore"));
>>>>>>>>>>> props.put("ssl.keystore.password", System.getProperty("javax.net.ssl.keyStorePassword"));
>>>>>>>>>>> props.put("ssl.key.password", System.getProperty("javax.net.ssl.key.password"));
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> Next I applied the same approach to the Dataflow job code, that is, I
>>>>>>>>>> applied the same TLS configuration to both the system properties and
>>>>>>>>>> the Consumer Properties.
>>>>>>>>>>
>>>>>>>>>> I specified the passwords as system properties when executing the
>>>>>>>>>> application.
>>>>>>>>>>
>>>>>>>>>>> -Djavax.net.ssl.keyStorePassword=aaa \
>>>>>>>>>>> -Djavax.net.ssl.key.password=bbb \
>>>>>>>>>>> -Djavax.net.ssl.trustStorePassword=ccc \
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> Note: I set the system properties for the trustStore and keyStore
>>>>>>>>>> locations in the Consumer Factory, since those files are downloaded to
>>>>>>>>>> a local temp directory.
>>>>>>>>>>
>>>>>>>>>>> config.put("ssl.truststore.location", (Object) localTrustStoreFilePath);
>>>>>>>>>>> config.put("ssl.keystore.location", (Object) localKeyStoreFilePath);
>>>>>>>>>>> System.setProperty("javax.net.ssl.trustStore", localTrustStoreFilePath);
>>>>>>>>>>> System.setProperty("javax.net.ssl.keyStore", localKeyStoreFilePath);
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> But then even deployment failed with a timeout error.
>>>>>>>>>>
>>>>>>>>>>> Exception in thread "main" java.lang.RuntimeException: Failed to construct instance from factory method DataflowRunner#fromOptions(interface org.apache.beam.sdk.options.PipelineOptions)
>>>>>>>>>>>     at org.apache.beam.sdk.util.InstanceBuilder.buildFromMethod(InstanceBuilder.java:224)
>>>>>>>>>>>     ...
>>>>>>>>>>> Caused by: java.lang.reflect.InvocationTargetException
>>>>>>>>>>>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>>>>>>>>     ...
>>>>>>>>>>> Caused by: java.lang.IllegalArgumentException: DataflowRunner requires gcpTempLocation, but failed to retrieve a value from PipelineOptions
>>>>>>>>>>>     at org.apache.beam.runners.dataflow.DataflowRunner.fromOptions(DataflowRunner.java:246)
>>>>>>>>>>> Caused by: java.lang.IllegalArgumentException: Error constructing default value for gcpTempLocation: tempLocation is not a valid GCS path, gs://dev-k8s-rfid-store-dataflow/rfid-store-siv-epc-transactions-to-bq/tmp.
>>>>>>>>>>>     at org.apache.beam.sdk.extensions.gcp.options.GcpOptions$GcpTempLocationFactory.create(GcpOptions.java:255)
>>>>>>>>>>>     ...
>>>>>>>>>>> Caused by: java.lang.RuntimeException: Unable to verify that GCS bucket gs://dev-k8s-rfid-store-dataflow exists.
>>>>>>>>>>>     at org.apache.beam.sdk.extensions.gcp.storage.GcsPathValidator.verifyPathIsAccessible(GcsPathValidator.java:86)
>>>>>>>>>>>     ...
>>>>>>>>>>> Caused by: java.io.IOException: Error getting access token for service account: java.security.NoSuchAlgorithmException: Error constructing implementation (algorithm: Default, provider: SunJSSE, class: sun.security.ssl.SSLContextImpl$DefaultSSLContext)
>>>>>>>>>>>     at com.google.auth.oauth2.ServiceAccountCredentials.refreshAccessToken(ServiceAccountCredentials.java:401)
>>>>>>>>>>>     ...
>>>>>>>>>>> Caused by: java.net.SocketException: java.security.NoSuchAlgorithmException: Error constructing implementation (algorithm: Default, provider: SunJSSE, class: sun.security.ssl.SSLContextImpl$DefaultSSLContext)
>>>>>>>>>>>     at javax.net.ssl.DefaultSSLSocketFactory.throwException(SSLSocketFactory.java:248)
>>>>>>>>>>>     ...
>>>>>>>>>>> Caused by: java.security.NoSuchAlgorithmException: Error constructing implementation (algorithm: Default, provider: SunJSSE, class: sun.security.ssl.SSLContextImpl$DefaultSSLContext)
>>>>>>>>>>>     at java.security.Provider$Service.newInstance(Provider.java:1617)
>>>>>>>>>>>     ...
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> Am I missing something?
>>>>>>>>>>
>>>>>>>>>> I posted the same question here:
>>>>>>>>>> https://stackoverflow.com/questions/56035121/how-to-configure-tls-connections-for-dataflow-job-that-use-kafka-and-schema-regi
>>>>>>>>>>
>>>>>>>>>> Thank you.
>>>>>>>>>> Yohei Onishi
>>>>>>>>>>
>>>>>>> --
>>>>>>> Yohei Onishi
>>>>>>>
--
Yohei Onishi
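
For anyone reading this thread later: the FileNotFoundException above shows a keyStore path being used before the file existed on the machine. The deployment-time SSLContext error may have the same cause, though the thread does not confirm it. Below is a minimal sketch of the copy-then-configure approach Lukasz describes, using only the JDK. TlsStoreStager and its method names are hypothetical (they are not part of Beam, Kafka, or the Confluent client), the InputStream stands in for whatever opens the GCS object, and passwords are left out. It is an illustration under those assumptions, not a tested fix for Dataflow.

```java
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper; not part of Beam, Kafka, or the Confluent client. */
final class TlsStoreStager {

    /** Copies a store (e.g. a stream opened on a GCS object) into a fresh temp directory. */
    static Path stage(InputStream source, String fileName) throws IOException {
        Path dir = Files.createTempDirectory("tls-stores");
        Path target = dir.resolve(fileName);
        Files.copy(source, target, StandardCopyOption.REPLACE_EXISTING);
        return target;
    }

    /** Builds the Kafka consumer SSL location settings for stores already on local disk. */
    static Map<String, Object> consumerSslConfig(Path trustStore, Path keyStore) {
        requireReadable(trustStore);
        requireReadable(keyStore);
        Map<String, Object> config = new HashMap<>();
        config.put("security.protocol", "SSL");
        config.put("ssl.truststore.location", trustStore.toString());
        config.put("ssl.keystore.location", keyStore.toString());
        return config;
    }

    /**
     * Points the JVM-wide default SSL settings at the same files.
     * Assumption: the Schema Registry client in use reads these system
     * properties, as described in schema-registry issue 943.
     */
    static void exportAsSystemProperties(Path trustStore, Path keyStore) {
        requireReadable(trustStore);
        requireReadable(keyStore);
        System.setProperty("javax.net.ssl.trustStore", trustStore.toString());
        System.setProperty("javax.net.ssl.keyStore", keyStore.toString());
    }

    /** Fails fast with a clear message instead of a later SSLContext error. */
    private static void requireReadable(Path path) {
        if (!Files.isReadable(path)) {
            throw new IllegalStateException("TLS store not readable: " + path);
        }
    }
}
```

The readability check is the point of the sketch: it surfaces a missing file at configuration time rather than deep inside SSLContext initialization. Note that the system properties are JVM-wide, so setting them also affects every other HTTPS client in the same process.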