Thanks!

On Fri, Feb 11, 2022 at 5:22 PM Chamikara Jayalath <chamik...@google.com>
wrote:

> Hi Jeremy,
>
> By default, we stage all jars in the CLASSPATH of the expansion service.
> You can override this by setting the filesToStage option when starting up
> the expansion service:
> https://github.com/apache/beam/blob/7fa5387ffac4f2801077f2e55aa2eba7a47036d9/sdks/java/core/src/main/java/org/apache/beam/sdk/options/FileStagingOptions.java#L38
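>
> For example, if you start the Kafka expansion service yourself from Python
> via JavaJarExpansionService, you can pass filesToStage through its
> arguments. This is an untested sketch with placeholder paths; also note
> that, if I'm reading the SDK right, extra_args replaces the default
> arguments, so the '{{PORT}}' placeholder has to be included explicitly:
>
>   import apache_beam as beam
>   from apache_beam.io.kafka import ReadFromKafka
>   from apache_beam.transforms.external import JavaJarExpansionService
>
>   # Placeholder paths; point these at your own expansion service jar and
>   # the jars you actually want staged.
>   expansion_service = JavaJarExpansionService(
>       '/path/to/beam-sdks-java-io-expansion-service-2.35.0.jar',
>       extra_args=[
>           '{{PORT}}',
>           '--filesToStage=/path/to/kafka-clients.jar,/path/to/other-dep.jar',
>       ])
>
>   with beam.Pipeline() as pipeline:
>       _ = (
>           pipeline
>           | ReadFromKafka(
>               consumer_config={'bootstrap.servers': 'broker:9092'},
>               topics=['my_topic'],
>               expansion_service=expansion_service)
>           | beam.Map(print))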
>
> Thanks,
> Cham
>
> On Fri, Feb 11, 2022 at 2:09 PM Jeremy Lewi <jeremy.l...@primer.ai> wrote:
>
>> Hi Folks,
>>
>> So I think this is what's happening. My pipeline has multi-language
>> transforms because it's using Kafka IO. When the runner expands those
>> transforms, it contacts the expansion service, which responds with a list
>> of jars. The runner then downloads those jars.
>>
>> Then, when the runner stages artifacts, it ends up uploading those same
>> jars to the staging service.
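>>
>> For reference, given a constructed beam.Pipeline object named pipeline,
>> something like this should show which jars end up attached to the Java
>> environment in the pipeline proto before submission (rough sketch; the
>> file-artifact URN and proto field names are from the Beam runner API
>> protos, and I haven't verified this against 2.35 exactly):
>>
>>   from apache_beam.portability.api import beam_runner_api_pb2
>>
>>   proto = pipeline.to_runner_api()
>>   for env_id, env in proto.components.environments.items():
>>       for dep in env.dependencies:
>>           # File artifacts carry their local path in the type payload.
>>           if dep.type_urn == 'beam:artifact:type:file:v1':
>>               payload = beam_runner_api_pb2.ArtifactFilePayload.FromString(
>>                   dep.type_payload)
>>               print(env_id, payload.path)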
>>
>> This seems unnecessary and inefficient. I was able to hack the
>> portable_runner.py code to test this out. When I did, my job submitted
>> quite quickly (the pipeline's running, but I haven't verified it's
>> working, so it's possible I broke something).
>>
>> Is this working as intended? Is there some way to avoid this without
>> having to hack the runner code?
>>
>> Interestingly, it seems like downloading the jars is much faster than
>> uploading them, but I haven't investigated this.
>>
>> J
>>
>> On Fri, Feb 11, 2022 at 8:23 AM Jeremy Lewi <jeremy.l...@primer.ai>
>> wrote:
>>
>>> Hi Folks,
>>>
>>> I'm using a patched version of the Apache Beam 2.35 Python SDK and
>>> running on Flink on Kubernetes using the PortableRunner.
>>>
>>> It looks like, when submitting the job, the runner tries to upload a
>>> large 274 MB Flink job server jar to the staging service.
>>>
>>> This doesn't seem right. I already have an instance of the JobServer
>>> running, and my program is talking to the job server, so why is the
>>> runner trying to stage the JobServer jar?
>>>
>>> I believe this problem started when I upgraded to 2.35.
>>>
>>> When I debugged this, I found the runner was getting stuck in
>>> offer_artifacts:
>>>
>>> https://github.com/apache/beam/blob/38b21a71a76aad902bd903d525b25a5ff464df55/sdks/python/apache_beam/runners/portability/artifact_service.py#L235
>>>
>>> When I looked at the rolePayload of the request in question, it was:
>>>
>>> rolePayload=beam-runners-flink-job-server-quH8FRP1-liJ7K9es-qBV9wguz4oBRlVegwqlW3OpqU.jar
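>>>
>>> For reference, that string looks like the staged_name from the
>>> artifact's role payload. A rough sketch of decoding it, where `artifact`
>>> is an ArtifactInformation proto and the variable name is just
>>> illustrative:
>>>
>>>   from apache_beam.portability.api import beam_runner_api_pb2
>>>
>>>   role = beam_runner_api_pb2.ArtifactStagingToRolePayload.FromString(
>>>       artifact.role_payload)
>>>   print(role.staged_name)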
>>>
>>> How does the runner decide which artifacts to upload? Could this be
>>> caused by using a patched version of the Python SDK but an unpatched
>>> version of the job server jar, so that the Python version (2.35.0.dev2)
>>> doesn't match the JobServer version (2.35.0) and, as a result, the
>>> portable runner thinks it needs to upload the jar?
>>>
>>> We build our own version of the Python SDK because we need a fix for
>>> https://issues.apache.org/jira/browse/BEAM-12244.
>>>
>>> When we were using 2.33, we were also building our own Flink jars in
>>> order to pull in Kafka patches that hadn't been released yet.
>>>
>>> Thanks
>>> J
>>>
