Hi Kyle,

Thanks for reviewing https://github.com/apache/beam/pull/14923 
<https://github.com/apache/beam/pull/14923>. I would like to follow up with the 
deadline & waitForReady on ExternalEnvironment here.

In Kubernetes, if my understanding is correct, there is no ordering support 
when bringing up containers (init container does not work for external mode 
because it requires init container to complete). Therefore, if we bring up the 
external worker pool container together with the runner container, which is one 
the supported approach by Flink Runner on K8s [1], then it is possible that 
external worker pool service may not be available when runner tries to connect. 

What are the recommended way to guarantee the availability of external worker 
pool service before connecting? We would like to adopt the pattern to support 
Samza runner on K8s as well.

Best,
Ke


[1] [Public] Beam Flink K8s: 
https://docs.google.com/document/d/1z3LNrRtr8kkiFHonZ5JJM_L4NWNBBNcqRc_yAf6G0VI/edit#heading=h.bzs6y6ms0898
 
<https://docs.google.com/document/d/1z3LNrRtr8kkiFHonZ5JJM_L4NWNBBNcqRc_yAf6G0VI/edit#heading=h.bzs6y6ms0898>
 

> On May 27, 2021, at 6:11 PM, Ke Wu <ke.wu...@gmail.com> wrote:
> 
> Good to know. We are working on running java portable pipeline for Samza 
> runner and I believe we could take on the task to enhance the java workflow 
> to support timeout/retry etc on gRPC calls. 
> 
> Created BEAM-12419 <https://issues.apache.org/jira/browse/BEAM-12419> to 
> track the work.
> 
> Best,
> Ke
> 
>> On May 27, 2021, at 4:30 PM, Kyle Weaver <kcwea...@google.com 
>> <mailto:kcwea...@google.com>> wrote:
>> 
>> I don't think there's any specific reason we don't set a timeout, I'm 
>> guessing it was just never worth the effort of implementing. If it's stuck 
>> it should be pretty obvious from the logs: "Still waiting for startup of 
>> environment from {} for worker id {}"
>> 
>> On Thu, May 27, 2021 at 4:04 PM Ke Wu <ke.wu...@gmail.com 
>> <mailto:ke.wu...@gmail.com>> wrote:
>> Hi Kyle,
>> 
>> Thank you for the prompt response and apologize for the late reply. 
>> 
>> [1] seems to be only available in python portable_runner but not java 
>> PortableRunner, is it intended or we could add similar changes in java as 
>> well?
>> 
>> [2] makes sense to block since the wait/retry is handled in the previous 
>> prepare(), however, is there any specific reason why we do not want to 
>> support timeout in start worker request?
>> 
>> Best,
>> Ke
>> 
>>> On May 14, 2021, at 11:25 AM, Kyle Weaver <kcwea...@google.com 
>>> <mailto:kcwea...@google.com>> wrote:
>>> 
>>> 1. and 2. are both facilitated by GRPC, which takes care of most of the 
>>> retry/wait logic. In some places we have a configurable timeout (which 
>>> defaults to 60s) [1], while in other places we block [2][3].
>>> 
>>> [1] https://issues.apache.org/jira/browse/BEAM-7933 
>>> <https://issues.apache.org/jira/browse/BEAM-7933>
>>> [2] 
>>> https://github.com/apache/beam/blob/51541a595b09751dd3dde2c50caf2a968ac01b68/sdks/python/apache_beam/runners/portability/portable_runner.py#L238-L242
>>>  
>>> <https://github.com/apache/beam/blob/51541a595b09751dd3dde2c50caf2a968ac01b68/sdks/python/apache_beam/runners/portability/portable_runner.py#L238-L242>
>>> [3] 
>>> https://github.com/apache/beam/blob/9601bdef8870bc6acc7895c06252e43ec040bd8c/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/ExternalEnvironmentFactory.java#L115
>>>  
>>> <https://github.com/apache/beam/blob/9601bdef8870bc6acc7895c06252e43ec040bd8c/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/ExternalEnvironmentFactory.java#L115>
>>> On Fri, May 14, 2021 at 10:51 AM Ke Wu <ke.wu...@gmail.com 
>>> <mailto:ke.wu...@gmail.com>> wrote:
>>> Hello All,
>>> 
>>> I came across this question when I am reading Beam on Flink on Kubernetes 
>>> <https://docs.google.com/document/d/1z3LNrRtr8kkiFHonZ5JJM_L4NWNBBNcqRc_yAf6G0VI/edit#heading=h.x9qy4wlfgc1g>
>>>  and flink-on-k8s-operator 
>>> <https://github.com/GoogleCloudPlatform/flink-on-k8s-operator/tree/0310df76d6e2128cd5d2bc51fae4e842d370c463>
>>>  and realized that there seems no retry/wait logic built in PortableRunner 
>>> nor ExternalEnvironmentFactory, (correct me if I am wrong) which creates 
>>> implications that:
>>> 
>>> 1. Job Server needs to be ready to accept request before SDK Client could 
>>> submit request.
>>> 2. External Worker Pool Service needs to be ready to accept start/stop 
>>> worker request before runner starts to request.
>>> 
>>> This may bring some challenges on k8s since Flink opt to use multi 
>>> containers pattern when bringing up a beam portable pipeline, in addition, 
>>> I don’t find any special lifecycle management in place to guarantee the 
>>> order, e.g. External Worker Pool Service container to start and ready 
>>> before the task manager container to start making requests. 
>>> 
>>> I am wondering if I missed anything to guarantee the readiness of the 
>>> dependent service or we are relying on that dependent containers are much 
>>> lighter weigh so it should, in most time, be ready before the other 
>>> container start to make requests. 
>>> 
>>> Best,
>>> Ke
>>> 
>> 
> 

Reply via email to