Hi Somnath,

The problem here seems to be that */tmp/trust.jks* is available when
creating the pipeline (in the launcher), but apparently is not available
on the worker VMs (in the SDK container). The Flex Template image you
built only runs the launcher; the workers run a separate SDK container
image, which does not contain files you COPY into the template image.

In Java we've been using JvmInitializers in Templates to copy files from
GCS when the worker starts up (see CommonTemplateJvmInitializer
<https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/main/v2/common/src/main/java/com/google/cloud/teleport/v2/common/CommonTemplateJvmInitializer.java#L75-L81>).
I don't know if there's a correspondingly simple way to achieve that
in Python.
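
For reference, a minimal sketch of that pattern (the bucket path here is a
placeholder, and @AutoService comes from Google's auto-service library;
CommonTemplateJvmInitializer resolves the paths from pipeline options
rather than hard-coding them):

import com.google.auto.service.AutoService;
import java.nio.channels.Channels;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;
import org.apache.beam.sdk.harness.JvmInitializer;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.options.PipelineOptions;

// Discovered via ServiceLoader; Beam runs this once per worker JVM
// before any elements are processed.
@AutoService(JvmInitializer.class)
public class TruststoreJvmInitializer implements JvmInitializer {
  @Override
  public void beforeProcessing(PipelineOptions options) {
    FileSystems.setDefaultPipelineOptions(options);
    try (var in = Channels.newInputStream(
        FileSystems.open(
            // Placeholder path; take this from options in real code.
            FileSystems.matchNewResource("gs://my-bucket/trust.jks", false)))) {
      Files.copy(in, Paths.get("/tmp/trust.jks"),
          StandardCopyOption.REPLACE_EXISTING);
    } catch (Exception e) {
      throw new RuntimeException("Could not stage /tmp/trust.jks on the worker", e);
    }
  }
}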
More specifically for Kafka, we can supply a function that builds the
KafkaConsumer (
https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/kafka/KafkaIO.Read.html#withConsumerFactoryFn-org.apache.beam.sdk.transforms.SerializableFunction-),
which does not appear to be exposed in Python's interface either.
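
A rough sketch of what that looks like in Java (stageTruststore() is a
hypothetical helper that would download the .jks from GCS to /tmp/trust.jks,
e.g. using the same FileSystems code as above):

import java.util.Map;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.kafka.clients.consumer.KafkaConsumer;

KafkaIO.<byte[], byte[]>read()
    .withBootstrapServers("bs:9095")
    .withTopic("my-topic")
    .withConsumerFactoryFn(
        (Map<String, Object> config) -> {
          // Runs on the worker just before the consumer is constructed,
          // so the truststore file exists when Kafka tries to load it.
          stageTruststore();  // hypothetical GCS-download helper
          return new KafkaConsumer<>(config);
        });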

Curious how others are solving this issue.
(It would be awesome if we could handle GCS references transparently when
using the IO, to make this much simpler.)


Best,
Bruno




On Thu, Jul 20, 2023 at 12:44 PM Somnath Chouwdhury <
somnat...@datametica.com> wrote:

> Hi,
>
> We are trying to store the truststore.jks file inside the Flex Template
> Docker image, but the pipeline is unable to locate it at runtime.
>
> We pulled the image and can see that the file is present in the
> container at /tmp/trust.jks, but when we use it we get the following
> error.
>
>
> *Error Logs:*
> Error message from worker: generic::unknown:
> org.apache.beam.sdk.util.UserCodeException: java.lang.RuntimeException:
> org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
> org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:39)
> org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn$DoFnInvoker.invokeSplitRestriction(Unknown
> Source)
>
> org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForSplitRestriction(FnApiDoFnRunner.java:889)
>
> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:313)
>
> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:245)
>
> org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1791)
>
> org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForPairWithRestriction(FnApiDoFnRunner.java:826)
>
> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:313)
>
> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:245)
>
> org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1791)
>
> org.apache.beam.fn.harness.FnApiDoFnRunner.access$3000(FnApiDoFnRunner.java:145)
>
> org.apache.beam.fn.harness.FnApiDoFnRunner$NonWindowObservingProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:2360)
>
> org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContextBase.output(FnApiDoFnRunner.java:2530)
>
> org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.output(DoFnOutputReceivers.java:78)
>
> org.apache.beam.sdk.io.Read$OutputSingleSource.processElement(Read.java:1018)
> org.apache.beam.sdk.io.Read$OutputSingleSource$DoFnInvoker.invokeProcessElement(Unknown
> Source)
>
> org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:801)
>
> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:313)
>
> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:245)
>
> org.apache.beam.fn.harness.BeamFnDataReadRunner.forwardElementToConsumer(BeamFnDataReadRunner.java:213)
>
> org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver2.multiplexElements(BeamFnDataInboundObserver2.java:158)
>
> org.apache.beam.fn.harness.control.ProcessBundleHandler.processBundle(ProcessBundleHandler.java:533)
>
> org.apache.beam.fn.harness.control.BeamFnControlClient.delegateOnInstructionRequestType(BeamFnControlClient.java:151)
>
> org.apache.beam.fn.harness.control.BeamFnControlClient$InboundObserver.lambda$onNext$0(BeamFnControlClient.java:116)
>
> java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
> java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
>
> org.apache.beam.sdk.util.UnboundedScheduledExecutorService$ScheduledFutureTask.run(UnboundedScheduledExecutorService.java:162)
>
> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>
> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
> java.base/java.lang.Thread.run(Thread.java:829)
> Caused by: java.lang.RuntimeException:
> org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
>
> org.apache.beam.sdk.io.kafka.KafkaUnboundedSource.createReader(KafkaUnboundedSource.java:136)
>
> org.apache.beam.sdk.io.kafka.KafkaUnboundedSource.createReader(KafkaUnboundedSource.java:45)
>
> org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn$UnboundedSourceAsSDFRestrictionTracker.initializeCurrentReader(Read.java:843)
>
> org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn$UnboundedSourceAsSDFRestrictionTracker.getProgress(Read.java:975)
>
> org.apache.beam.sdk.transforms.reflect.ByteBuddyDoFnInvokerFactory$DefaultGetSize.invokeGetSize(ByteBuddyDoFnInvokerFactory.java:432)
> org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn$DoFnInvoker.invokeGetSize(Unknown
> Source)
>
> org.apache.beam.fn.harness.FnApiDoFnRunner$SizedRestrictionNonWindowObservingProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:2322)
>
> org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContextBase.output(FnApiDoFnRunner.java:2530)
>
> org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.output(DoFnOutputReceivers.java:78)
>
> org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn.splitRestriction(Read.java:524)
> Caused by: org.apache.kafka.common.KafkaException: Failed to construct
> kafka consumer
>
> org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:820)
>
> org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:631)
>
> org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:612)
>
> org.apache.beam.sdk.io.kafka.KafkaUnboundedSource.split(KafkaUnboundedSource.java:67)
>
> org.apache.beam.sdk.io.kafka.KafkaUnboundedSource.createReader(KafkaUnboundedSource.java:134)
>
> org.apache.beam.sdk.io.kafka.KafkaUnboundedSource.createReader(KafkaUnboundedSource.java:45)
>
> org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn$UnboundedSourceAsSDFRestrictionTracker.initializeCurrentReader(Read.java:843)
>
> org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn$UnboundedSourceAsSDFRestrictionTracker.getProgress(Read.java:975)
>
> org.apache.beam.sdk.transforms.reflect.ByteBuddyDoFnInvokerFactory$DefaultGetSize.invokeGetSize(ByteBuddyDoFnInvokerFactory.java:432)
> org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn$DoFnInvoker.invokeGetSize(Unknown
> Source)
>
> org.apache.beam.fn.harness.FnApiDoFnRunner$SizedRestrictionNonWindowObservingProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:2322)
>
> org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContextBase.output(FnApiDoFnRunner.java:2530)
>
> org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.output(DoFnOutputReceivers.java:78)
>
> org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn.splitRestriction(Read.java:524)
> org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn$DoFnInvoker.invokeSplitRestriction(Unknown
> Source)
>
> org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForSplitRestriction(FnApiDoFnRunner.java:889)
>
> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:313)
>
> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:245)
>
> org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1791)
>
> org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForPairWithRestriction(FnApiDoFnRunner.java:826)
>
> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:313)
>
> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:245)
>
> org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1791)
>
> org.apache.beam.fn.harness.FnApiDoFnRunner.access$3000(FnApiDoFnRunner.java:145)
>
> org.apache.beam.fn.harness.FnApiDoFnRunner$NonWindowObservingProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:2360)
>
> org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContextBase.output(FnApiDoFnRunner.java:2530)
>
> org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.output(DoFnOutputReceivers.java:78)
>
> org.apache.beam.sdk.io.Read$OutputSingleSource.processElement(Read.java:1018)
> org.apache.beam.sdk.io.Read$OutputSingleSource$DoFnInvoker.invokeProcessElement(Unknown
> Source)
>
> org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:801)
>
> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:313)
>
> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:245)
>
> org.apache.beam.fn.harness.BeamFnDataReadRunner.forwardElementToConsumer(BeamFnDataReadRunner.java:213)
>
> org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver2.multiplexElements(BeamFnDataInboundObserver2.java:158)
>
> org.apache.beam.fn.harness.control.ProcessBundleHandler.processBundle(ProcessBundleHandler.java:533)
>
> org.apache.beam.fn.harness.control.BeamFnControlClient.delegateOnInstructionRequestType(BeamFnControlClient.java:151)
>
> org.apache.beam.fn.harness.control.BeamFnControlClient$InboundObserver.lambda$onNext$0(BeamFnControlClient.java:116)
>
> java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
> java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
>
> org.apache.beam.sdk.util.UnboundedScheduledExecutorService$ScheduledFutureTask.run(UnboundedScheduledExecutorService.java:162)
>
> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>
> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
> java.base/java.lang.Thread.run(Thread.java:829)
> Caused by: org.apache.kafka.common.KafkaException:
> org.apache.kafka.common.KafkaException:
> org.apache.kafka.common.KafkaException: Failed to load SSL keystore
> /tmp/trust.jks of type JKS
>
> org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:158)
>
> org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:146)
>
> org.apache.kafka.common.network.ChannelBuilders.clientChannelBuilder(ChannelBuilders.java:67)
>
> org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:99)
>
> org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:741)
> ... 42 more
> Caused by: org.apache.kafka.common.KafkaException:
> org.apache.kafka.common.KafkaException: Failed to load SSL keystore
> /tmp/trust.jks of type JKS
>
> org.apache.kafka.common.security.ssl.SslEngineBuilder.createSSLContext(SslEngineBuilder.java:163)
>
> org.apache.kafka.common.security.ssl.SslEngineBuilder.<init>(SslEngineBuilder.java:104)
>
> org.apache.kafka.common.security.ssl.SslFactory.configure(SslFactory.java:95)
>
> org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:154)
> ... 46 more
> Caused by: org.apache.kafka.common.KafkaException: Failed to load SSL
> keystore /tmp/trust.jks of type JKS
>
> org.apache.kafka.common.security.ssl.SslEngineBuilder$SecurityStore.load(SslEngineBuilder.java:292)
>
> org.apache.kafka.common.security.ssl.SslEngineBuilder.createSSLContext(SslEngineBuilder.java:155)
> ... 49 more
> Caused by: java.nio.file.NoSuchFileException: /tmp/trust.jks
>
> java.base/sun.nio.fs.UnixException.translateToIOException(UnixException.java:92)
>
> java.base/sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:111)
>
> java.base/sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:116)
>
> java.base/sun.nio.fs.UnixFileSystemProvider.newByteChannel(UnixFileSystemProvider.java:219)
> java.base/java.nio.file.Files.newByteChannel(Files.java:371)
> java.base/java.nio.file.Files.newByteChannel(Files.java:422)
>
> java.base/java.nio.file.spi.FileSystemProvider.newInputStream(FileSystemProvider.java:420)
> java.base/java.nio.file.Files.newInputStream(Files.java:156)
>
> org.apache.kafka.common.security.ssl.SslEngineBuilder$SecurityStore.load(SslEngineBuilder.java:285)
> ... 50 more
>
> *Here is the Dockerfile we used:*
>
> FROM gcr.io/dataflow-templates-base/python38-template-launcher-base
> RUN apt-get update && \
>     apt-get install -y openjdk-11-jdk
>
> ENV JAVA_HOME /usr/lib/jvm/java-11-openjdk-amd64
>
>
> ARG WORKDIR=/dataflow/python/using_flex_template_adv3
> RUN mkdir -p ${WORKDIR}
> WORKDIR ${WORKDIR}
>
> COPY . .
> COPY trust.jks /tmp/trust.jks
>
> ENV FLEX_TEMPLATE_PYTHON_PY_FILE="${WORKDIR}/main.py"
> ENV PYTHON_REQUIREMENTS_FILE="${WORKDIR}/requirements.txt"
> ENV FLEX_TEMPLATE_PYTHON_SETUP_FILE="${WORKDIR}/setup.py"
> RUN apt-get update \
>     && apt-get install -y libffi-dev git \
>     && rm -rf /var/lib/apt/lists/* \
>     # Upgrade pip and install the requirements.
>     && pip install --no-cache-dir --upgrade pip \
>     && pip install --no-cache-dir -r $PYTHON_REQUIREMENTS_FILE \
>     # Download the requirements to speed up launching the Dataflow job.
>     && pip download --no-cache-dir --dest /tmp/dataflow-requirements-cache -r $PYTHON_REQUIREMENTS_FILE
>
> RUN pip install --no-cache-dir apache-beam[gcp]==2.43.0
> ENTRYPOINT ["/opt/google/dataflow/python_template_launcher"]
>
>
>
> *Code Used:*
>
> read = p_input | "Read Json from Kafka" >> ReadFromKafka(
>     consumer_config=self.consumer_config,
>     topics=[self.topic],
>     commit_offset_in_finalize=False,
>     with_metadata=False
> )
>
>
> *consumer_config contents:*
>
> bootstrap.servers: bs:9095
> group.id: "Group1"
> ssl.truststore.location: '/tmp/trust.jks'
> sasl.mechanism: SCRAM-SHA-512
> security.protocol: SASL_SSL
> sasl.jaas.config: ""
>
>
>
> We are trying to run this pipeline on GCP Dataflow with the Apache Beam
> Python SDK.
>
>
>
> --
> Thanks and Regards,
>
> *Somnath Chouwdhury*
> Associate Engineer IV - Solutioning & Software Engineering
> +91 9284112897 | www.datametica.com
> Datametica Solutions Private Limited, Pune - India
