Thanks!

On Fri, Feb 11, 2022 at 5:22 PM Chamikara Jayalath <chamik...@google.com> wrote:
> Hi Jeremy,
>
> By default we stage all jars in the CLASSPATH of the expansion service. You
> can override this by setting the filesToStage option when starting up the
> expansion service:
> https://github.com/apache/beam/blob/7fa5387ffac4f2801077f2e55aa2eba7a47036d9/sdks/java/core/src/main/java/org/apache/beam/sdk/options/FileStagingOptions.java#L38
>
> Thanks,
> Cham
>
> On Fri, Feb 11, 2022 at 2:09 PM Jeremy Lewi <jeremy.l...@primer.ai> wrote:
>
>> Hi Folks,
>>
>> So I think this is what's happening. My pipeline has multi-language
>> transforms because it's using Kafka IO. When the runner expands those
>> transforms, it contacts the expansion service, which responds with a list
>> of jars. The runner then downloads those jars.
>>
>> Then, when the runner stages artifacts, it ends up uploading those same
>> jars to the staging service.
>>
>> This seems unnecessary and inefficient. I was able to hack the
>> portable_runner.py code to test this out. When I did, my job submitted
>> quite quickly (the pipeline's running, but I haven't verified it's
>> working, so it's possible I broke something).
>>
>> Is this working as intended? Is there some way to avoid this without
>> having to hack the runner code?
>>
>> Interestingly, it seems that downloading the jars is much faster than
>> uploading them, but I haven't investigated this.
>>
>> J
>>
>> On Fri, Feb 11, 2022 at 8:23 AM Jeremy Lewi <jeremy.l...@primer.ai>
>> wrote:
>>
>>> Hi Folks,
>>>
>>> I'm using a patched version of Apache Beam 2.35 (Python) and running on
>>> Flink on Kubernetes using the PortableJobRunner.
>>>
>>> It looks like, when submitting the job, the runner tries to upload a
>>> large 274 MB Flink job server jar to the staging service.
>>>
>>> This doesn't seem right. I already have an instance of the JobServer
>>> running, and my program is talking to it, so why is the runner trying to
>>> stage the JobServer jar?
>>>
>>> I believe this problem started when I upgraded to 2.35.
>>>
>>> When I debugged this, I found the runner was getting stuck in
>>> offer_artifacts:
>>>
>>> https://github.com/apache/beam/blob/38b21a71a76aad902bd903d525b25a5ff464df55/sdks/python/apache_beam/runners/portability/artifact_service.py#L235
>>>
>>> When I looked at the rolePayload of the request in question, it was:
>>>
>>> rolePayload=beam-runners-flink-job-server-quH8FRP1-liJ7K9es-qBV9wguz4oBRlVegwqlW3OpqU.jar
>>>
>>> How does the runner decide which artifacts to upload? Could this be
>>> caused by using a patched version of the Python SDK with an unpatched
>>> version of the job server jar, so that the Python version (2.35.0.dev2)
>>> doesn't match the JobServer version (2.35.0)? As a result, does the
>>> portable runner think it needs to upload the jar?
>>>
>>> We build our own version of the Python SDK because we need a fix for
>>> https://issues.apache.org/jira/browse/BEAM-12244.
>>>
>>> When we were using 2.33, we were also building our own Flink jars in
>>> order to pull in Kafka patches which hadn't been released yet.
>>>
>>> Thanks
>>> J
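[Editor's note] Cham's suggestion above comes down to a selection rule: if filesToStage is set (for example via a flag like `--filesToStage=a.jar,b.jar` when launching the expansion service; check your Beam version for the exact flag syntax), only those jars are advertised for staging; otherwise the service falls back to everything on its CLASSPATH, which is how a large job-server jar can end up in the list. A minimal Python sketch of that rule, with illustrative names that are not Beam's actual implementation:

```python
# Sketch of the staging choice described above. "artifacts_to_stage" and
# the variable names are hypothetical, for illustration only.

def artifacts_to_stage(classpath_jars, files_to_stage=None):
    """Return the jars an expansion service would advertise for staging."""
    if files_to_stage:  # explicit override, e.g. --filesToStage=a.jar,b.jar
        return list(files_to_stage)
    return list(classpath_jars)  # default: the entire CLASSPATH


classpath = [
    "beam-sdks-java-io-kafka.jar",
    "kafka-clients.jar",
    "beam-runners-flink-job-server.jar",
]

# Default: every classpath jar is staged, including the large job-server jar.
print(artifacts_to_stage(classpath))

# With the override: only the jars the pipeline actually needs are staged.
print(artifacts_to_stage(classpath, ["beam-sdks-java-io-kafka.jar",
                                     "kafka-clients.jar"]))
```

With the override in place, the 274 MB job-server jar would simply never appear in the artifact list the runner has to upload.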
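[Editor's note] The inefficiency Jeremy describes (download the jars from the expansion service, then re-upload the same bytes to the staging service) could in principle be avoided by content-hash deduplication: skip uploading any artifact whose hash the staging side already holds. The sketch below is not Beam code and not what his portable_runner.py hack does; it only illustrates the idea with stdlib Python:

```python
import hashlib

# Illustrative sketch: skip re-uploading artifacts whose content hash is
# already known to the staging side. Names are hypothetical.

def sha256_of(data: bytes) -> str:
    return hashlib.sha256(data).hexdigest()


def artifacts_needing_upload(local_artifacts, staged_hashes):
    """local_artifacts: dict name -> bytes; staged_hashes: set of sha256 hex.

    Returns the names of artifacts the runner would still have to upload.
    """
    return [name for name, data in local_artifacts.items()
            if sha256_of(data) not in staged_hashes]


local = {
    "beam-sdks-java-io-kafka.jar": b"kafka-bytes",
    "custom-transform.jar": b"custom-bytes",
}
# Suppose the staging service already holds the Kafka jar (same content).
already_staged = {sha256_of(b"kafka-bytes")}

print(artifacts_needing_upload(local, already_staged))
# → ['custom-transform.jar']
```

Whether this is safe in practice depends on the runner and staging service agreeing on a hash scheme; mismatched SDK and job-server versions (as in the 2.35.0.dev2 vs 2.35.0 situation above) are exactly the case where such agreement can break down.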