Hi Cham,

Thanks. I tried adding my jar to the classpath in the Java SDK container, but it made no difference.
I then saw that the loader program for the Java SDK harness hard-codes a few jars into the classpath [1]. It also tacks on any jars specified to be staged, but in any case it does not seem to include every jar in the /opt/apache/beam/jars directory.

Regards,
Deepak

[1] https://github.com/apache/beam/blob/c1db5fe0afabc6639ea09ca36ef13ebb63b283de/sdks/java/container/boot.go#L135

On Tue, Mar 8, 2022 at 11:07 AM Chamikara Jayalath <chamik...@google.com> wrote:
>
> On Fri, Mar 4, 2022 at 3:16 PM Deepak Nagaraj <n.dee...@gmail.com> wrote:
>>
>> Hi Cham,
>>
>> Thanks, we set filesToStage to be empty within the expansion service
>> configuration.
>>
>> Now when we run a Python pipeline that reads from Kafka, we see an
>> exception about a missing class. [1]
>>
>> This class is present in the jar that was getting staged earlier, so in
>> a way the failure is expected.
>>
>> However, the same jar is also present within the classpath of the
>> taskmanager [2], yet it does not seem to be considered by the function
>> harness.
>
> Hi Deepak,
>
> In general, when "filesToStage" is excluded, everything in the CLASSPATH
> should be staged for the SDK Harness. I'm not too familiar with the Flink
> dependency setup, but is it possible that the relevant jar does not get
> staged for the SDK harness somehow?
>
> Thanks,
> Cham
>
>> Is there a way we can let the function harness consider the available jar?
>>
>> Also, is our understanding correct that the harness only looks at jars
>> previously uploaded to the staging service?
>>
>> Thanks,
>> Deepak
>>
>> [1] Exception about missing class:
>>
>> 2022-03-03 17:37:29,531 ERROR
>> org.apache.beam.fn.harness.control.BeamFnControlClient [] -
>> Error thrown when handling InstructionRequest 1
>> java.lang.NoClassDefFoundError: org/springframework/expression/EvaluationContext
>>   at org.apache.beam.sdk.io.kafka.KafkaRecordCoder.toHeaders(KafkaRecordCoder.java:86)
>>   at org.apache.beam.sdk.io.kafka.KafkaRecordCoder.decode(KafkaRecordCoder.java:81)
>>   at org.apache.beam.sdk.io.kafka.KafkaRecordCoder.decode(KafkaRecordCoder.java:40)
>>   at org.apache.beam.sdk.coders.Coder.decode(Coder.java:159)
>>   at org.apache.beam.sdk.coders.LengthPrefixCoder.decode(LengthPrefixCoder.java:64)
>>   at org.apache.beam.sdk.coders.Coder.decode(Coder.java:159)
>>   at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:621)
>>   at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:612)
>>   at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:558)
>>   at org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver2.awaitCompletion(BeamFnDataInboundObserver2.java:126)
>>   at org.apache.beam.fn.harness.control.ProcessBundleHandler.processBundle(ProcessBundleHandler.java:467)
>>   at org.apache.beam.fn.harness.control.BeamFnControlClient.delegateOnInstructionRequestType(BeamFnControlClient.java:151)
>>   at org.apache.beam.fn.harness.control.BeamFnControlClient$InboundObserver.lambda$onNext$0(BeamFnControlClient.java:116)
>>   at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>>   at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>>   at java.base/java.lang.Thread.run(Thread.java:829)
>> Caused by: java.lang.ClassNotFoundException: org.springframework.expression.EvaluationContext
>>   at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:581)
>>   at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:178)
>>   at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:522)
>>   ... 16 more
>>
>> [2] Classpath of the task manager (the missing class is present in
>> beam-runners-flink-job-server-no-slf4j.jar, which apparently is not
>> considered by the harness):
>>
>> 2022-03-03 07:18:42,146 INFO
>> org.apache.flink.runtime.taskexecutor.TaskManagerRunner [] -
>> Classpath: /opt/flink/lib/beam-runners-flink-job-server-no-slf4j.jar:/opt/flink/lib/flink-csv-1.13.5.jar:/opt/flink/lib/flink-json-1.13.5.jar:/opt/flink/lib/flink-shaded-zookeeper-3.4.14.jar:/opt/flink/lib/flink-table-blink_2.12-1.13.5.jar:/opt/flink/lib/flink-table_2.12-1.13.5.jar:/opt/flink/lib/log4j-1.2-api-2.16.0.jar:/opt/flink/lib/log4j-api-2.16.0.jar:/opt/flink/lib/log4j-core-2.16.0.jar:/opt/flink/lib/log4j-slf4j-impl-2.16.0.jar:/opt/flink/lib/flink-dist_2.12-1.13.5.jar:::
>>
>> On 2022/02/11 22:22:16 Chamikara Jayalath wrote:
>> > Hi Jeremy,
>> >
>> > By default we stage all jars in the CLASSPATH of the expansion service.
>> > You can override this by setting the filesToStage option when starting
>> > up the expansion service:
>> > https://github.com/apache/beam/blob/7fa5387ffac4f2801077f2e55aa2eba7a47036d9/sdks/java/core/src/main/java/org/apache/beam/sdk/options/FileStagingOptions.java#L38
>> >
>> > Thanks,
>> > Cham
>> >
>> > On Fri, Feb 11, 2022 at 2:09 PM Jeremy Lewi <je...@primer.ai> wrote:
>> >
>> > > Hi Folks,
>> > >
>> > > So I think this is what's happening. My pipeline has multi-language
>> > > transforms because it's using Kafka IO. When the runner calls transform
>> > > on those transforms, it contacts the expansion service, which responds
>> > > with a list of jars. The runner then downloads those jars.
>> > >
>> > > Then, when the runner stages artifacts, it ends up uploading those
>> > > jars to the staging service.
>> > >
>> > > This seems unnecessary and inefficient. I was able to hack the
>> > > portable_runner.py code to test this out. When I did, my job submitted
>> > > quite quickly (the pipeline's running, but I haven't verified it's
>> > > working, so it's possible I broke something).
>> > >
>> > > Is this working as intended? Is there some way to avoid this without
>> > > having to hack the runner code?
>> > >
>> > > Interestingly, it seems like downloading the jars is much faster than
>> > > uploading them, but I haven't investigated this.
>> > >
>> > > J
>> > >
>> > > On Fri, Feb 11, 2022 at 8:23 AM Jeremy Lewi <je...@primer.ai> wrote:
>> > >
>> > >> Hi Folks,
>> > >>
>> > >> I'm using a patched version of Apache Beam 2.35 Python and running on
>> > >> Flink on Kubernetes using the PortableJobRunner.
>> > >>
>> > >> It looks like when submitting the job, the runner tries to upload a
>> > >> large 274 MB Flink job server jar to the staging service.
>> > >>
>> > >> This doesn't seem right. I already have an instance of the JobServer
>> > >> running, and my program is talking to the job server, so why is it
>> > >> trying to stage the JobServer?
>> > >>
>> > >> I believe this problem started when I upgraded to 2.35.
>> > >>
>> > >> When I debugged this, I found the runner was getting stuck in
>> > >> offer_artifacts:
>> > >>
>> > >> https://github.com/apache/beam/blob/38b21a71a76aad902bd903d525b25a5ff464df55/sdks/python/apache_beam/runners/portability/artifact_service.py#L235
>> > >>
>> > >> When I looked at the rolePayload of the request in question, it was:
>> > >>
>> > >> rolePayload=beam-runners-flink-job-server-quH8FRP1-liJ7K9es-qBV9wguz4oBRlVegwqlW3OpqU.jar
>> > >>
>> > >> How does the runner decide which artifacts to upload?
>> > >> Could this be caused by using a patched version of the Python SDK but
>> > >> an unpatched version of the job server jar, so the Python version
>> > >> (2.35.0.dev2) doesn't match the JobServer version 2.35.0? As a result,
>> > >> the portable runner thinks it needs to upload the jar?
>> > >>
>> > >> We build our own version of the Python SDK because we need a fix for
>> > >> https://issues.apache.org/jira/browse/BEAM-12244.
>> > >>
>> > >> When we were using 2.33, we were also building our own Flink jars in
>> > >> order to pull in Kafka patches which hadn't been released yet.
>> > >>
>> > >> Thanks
>> > >> J
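[Editor's note] To make the behavior Deepak describes concrete, here is a minimal, hypothetical Python sketch of the harness classpath assembly. The real logic lives in the Go loader (boot.go); the jar names, the `build_classpath` helper, and the hard-coded list below are illustrative assumptions, not Beam's actual implementation.

```python
# Sketch of the SDK-harness classpath assembly described in the thread:
# the loader hard-codes a few jars and appends staged artifacts, but it
# never scans the whole /opt/apache/beam/jars directory. All names here
# are illustrative.

HARDCODED_JARS = [
    "/opt/apache/beam/jars/beam-sdks-java-harness.jar",
    "/opt/apache/beam/jars/slf4j-api.jar",
]

def build_classpath(staged_artifacts):
    """Classpath = hard-coded jars + staged jars, and nothing else."""
    entries = list(HARDCODED_JARS)
    entries.extend(a for a in staged_artifacts if a.endswith(".jar"))
    return ":".join(entries)

# With filesToStage set to empty, no extra jars are staged, so a dependency
# that exists only elsewhere (e.g. on the task manager's classpath) never
# reaches the harness -- hence the NoClassDefFoundError in [1] above.
print("spring-expression" in build_classpath([]))          # False
print("spring-expression" in
      build_classpath(["/tmp/staged/spring-expression.jar"]))  # True
```

This is why adding the jar to the container's classpath directory alone makes no difference: under this model, only jars that are either hard-coded or explicitly staged ever appear on the harness classpath.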