Hi Cham,

Thanks. I tried adding my jar to the classpath in the Java SDK
container, but it made no difference.

I then saw that the loader program for the Java SDK harness hard-codes
a few jars into the classpath [1].

It also tacks on any jars specified to be staged, but in any case it
does not seem to include every jar in the /opt/apache/beam/jars
directory.

Regards,
Deepak

[1] 
https://github.com/apache/beam/blob/c1db5fe0afabc6639ea09ca36ef13ebb63b283de/sdks/java/container/boot.go#L135

On Tue, Mar 8, 2022 at 11:07 AM Chamikara Jayalath <chamik...@google.com> wrote:
>
>
>
> On Fri, Mar 4, 2022 at 3:16 PM Deepak Nagaraj <n.dee...@gmail.com> wrote:
>>
>> Hi Cham,
>>
>> Thanks, we set filesToStage to be empty within expansion service 
>> configuration.
>>
>> Now when we run a Python pipeline that reads from Kafka, we see an exception 
>> about a missing class. [1]
>>
>> This class is present in the jar that was getting staged earlier, so in a 
>> way the failure is expected.
>>
>> However, the same jar is also present within the classpath of the 
>> taskmanager [2], yet it does not seem to be considered by the function 
>> harness.
>
>
> Hi Deepak,
>
> In general, when "filesToStage" is excluded, everything in the CLASSPATH 
> should be staged for the SDK harness. I'm not too familiar with the Flink 
> dependency setup, but is it possible that the relevant jar does not get 
> staged for the SDK harness somehow?
>
> Thanks,
> Cham
>
>>
>>
>> Is there a way we can let the function harness consider the available jar?
>>
>> Also, is our understanding correct that the harness only looks at jars 
>> previously uploaded to the staging service?
>>
>> Thanks,
>> Deepak
>>
>> [1] Exception about missing class:
>>
>> 2022-03-03 17:37:29,531 ERROR
>> org.apache.beam.fn.harness.control.BeamFnControlClient       [] -
>> Error thrown when handling InstructionRequest 1
>> java.lang.NoClassDefFoundError:
>> org/springframework/expression/EvaluationContext
>>        at 
>> org.apache.beam.sdk.io.kafka.KafkaRecordCoder.toHeaders(KafkaRecordCoder.java:86)
>>        at 
>> org.apache.beam.sdk.io.kafka.KafkaRecordCoder.decode(KafkaRecordCoder.java:81)
>>        at 
>> org.apache.beam.sdk.io.kafka.KafkaRecordCoder.decode(KafkaRecordCoder.java:40)
>>        at org.apache.beam.sdk.coders.Coder.decode(Coder.java:159)
>>        at 
>> org.apache.beam.sdk.coders.LengthPrefixCoder.decode(LengthPrefixCoder.java:64)
>>        at org.apache.beam.sdk.coders.Coder.decode(Coder.java:159)
>>        at 
>> org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:621)
>>        at 
>> org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:612)
>>        at 
>> org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:558)
>>        at 
>> org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver2.awaitCompletion(BeamFnDataInboundObserver2.java:126)
>>        at 
>> org.apache.beam.fn.harness.control.ProcessBundleHandler.processBundle(ProcessBundleHandler.java:467)
>>        at 
>> org.apache.beam.fn.harness.control.BeamFnControlClient.delegateOnInstructionRequestType(BeamFnControlClient.java:151)
>>        at 
>> org.apache.beam.fn.harness.control.BeamFnControlClient$InboundObserver.lambda$onNext$0(BeamFnControlClient.java:116)
>>        at 
>> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>>        at 
>> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>>        at java.base/java.lang.Thread.run(Thread.java:829)
>> Caused by: java.lang.ClassNotFoundException:
>> org.springframework.expression.EvaluationContext
>>        at 
>> java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:581)
>>        at 
>> java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:178)
>>        at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:522)
>>        ... 16 more
>>
>> [2] Classpath of task manager (the missing class is present in 
>> beam-runners-flink-job-server-no-slf4j.jar, which apparently is not 
>> considered by the harness):
>>
>> 2022-03-03 07:18:42,146 INFO
>> org.apache.flink.runtime.taskexecutor.TaskManagerRunner      [] -
>> Classpath: /opt/flink/lib/beam-runners-flink-job-server-no-slf4j.jar:/op
>> t/flink/lib/flink-csv-1.13.5.jar:/opt/flink/lib/flink-json-1.13.5.jar:/opt/flink/lib/flink-shaded-zookeeper-3.4.14.jar:/opt/flink/lib/flink-table-blink_2.12-1.13.5.jar:/
>> opt/flink/lib/flink-table_2.12-1.13.5.jar:/opt/flink/lib/log4j-1.2-api-2.16.0.jar:/opt/flink/lib/log4j-api-2.16.0.jar:/opt/flink/lib/log4j-core-2.16.0.jar:/opt/flink/lib
>> /log4j-slf4j-impl-2.16.0.jar:/opt/flink/lib/flink-dist_2.12-1.13.5.jar:::
>>
>> On 2022/02/11 22:22:16 Chamikara Jayalath wrote:
>> > Hi Jeremy,
>> >
>> > By default we stage all jars in the CLASSPATH of expansion service. You can
>> > override this by setting the filesToStage option when starting up the
>> > expansion service:
>> > https://github.com/apache/beam/blob/7fa5387ffac4f2801077f2e55aa2eba7a47036d9/sdks/java/core/src/main/java/org/apache/beam/sdk/options/FileStagingOptions.java#L38
>> >
>> > Thanks,
>> > Cham
>> >
>> > On Fri, Feb 11, 2022 at 2:09 PM Jeremy Lewi <je...@primer.ai> wrote:
>> >
>> > > Hi Folks,
>> > >
>> > > So I think this is what's happening. My pipeline has multi-language
>> > > transforms because it's using Kafka IO. When the runner expands those
>> > > transforms, it contacts the expansion service, which responds with a
>> > > list of jars. The runner then downloads those jars.
>> > >
>> > > Then when the runner stages artifacts it ends up uploading those jars to
>> > > the staging service.
>> > >
>> > > This seems unnecessary and inefficient. I was able to hack the
>> > > portable_runner.py code to test this out. When I did, my job submitted
>> > > quite quickly (the pipeline's running, but I haven't verified it's
>> > > working, so it's possible I broke something).
>> > >
>> > > Is this working as intended? Is there some way to avoid this without
>> > > having to hack the runner code?
>> > >
>> > > Interestingly, downloading the jars seems to be much faster than
>> > > uploading them, but I haven't investigated this.
>> > >
>> > > J
>> > >
>> > > On Fri, Feb 11, 2022 at 8:23 AM Jeremy Lewi <je...@primer.ai> wrote:
>> > >
>> > >> Hi Folks,
>> > >>
>> > >> I'm using a patched version of the Apache Beam 2.35 Python SDK and
>> > >> running on Flink on Kubernetes using the PortableJobRunner.
>> > >>
>> > >> It looks like when submitting the job, the runner tries to upload a
>> > >> large 274 MB Flink job server jar to the staging service.
>> > >>
>> > >> This doesn't seem right. I already have an instance of the JobServer
>> > >> running, and my program is talking to it, so why is the runner trying
>> > >> to stage the JobServer jar?
>> > >>
>> > >> I believe this problem started when I upgraded to 2.35.
>> > >>
>> > >> When I debugged this I found the runner was getting stuck in
>> > >> offer_artifacts
>> > >>
>> > >> https://github.com/apache/beam/blob/38b21a71a76aad902bd903d525b25a5ff464df55/sdks/python/apache_beam/runners/portability/artifact_service.py#L235
>> > >>
>> > >> When I looked at the rolePayload of the request in question it was
>> > >>
>> > >> rolePayload=beam-runners-flink-job-server-quH8FRP1-liJ7K9es-qBV9wguz4oBRlVegwqlW3OpqU.jar
>> > >>
>> > >> How does the runner decide which artifacts to upload? Could this be
>> > >> caused by using a patched version of the Python SDK but an unpatched
>> > >> version of the job server jar, so that the Python version
>> > >> (2.35.0.dev2) doesn't match the JobServer version (2.35.0), and as a
>> > >> result the portable runner thinks it needs to upload the jar?
>> > >>
>> > >> We build our own version of the Python SDK because we need a fix for
>> > >> https://issues.apache.org/jira/browse/BEAM-12244.
>> > >>
>> > >> When we were using 2.33, we were also building our own Flink jars to
>> > >> pull in Kafka patches that hadn't been released yet.
>> > >>
>> > >> Thanks
>> > >> J
>> > >>
>> > >>
>> > >>
>> > >>
>> >
