Re: [DISCUSS] Client SDK/Job Server/Worker Pool Lifecycle Management on Kubernetes

2021-06-03 Thread Ke Wu
There is also a talk [1] which introduces dynamically scaling a stream processing 
job at LinkedIn with the Samza runner as well.

[1] https://www.usenix.org/conference/hotcloud20/presentation/singh 
 

> On Jun 3, 2021, at 1:59 PM, Ke Wu  wrote:
> 
> Also, are there any resources I can use to find out more about how horizontal 
> scaling works in Samza?
> 
> It is a configuration [1] passed along with the job submission; the Job 
> Coordinator, similar to the Job Manager in Flink, then asks the YARN Resource 
> Manager to allocate containers, or the Kubernetes API server to allocate Pods. 
> 
> configure the job to use the PROCESS environment.
> 
> Agreed that a custom image with a fat jar inside + the PROCESS environment works 
> too, but we prefer the EXTERNAL environment because it gives us isolation between 
> the runner and the SDK worker, where the runner container can run entirely 
> off a framework image.
> 
>  
> 
> [1] 
> https://samza.apache.org/learn/documentation/latest/jobs/configuration-table.html#job-container-count
>  
> 
>  
> 
> On Thu, Jun 3, 2021 at 12:45 PM Kyle Weaver wrote:
> However, not all runners follow the pattern where a predefined number of 
> workers is brought up before job submission. For the Samza runner, for example, 
> the number of workers needed for a job is determined after the job submission 
> happens. In that case, the Samza worker Pod, which is similar to the “Task 
> Manager Pod” in Flink, is brought up after job submission, and the 
> runner container in this Pod needs to connect to the worker pool service at a 
> much earlier time.
>  
> Makes sense. In that case, the best alternative to worker pools is probably 
> to create a custom Samza/Flink worker container image that includes whatever 
> dependencies are necessary to run the Beam user code, and then configure the job 
> to use the PROCESS environment.
> 
> Also, are there any resources I can use to find out more about how horizontal 
> scaling works in Samza?
> 
> On Wed, Jun 2, 2021 at 6:39 PM Ke Wu wrote:
> Very good point. We are actually talking about the same high level approach 
> where Task Manager Pod has two containers inside running, one is task manager 
> container while the other is worker pool service container.
> 
> I believe the disconnect probably lies in how a job is being 
> deployed/started. In the GCP Flink operator example, it is completely true 
> that the likelihood where the worker pool service is not available when the 
> task manager container needs to connect to it is extremely low. It is because 
> the worker pool service is being brought up together when the Flink cluster 
> is being brought up, which is before the job submission even happens.
> 
> However, not all runners follow the pattern where a predefined number of 
> workers is brought up before job submission. For the Samza runner, for example, 
> the number of workers needed for a job is determined after the job submission 
> happens. In that case, the Samza worker Pod, which is similar to the “Task 
> Manager Pod” in Flink, is brought up after job submission, and the 
> runner container in this Pod needs to connect to the worker pool service at a 
> much earlier time.
> 
> In addition, if I understand correctly, Flink is planning to add support for 
> dynamically adding new task managers after job submission [1], in which case, 
> the task manager container and worker pool service container in the same Task 
> Manager Pod could be started together, and the task manager container would need 
> to connect to the worker pool service container sooner. 
> 
> Hope this clarifies things better. Let me know if you have more questions.
> 
> Best,
> Ke
> 
> [1] https://issues.apache.org/jira/browse/FLINK-10407 
>  
> 
>> On Jun 2, 2021, at 4:27 PM, Kyle Weaver wrote:
>> 
>> Therefore, if we bring up the external worker pool container together with 
>> the runner container, which is one of the approaches supported by the Flink 
>> Runner on K8s
>> 
>> Exactly which approach are you talking about in the doc? I feel like there 
>> could be some misunderstanding here. Here is the configuration I'm talking 
>> about: 
>> https://github.com/GoogleCloudPlatform/flink-on-k8s-operator/blob/master/examples/beam/without_job_server/beam_flink_cluster.yaml
>>  
>> 
>> 
>> Basically this config is describing a Flink task manager with a Beam worker 
>> pool sidecar. The user invokes it with:
>> 
>> kubectl apply -f examples/beam/without_job_server/beam_flink_cluster.yaml
>> 
>> It doesn't matter which container is started first, the task manager 
>> 

Re: [DISCUSS] Client SDK/Job Server/Worker Pool Lifecycle Management on Kubernetes

2021-06-03 Thread Kyle Weaver
>
> However, not all runners follow the pattern where a predefined number of
> workers is brought up before job submission. For the Samza runner, for
> example, the number of workers needed for a job is determined after the job
> submission happens. In that case, the Samza worker Pod, which is similar to
> the “Task Manager Pod” in Flink, is brought up after job submission, and the
> runner container in this Pod needs to connect to the worker pool service at a
> much earlier time.


Makes sense. In that case, the best alternative to worker pools is probably
to create a custom Samza/Flink worker container image that includes
whatever dependencies are necessary to run the Beam user code, and then
configure the job to use the PROCESS environment.
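
In case it helps, here is a rough sketch of what that could look like on the
Java side; the boot path and the exact option values are assumptions about how
such a custom image might be laid out, not something the runner prescribes:

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.PortablePipelineOptions;

public class ProcessEnvironmentExample {
  public static void main(String[] args) {
    PortablePipelineOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(PortablePipelineOptions.class);
    // PROCESS environment: the runner launches the SDK harness as a local process
    // inside the worker container instead of calling an external worker pool.
    options.setDefaultEnvironmentType("PROCESS");
    // The command must point at the SDK harness boot executable baked into the
    // custom image; this path is a placeholder.
    options.setDefaultEnvironmentConfig("{\"command\": \"/opt/apache/beam/boot\"}");
    Pipeline pipeline = Pipeline.create(options);
    // ... build the pipeline, then run it against the portable runner:
    // pipeline.run().waitUntilFinish();
  }
}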

Also, are there any resources I can use to find out more about how
horizontal scaling works in Samza?

On Wed, Jun 2, 2021 at 6:39 PM Ke Wu  wrote:

> Very good point. We are actually talking about the same high level
> approach where Task Manager Pod has two containers inside running, one is
> task manager container while the other is worker pool service container.
>
> I believe the disconnect probably lies in how a job is being
> deployed/started. In the GCP Flink operator example, it is completely true
> that the likelihood where the worker pool service is not available when the
> task manager container needs to connect to it is extremely low. It is
> because the worker pool service is being brought up together when the Flink
> cluster is being brought up, which is before the job submission even
> happens.
>
> However, not all runners follow the pattern where a predefined number of
> workers is brought up before job submission. For the Samza runner, for
> example, the number of workers needed for a job is determined after the job
> submission happens. In that case, the Samza worker Pod, which is similar to
> the “Task Manager Pod” in Flink, is brought up after job submission, and the
> runner container in this Pod needs to connect to the worker pool service at a
> much earlier time.
>
> In addition, if I understand correctly, Flink is planning to add support
> for dynamically adding new task managers after job submission [1], in which
> case, the task manager container and worker pool service container in the
> same Task Manager Pod could be started together, and the task manager
> container would need to connect to the worker pool service container sooner.
>
> Hope this clarifies things better. Let me know if you have more questions.
>
> Best,
> Ke
>
> [1] https://issues.apache.org/jira/browse/FLINK-10407
>
> On Jun 2, 2021, at 4:27 PM, Kyle Weaver  wrote:
>
> Therefore, if we bring up the external worker pool container together with
>> the runner container, which is one of the approaches supported by the Flink
>> Runner on K8s
>
>
> Exactly which approach are you talking about in the doc? I feel like there
> could be some misunderstanding here. Here is the configuration I'm talking
> about:
> https://github.com/GoogleCloudPlatform/flink-on-k8s-operator/blob/master/examples/beam/without_job_server/beam_flink_cluster.yaml
>
> Basically this config is describing a Flink task manager with a Beam
> worker pool sidecar. The user invokes it with:
>
> kubectl apply -f examples/beam/without_job_server/beam_flink_cluster.yaml
>
> It doesn't matter which container is started first, the task manager
> container or the worker pool sidecar, because no communication between the
> two containers is necessary at this time.
>
> The instructions are to start the cluster first and wait until it is ready
> to submit a job, e.g.:
>
> kubectl apply -f examples/beam/without_job_server/beam_wordcount_py.yaml
>
> The task manager only sends the worker pool requests once it's running a
> job. So for things to go wrong in the way you describe:
>
> 1. The user submits a job, then starts a Flink cluster -- reversing the
> order of steps in the instructions.
> 2. The worker pool sidecar takes way longer to start up than the task
> manager container for some reason.
> 3. The Flink cluster accepts and starts running the job before the worker
> pool sidecar is ready -- I'm not familiar enough with k8s lifecycle
> management or the Flink operator implementation to be sure if this is even
> possible.
>
> I've never seen this happen. But, of course there are plenty of unforeseen
> ways things can go wrong. So I'm not opposed to improving our error
> handling here more generally.
>
>
>


Re: [DISCUSS] Client SDK/Job Server/Worker Pool Lifecycle Management on Kubernetes

2021-06-02 Thread Ke Wu
Very good point. We are actually talking about the same high-level approach, 
where the Task Manager Pod has two containers running inside: one is the task 
manager container while the other is the worker pool service container.

I believe the disconnect probably lies in how a job is being deployed/started. 
In the GCP Flink operator example, it is completely true that the likelihood 
that the worker pool service is not available when the task manager container 
needs to connect to it is extremely low. That is because the worker pool service 
is brought up together with the Flink cluster, which happens before the job 
submission.

However, not all runners follow the pattern where a predefined number of 
workers is brought up before job submission. For the Samza runner, for example, 
the number of workers needed for a job is determined after the job submission 
happens. In that case, the Samza worker Pod, which is similar to the “Task 
Manager Pod” in Flink, is brought up after job submission, and the runner 
container in this Pod needs to connect to the worker pool service at a much 
earlier time.

In addition, if I understand correctly, Flink is planning to add support for 
dynamically adding new task managers after job submission [1], in which case, 
the task manager container and worker pool service container in the same Task 
Manager Pod could be started together, and the task manager container would need 
to connect to the worker pool service container sooner. 

Hope this clarifies things better. Let me know if you have more questions.

Best,
Ke

[1] https://issues.apache.org/jira/browse/FLINK-10407 
 

> On Jun 2, 2021, at 4:27 PM, Kyle Weaver  wrote:
> 
> Therefore, if we bring up the external worker pool container together with 
> the runner container, which is one of the approaches supported by the Flink 
> Runner on K8s
> 
> Exactly which approach are you talking about in the doc? I feel like there 
> could be some misunderstanding here. Here is the configuration I'm talking 
> about: 
> https://github.com/GoogleCloudPlatform/flink-on-k8s-operator/blob/master/examples/beam/without_job_server/beam_flink_cluster.yaml
>  
> 
> 
> Basically this config is describing a Flink task manager with a Beam worker 
> pool sidecar. The user invokes it with:
> 
> kubectl apply -f examples/beam/without_job_server/beam_flink_cluster.yaml
> 
> It doesn't matter which container is started first, the task manager 
> container or the worker pool sidecar, because no communication between the 
> two containers is necessary at this time.
> 
> The instructions are to start the cluster first and wait until it is ready to 
> submit a job, e.g.:
> 
> kubectl apply -f examples/beam/without_job_server/beam_wordcount_py.yaml
> 
> The task manager only sends the worker pool requests once it's running a job. 
> So for things to go wrong in the way you describe:
> 
> 1. The user submits a job, then starts a Flink cluster -- reversing the order 
> of steps in the instructions.
> 2. The worker pool sidecar takes way longer to start up than the task manager 
> container for some reason.
> 3. The Flink cluster accepts and starts running the job before the worker 
> pool sidecar is ready -- I'm not familiar enough with k8s lifecycle 
> management or the Flink operator implementation to be sure if this is even 
> possible.
> 
> I've never seen this happen. But, of course there are plenty of unforeseen 
> ways things can go wrong. So I'm not opposed to improving our error handling 
> here more generally.
> 



Re: [DISCUSS] Client SDK/Job Server/Worker Pool Lifecycle Management on Kubernetes

2021-06-02 Thread Kyle Weaver
>
> Therefore, if we bring up the external worker pool container together with
> the runner container, which is one of the approaches supported by the Flink
> Runner on K8s


Exactly which approach are you talking about in the doc? I feel like there
could be some misunderstanding here. Here is the configuration I'm talking
about:
https://github.com/GoogleCloudPlatform/flink-on-k8s-operator/blob/master/examples/beam/without_job_server/beam_flink_cluster.yaml

Basically this config is describing a Flink task manager with a Beam worker
pool sidecar. The user invokes it with:

kubectl apply -f examples/beam/without_job_server/beam_flink_cluster.yaml

It doesn't matter which container is started first, the task manager
container or the worker pool sidecar, because no communication between the
two containers is necessary at this time.

The instructions are to start the cluster first and wait until it is ready
to submit a job, e.g.:

kubectl apply -f examples/beam/without_job_server/beam_wordcount_py.yaml

The task manager only sends the worker pool requests once it's running a
job. So for things to go wrong in the way you describe:

1. The user submits a job, then starts a Flink cluster -- reversing the
order of steps in the instructions.
2. The worker pool sidecar takes way longer to start up than the task
manager container for some reason.
3. The Flink cluster accepts and starts running the job before the worker
pool sidecar is ready -- I'm not familiar enough with k8s lifecycle
management or the Flink operator implementation to be sure if this is even
possible.

I've never seen this happen. But, of course there are plenty of unforeseen
ways things can go wrong. So I'm not opposed to improving our error
handling here more generally.


Re: [DISCUSS] Client SDK/Job Server/Worker Pool Lifecycle Management on Kubernetes

2021-06-02 Thread Ke Wu
I do agree that it usually takes longer for the runner to reach the point of 
connecting than for the external worker pool to become available; I suppose that 
is probably why the external worker pool service works the way it does today.

However, I am not 100% confident saying it won’t happen in practice, because the 
design does seem to have the potential to result in such a failure. The question 
is: do we want to rely on the assumption that the worker pool usually starts 
faster than the runner connects to it, or should we add some preventative 
measures to mitigate the failure if it ever happens? The answer may be that it is 
OK to depend on the assumption; in addition, if there are some data points 
showing how often the worker is ready first, that would give us more confidence. 
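
To make the preventative option concrete, here is a rough, hypothetical sketch
of a runner-side readiness check that blocks until the worker pool channel
reports READY before the first start-worker request; the endpoint and the 60s
bound are placeholders, not a proposal for specific values:

import io.grpc.ConnectivityState;
import io.grpc.Deadline;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class WorkerPoolReadinessCheck {
  /** Block until the channel reports READY or the deadline expires. */
  static void awaitReady(ManagedChannel channel, Deadline deadline) throws InterruptedException {
    ConnectivityState state = channel.getState(true); // true = kick off a connection attempt
    while (state != ConnectivityState.READY && !deadline.isExpired()) {
      CountDownLatch stateChanged = new CountDownLatch(1);
      channel.notifyWhenStateChanged(state, stateChanged::countDown);
      stateChanged.await(1, TimeUnit.SECONDS); // re-check periodically
      state = channel.getState(true);
    }
    if (state != ConnectivityState.READY) {
      throw new IllegalStateException("Worker pool service not reachable, last state: " + state);
    }
  }

  public static void main(String[] args) throws InterruptedException {
    // localhost:50000 is the port the Beam worker pool sidecar conventionally listens on.
    ManagedChannel channel =
        ManagedChannelBuilder.forTarget("localhost:50000").usePlaintext().build();
    awaitReady(channel, Deadline.after(60, TimeUnit.SECONDS));
    // ... only now issue StartWorker requests against the sidecar.
    channel.shutdownNow();
  }
}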

What are your thoughts?

Best,
Ke 

> On Jun 2, 2021, at 12:37 PM, Kyle Weaver  wrote:
> 
> As far as I'm aware there's nothing strictly guaranteeing the worker pool has 
> been started. But in practice it takes a while for the job to start up - the 
> pipeline needs to be constructed, sent to the job server, translated, and 
> then the runner needs to start the job, etc. before the external environment 
> request is sent. So I'm not sure whether or not the problem you describe 
> would ever actually happen.
> 
On Wed, Jun 2, 2021 at 11:01 AM Ke Wu wrote:
> Hi Kyle,
> 
> Thanks for reviewing https://github.com/apache/beam/pull/14923. I would like to 
> follow up on the deadline & waitForReady on ExternalEnvironment here.
> 
> In Kubernetes, if my understanding is correct, there is no ordering support 
> when bringing up containers (an init container does not work for external mode 
> because it must run to completion before the other containers start). Therefore, 
> if we bring up the external worker pool container together with the runner 
> container, which is one of the approaches supported by the Flink Runner on K8s 
> [1], then it is possible that the external worker pool service may not be 
> available when the runner tries to connect. 
> 
> What is the recommended way to guarantee the availability of the external worker 
> pool service before connecting? We would like to adopt the same pattern to 
> support the Samza runner on K8s as well.
> 
> Best,
> Ke
> 
> 
> [1] [Public] Beam Flink K8s: 
> https://docs.google.com/document/d/1z3LNrRtr8kkiFHonZ5JJM_L4NWNBBNcqRc_yAf6G0VI/edit#heading=h.bzs6y6ms0898
>  
> 
>  
> 
>> On May 27, 2021, at 6:11 PM, Ke Wu wrote:
>> 
>> Good to know. We are working on running java portable pipeline for Samza 
>> runner and I believe we could take on the task to enhance the java workflow 
>> to support timeout/retry etc on gRPC calls. 
>> 
>> Created BEAM-12419  to 
>> track the work.
>> 
>> Best,
>> Ke
>> 
>>> On May 27, 2021, at 4:30 PM, Kyle Weaver wrote:
>>> 
>>> I don't think there's any specific reason we don't set a timeout, I'm 
>>> guessing it was just never worth the effort of implementing. If it's stuck 
>>> it should be pretty obvious from the logs: "Still waiting for startup of 
>>> environment from {} for worker id {}"
>>> 
>>> On Thu, May 27, 2021 at 4:04 PM Ke Wu wrote:
>>> Hi Kyle,
>>> 
>>> Thank you for the prompt response and apologize for the late reply. 
>>> 
>>> [1] seems to be only available in python portable_runner but not java 
>>> PortableRunner, is it intended or we could add similar changes in java as 
>>> well?
>>> 
>>> [2] makes sense to block since the wait/retry is handled in the previous 
>>> prepare(), however, is there any specific reason why we do not want to 
>>> support timeout in start worker request?
>>> 
>>> Best,
>>> Ke
>>> 
 On May 14, 2021, at 11:25 AM, Kyle Weaver wrote:
 
 1. and 2. are both facilitated by GRPC, which takes care of most of the 
 retry/wait logic. In some places we have a configurable timeout (which 
 defaults to 60s) [1], while in other places we block [2][3].
 
 [1] https://issues.apache.org/jira/browse/BEAM-7933 
 
 [2] 
 https://github.com/apache/beam/blob/51541a595b09751dd3dde2c50caf2a968ac01b68/sdks/python/apache_beam/runners/portability/portable_runner.py#L238-L242
  
 
 [3] 
 https://github.com/apache/beam/blob/9601bdef8870bc6acc7895c06252e43ec040bd8c/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/ExternalEnvironmentFactory.java#L115
  
 

Re: [DISCUSS] Client SDK/Job Server/Worker Pool Lifecycle Management on Kubernetes

2021-06-02 Thread Kyle Weaver
As far as I'm aware there's nothing strictly guaranteeing the worker pool
has been started. But in practice it takes a while for the job to start up
- the pipeline needs to be constructed, sent to the job server, translated,
and then the runner needs to start the job, etc. before the external
environment request is sent. So I'm not sure whether or not the problem you
describe would ever actually happen.

On Wed, Jun 2, 2021 at 11:01 AM Ke Wu  wrote:

> Hi Kyle,
>
> Thanks for reviewing https://github.com/apache/beam/pull/14923. I would
> like to follow up with the deadline & waitForReady on ExternalEnvironment
> here.
>
> In Kubernetes, if my understanding is correct, there is no ordering
> support when bringing up containers (an init container does not work for
> external mode because it must run to completion first). Therefore,
> if we bring up the external worker pool container together with the runner
> container, which is one of the approaches supported by the Flink Runner on
> K8s [1], then it is possible that the external worker pool service may not be
> available when the runner tries to connect.
>
> What is the recommended way to guarantee the availability of the external
> worker pool service before connecting? We would like to adopt the same pattern
> to support the Samza runner on K8s as well.
>
> Best,
> Ke
>
>
> [1] [Public] Beam Flink K8s:
> https://docs.google.com/document/d/1z3LNrRtr8kkiFHonZ5JJM_L4NWNBBNcqRc_yAf6G0VI/edit#heading=h.bzs6y6ms0898
>
> On May 27, 2021, at 6:11 PM, Ke Wu  wrote:
>
> Good to know. We are working on running java portable pipeline for Samza
> runner and I believe we could take on the task to enhance the java workflow
> to support timeout/retry etc on gRPC calls.
>
> Created BEAM-12419  to
> track the work.
>
> Best,
> Ke
>
> On May 27, 2021, at 4:30 PM, Kyle Weaver  wrote:
>
> I don't think there's any specific reason we don't set a timeout, I'm
> guessing it was just never worth the effort of implementing. If it's stuck
> it should be pretty obvious from the logs: "Still waiting for startup of
> environment from {} for worker id {}"
>
> On Thu, May 27, 2021 at 4:04 PM Ke Wu  wrote:
>
>> Hi Kyle,
>>
>> Thank you for the prompt response and apologize for the late reply.
>>
>> [1] seems to be only available in python portable_runner but not java
>> PortableRunner, is it intended or we could add similar changes in java as
>> well?
>>
>> [2] makes sense to block since the wait/retry is handled in the previous
>> prepare(), however, is there any specific reason why we do not want to
>> support timeout in start worker request?
>>
>> Best,
>> Ke
>>
>> On May 14, 2021, at 11:25 AM, Kyle Weaver  wrote:
>>
>> 1. and 2. are both facilitated by GRPC, which takes care of most of the
>> retry/wait logic. In some places we have a configurable timeout (which
>> defaults to 60s) [1], while in other places we block [2][3].
>>
>> [1] https://issues.apache.org/jira/browse/BEAM-7933
>> [2]
>> https://github.com/apache/beam/blob/51541a595b09751dd3dde2c50caf2a968ac01b68/sdks/python/apache_beam/runners/portability/portable_runner.py#L238-L242
>> [3]
>> https://github.com/apache/beam/blob/9601bdef8870bc6acc7895c06252e43ec040bd8c/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/ExternalEnvironmentFactory.java#L115
>>
>> On Fri, May 14, 2021 at 10:51 AM Ke Wu  wrote:
>>
>>> Hello All,
>>>
>>> I came across this question when reading Beam on Flink on Kubernetes
>>> and flink-on-k8s-operator, and realized that there seems to be no
>>> retry/wait logic built into PortableRunner or ExternalEnvironmentFactory
>>> (correct me if I am wrong), which implies that:
>>>
>>> 1. The Job Server needs to be ready to accept requests before the SDK
>>> Client can submit a request.
>>> 2. The External Worker Pool Service needs to be ready to accept start/stop
>>> worker requests before the runner starts making them.
>>>
>>> This may bring some challenges on K8s, since Flink opts to use the
>>> multi-container pattern when bringing up a Beam portable pipeline; in
>>> addition, I don’t find any special lifecycle management in place to
>>> guarantee the ordering, e.g. that the External Worker Pool Service container
>>> starts and is ready before the task manager container starts making requests.
>>>
>>> I am wondering if I missed anything that guarantees the readiness of the
>>> dependent service, or whether we are relying on the dependent containers
>>> being much lighter weight, so that they should, most of the time, be ready
>>> before the other container starts to make requests.
>>>
>>> Best,
>>> Ke
>>>
>>>
>>
>
>


Re: [DISCUSS] Client SDK/Job Server/Worker Pool Lifecycle Management on Kubernetes

2021-06-02 Thread Ke Wu
Hi Kyle,

Thanks for reviewing https://github.com/apache/beam/pull/14923. I would like to 
follow up on the deadline & waitForReady on ExternalEnvironment here.

In Kubernetes, if my understanding is correct, there is no ordering support 
when bringing up containers (an init container does not work for external mode 
because it must run to completion before the other containers start). Therefore, 
if we bring up the external worker pool container together with the runner 
container, which is one of the approaches supported by the Flink Runner on K8s 
[1], then it is possible that the external worker pool service may not be 
available when the runner tries to connect. 

What is the recommended way to guarantee the availability of the external worker 
pool service before connecting? We would like to adopt the same pattern to 
support the Samza runner on K8s as well.

Best,
Ke


[1] [Public] Beam Flink K8s: 
https://docs.google.com/document/d/1z3LNrRtr8kkiFHonZ5JJM_L4NWNBBNcqRc_yAf6G0VI/edit#heading=h.bzs6y6ms0898
 

 

> On May 27, 2021, at 6:11 PM, Ke Wu  wrote:
> 
> Good to know. We are working on running java portable pipeline for Samza 
> runner and I believe we could take on the task to enhance the java workflow 
> to support timeout/retry etc on gRPC calls. 
> 
> Created BEAM-12419  to 
> track the work.
> 
> Best,
> Ke
> 
>> On May 27, 2021, at 4:30 PM, Kyle Weaver wrote:
>> 
>> I don't think there's any specific reason we don't set a timeout, I'm 
>> guessing it was just never worth the effort of implementing. If it's stuck 
>> it should be pretty obvious from the logs: "Still waiting for startup of 
>> environment from {} for worker id {}"
>> 
>> On Thu, May 27, 2021 at 4:04 PM Ke Wu wrote:
>> Hi Kyle,
>> 
>> Thank you for the prompt response and apologize for the late reply. 
>> 
>> [1] seems to be only available in python portable_runner but not java 
>> PortableRunner, is it intended or we could add similar changes in java as 
>> well?
>> 
>> [2] makes sense to block since the wait/retry is handled in the previous 
>> prepare(), however, is there any specific reason why we do not want to 
>> support timeout in start worker request?
>> 
>> Best,
>> Ke
>> 
>>> On May 14, 2021, at 11:25 AM, Kyle Weaver wrote:
>>> 
>>> 1. and 2. are both facilitated by GRPC, which takes care of most of the 
>>> retry/wait logic. In some places we have a configurable timeout (which 
>>> defaults to 60s) [1], while in other places we block [2][3].
>>> 
>>> [1] https://issues.apache.org/jira/browse/BEAM-7933 
>>> 
>>> [2] 
>>> https://github.com/apache/beam/blob/51541a595b09751dd3dde2c50caf2a968ac01b68/sdks/python/apache_beam/runners/portability/portable_runner.py#L238-L242
>>>  
>>> 
>>> [3] 
>>> https://github.com/apache/beam/blob/9601bdef8870bc6acc7895c06252e43ec040bd8c/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/ExternalEnvironmentFactory.java#L115
>>>  
>>> 
>>> On Fri, May 14, 2021 at 10:51 AM Ke Wu wrote:
>>> Hello All,
>>> 
>>> I came across this question when reading Beam on Flink on Kubernetes 
>>> and flink-on-k8s-operator, and realized that there seems to be no 
>>> retry/wait logic built into PortableRunner or ExternalEnvironmentFactory 
>>> (correct me if I am wrong), which implies that:
>>> 
>>> 1. The Job Server needs to be ready to accept requests before the SDK Client 
>>> can submit a request.
>>> 2. The External Worker Pool Service needs to be ready to accept start/stop 
>>> worker requests before the runner starts making them.
>>> 
>>> This may bring some challenges on K8s, since Flink opts to use the 
>>> multi-container pattern when bringing up a Beam portable pipeline; in 
>>> addition, I don’t find any special lifecycle management in place to guarantee 
>>> the ordering, e.g. that the External Worker Pool Service container starts and 
>>> is ready before the task manager container starts making requests. 
>>> 
>>> I am wondering if I missed anything that guarantees the readiness of the 
>>> dependent service, or whether we are relying on the dependent containers 
>>> being much lighter 

Re: [DISCUSS] Client SDK/Job Server/Worker Pool Lifecycle Management on Kubernetes

2021-05-27 Thread Ke Wu
Good to know. We are working on running Java portable pipelines for the Samza 
runner, and I believe we could take on the task of enhancing the Java workflow to 
support timeout/retry etc. on gRPC calls. 

Created BEAM-12419 to track the work.
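
For reference, a rough sketch of what the enhancement might look like on the
start-worker path; the endpoint, worker id, and 60-second deadline are
placeholder assumptions only, not the final design:

import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import java.util.concurrent.TimeUnit;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.model.fnexecution.v1.BeamFnExternalWorkerPoolGrpc;

public class StartWorkerWithTimeout {
  public static void main(String[] args) {
    ManagedChannel channel =
        ManagedChannelBuilder.forTarget("localhost:50000").usePlaintext().build();
    // waitForReady lets the call ride out a sidecar that is still starting up,
    // while the deadline bounds how long we are willing to wait overall.
    BeamFnExternalWorkerPoolGrpc.BeamFnExternalWorkerPoolBlockingStub stub =
        BeamFnExternalWorkerPoolGrpc.newBlockingStub(channel)
            .withWaitForReady()
            .withDeadlineAfter(60, TimeUnit.SECONDS);
    // A real request would also carry control/logging/provision endpoints;
    // they are omitted in this sketch.
    BeamFnApi.StartWorkerResponse response =
        stub.startWorker(
            BeamFnApi.StartWorkerRequest.newBuilder().setWorkerId("worker-1").build());
    if (!response.getError().isEmpty()) {
      throw new RuntimeException("Worker failed to start: " + response.getError());
    }
    channel.shutdownNow();
  }
}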

Best,
Ke

> On May 27, 2021, at 4:30 PM, Kyle Weaver  wrote:
> 
> I don't think there's any specific reason we don't set a timeout, I'm 
> guessing it was just never worth the effort of implementing. If it's stuck it 
> should be pretty obvious from the logs: "Still waiting for startup of 
> environment from {} for worker id {}"
> 
> On Thu, May 27, 2021 at 4:04 PM Ke Wu wrote:
> Hi Kyle,
> 
> Thank you for the prompt response and apologize for the late reply. 
> 
> [1] seems to be only available in python portable_runner but not java 
> PortableRunner, is it intended or we could add similar changes in java as 
> well?
> 
> [2] makes sense to block since the wait/retry is handled in the previous 
> prepare(), however, is there any specific reason why we do not want to 
> support timeout in start worker request?
> 
> Best,
> Ke
> 
>> On May 14, 2021, at 11:25 AM, Kyle Weaver wrote:
>> 
>> 1. and 2. are both facilitated by GRPC, which takes care of most of the 
>> retry/wait logic. In some places we have a configurable timeout (which 
>> defaults to 60s) [1], while in other places we block [2][3].
>> 
>> [1] https://issues.apache.org/jira/browse/BEAM-7933 
>> 
>> [2] 
>> https://github.com/apache/beam/blob/51541a595b09751dd3dde2c50caf2a968ac01b68/sdks/python/apache_beam/runners/portability/portable_runner.py#L238-L242
>>  
>> 
>> [3] 
>> https://github.com/apache/beam/blob/9601bdef8870bc6acc7895c06252e43ec040bd8c/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/ExternalEnvironmentFactory.java#L115
>>  
>> 
>> On Fri, May 14, 2021 at 10:51 AM Ke Wu wrote:
>> Hello All,
>> 
>> I came across this question when reading Beam on Flink on Kubernetes 
>> and flink-on-k8s-operator, and realized that there seems to be no 
>> retry/wait logic built into PortableRunner or ExternalEnvironmentFactory 
>> (correct me if I am wrong), which implies that:
>> 
>> 1. The Job Server needs to be ready to accept requests before the SDK Client 
>> can submit a request.
>> 2. The External Worker Pool Service needs to be ready to accept start/stop 
>> worker requests before the runner starts making them.
>> 
>> This may bring some challenges on K8s, since Flink opts to use the 
>> multi-container pattern when bringing up a Beam portable pipeline; in addition, 
>> I don’t find any special lifecycle management in place to guarantee the 
>> ordering, e.g. that the External Worker Pool Service container starts and is 
>> ready before the task manager container starts making requests. 
>> 
>> I am wondering if I missed anything that guarantees the readiness of the 
>> dependent service, or whether we are relying on the dependent containers being 
>> much lighter weight, so that they should, most of the time, be ready before the 
>> other container starts to make requests. 
>> 
>> Best,
>> Ke
>> 
> 



Re: [DISCUSS] Client SDK/Job Server/Worker Pool Lifecycle Management on Kubernetes

2021-05-27 Thread Kyle Weaver
I don't think there's any specific reason we don't set a timeout, I'm
guessing it was just never worth the effort of implementing. If it's stuck
it should be pretty obvious from the logs: "Still waiting for startup of
environment from {} for worker id {}"

On Thu, May 27, 2021 at 4:04 PM Ke Wu  wrote:

> Hi Kyle,
>
> Thank you for the prompt response and apologize for the late reply.
>
> [1] seems to be only available in python portable_runner but not java
> PortableRunner, is it intended or we could add similar changes in java as
> well?
>
> [2] makes sense to block since the wait/retry is handled in the previous
> prepare(), however, is there any specific reason why we do not want to
> support timeout in start worker request?
>
> Best,
> Ke
>
> On May 14, 2021, at 11:25 AM, Kyle Weaver  wrote:
>
> 1. and 2. are both facilitated by GRPC, which takes care of most of the
> retry/wait logic. In some places we have a configurable timeout (which
> defaults to 60s) [1], while in other places we block [2][3].
>
> [1] https://issues.apache.org/jira/browse/BEAM-7933
> [2]
> https://github.com/apache/beam/blob/51541a595b09751dd3dde2c50caf2a968ac01b68/sdks/python/apache_beam/runners/portability/portable_runner.py#L238-L242
> [3]
> https://github.com/apache/beam/blob/9601bdef8870bc6acc7895c06252e43ec040bd8c/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/ExternalEnvironmentFactory.java#L115
>
> On Fri, May 14, 2021 at 10:51 AM Ke Wu  wrote:
>
>> Hello All,
>>
>> I came across this question when reading Beam on Flink on Kubernetes
>> and flink-on-k8s-operator, and realized that there seems to be no
>> retry/wait logic built into PortableRunner or ExternalEnvironmentFactory
>> (correct me if I am wrong), which implies that:
>>
>> 1. The Job Server needs to be ready to accept requests before the SDK Client
>> can submit a request.
>> 2. The External Worker Pool Service needs to be ready to accept start/stop
>> worker requests before the runner starts making them.
>>
>> This may bring some challenges on K8s, since Flink opts to use the
>> multi-container pattern when bringing up a Beam portable pipeline; in
>> addition, I don’t find any special lifecycle management in place to guarantee
>> the ordering, e.g. that the External Worker Pool Service container starts and
>> is ready before the task manager container starts making requests.
>>
>> I am wondering if I missed anything that guarantees the readiness of the
>> dependent service, or whether we are relying on the dependent containers
>> being much lighter weight, so that they should, most of the time, be ready
>> before the other container starts to make requests.
>>
>> Best,
>> Ke
>>
>>
>


Re: [DISCUSS] Client SDK/Job Server/Worker Pool Lifecycle Management on Kubernetes

2021-05-27 Thread Ke Wu
Hi Kyle,

Thank you for the prompt response and apologize for the late reply. 

[1] seems to be only available in the Python portable_runner but not the Java 
PortableRunner; is this intended, or could we add similar changes in Java as well?

[2] makes sense to block since the wait/retry is handled in the previous 
prepare(); however, is there any specific reason why we do not want to support a 
timeout in the start worker request?

Best,
Ke

> On May 14, 2021, at 11:25 AM, Kyle Weaver  wrote:
> 
> 1. and 2. are both facilitated by GRPC, which takes care of most of the 
> retry/wait logic. In some places we have a configurable timeout (which 
> defaults to 60s) [1], while in other places we block [2][3].
> 
> [1] https://issues.apache.org/jira/browse/BEAM-7933 
> 
> [2] 
> https://github.com/apache/beam/blob/51541a595b09751dd3dde2c50caf2a968ac01b68/sdks/python/apache_beam/runners/portability/portable_runner.py#L238-L242
>  
> 
> [3] 
> https://github.com/apache/beam/blob/9601bdef8870bc6acc7895c06252e43ec040bd8c/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/ExternalEnvironmentFactory.java#L115
>  
> 
> On Fri, May 14, 2021 at 10:51 AM Ke Wu wrote:
> Hello All,
> 
> I came across this question when reading Beam on Flink on Kubernetes 
> and flink-on-k8s-operator, and realized that there seems to be no 
> retry/wait logic built into PortableRunner or ExternalEnvironmentFactory 
> (correct me if I am wrong), which implies that:
> 
> 1. The Job Server needs to be ready to accept requests before the SDK Client 
> can submit a request.
> 2. The External Worker Pool Service needs to be ready to accept start/stop 
> worker requests before the runner starts making them.
> 
> This may bring some challenges on K8s, since Flink opts to use the 
> multi-container pattern when bringing up a Beam portable pipeline; in addition, 
> I don’t find any special lifecycle management in place to guarantee the 
> ordering, e.g. that the External Worker Pool Service container starts and is 
> ready before the task manager container starts making requests. 
> 
> I am wondering if I missed anything that guarantees the readiness of the 
> dependent service, or whether we are relying on the dependent containers being 
> much lighter weight, so that they should, most of the time, be ready before the 
> other container starts to make requests. 
> 
> Best,
> Ke
> 



Re: [DISCUSS] Client SDK/Job Server/Worker Pool Lifecycle Management on Kubernetes

2021-05-14 Thread Kyle Weaver
1. and 2. are both facilitated by GRPC, which takes care of most of the
retry/wait logic. In some places we have a configurable timeout (which
defaults to 60s) [1], while in other places we block [2][3].

[1] https://issues.apache.org/jira/browse/BEAM-7933
[2]
https://github.com/apache/beam/blob/51541a595b09751dd3dde2c50caf2a968ac01b68/sdks/python/apache_beam/runners/portability/portable_runner.py#L238-L242
[3]
https://github.com/apache/beam/blob/9601bdef8870bc6acc7895c06252e43ec040bd8c/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/ExternalEnvironmentFactory.java#L115

On Fri, May 14, 2021 at 10:51 AM Ke Wu  wrote:

> Hello All,
>
> I came across this question when reading Beam on Flink on Kubernetes
> and flink-on-k8s-operator, and realized that there seems to be no
> retry/wait logic built into PortableRunner or ExternalEnvironmentFactory
> (correct me if I am wrong), which implies that:
>
> 1. The Job Server needs to be ready to accept requests before the SDK Client
> can submit a request.
> 2. The External Worker Pool Service needs to be ready to accept start/stop
> worker requests before the runner starts making them.
>
> This may bring some challenges on K8s, since Flink opts to use the
> multi-container pattern when bringing up a Beam portable pipeline; in
> addition, I don’t find any special lifecycle management in place to guarantee
> the ordering, e.g. that the External Worker Pool Service container starts and
> is ready before the task manager container starts making requests.
>
> I am wondering if I missed anything that guarantees the readiness of the
> dependent service, or whether we are relying on the dependent containers
> being much lighter weight, so that they should, most of the time, be ready
> before the other container starts to make requests.
>
> Best,
> Ke
>
>


[DISCUSS] Client SDK/Job Server/Worker Pool Lifecycle Management on Kubernetes

2021-05-14 Thread Ke Wu
Hello All,

I came across this question when reading Beam on Flink on Kubernetes and 
flink-on-k8s-operator, and realized that there seems to be no retry/wait logic 
built into PortableRunner or ExternalEnvironmentFactory (correct me if I am 
wrong), which implies that:

1. The Job Server needs to be ready to accept requests before the SDK Client 
can submit a request.
2. The External Worker Pool Service needs to be ready to accept start/stop 
worker requests before the runner starts making them.

This may bring some challenges on K8s, since Flink opts to use the 
multi-container pattern when bringing up a Beam portable pipeline; in addition, 
I don’t find any special lifecycle management in place to guarantee the 
ordering, e.g. that the External Worker Pool Service container starts and is 
ready before the task manager container starts making requests. 

I am wondering if I missed anything that guarantees the readiness of the 
dependent service, or whether we are relying on the dependent containers being 
much lighter weight, so that they should, most of the time, be ready before the 
other container starts to make requests. 

Best,
Ke