Hi Gorjan,
the address of localhost is hard-coded in the python worker pool (see
[1]). There should be no need to setup a load-balancer for the
worker_pool, if you have it as another container in each TM pod, it
should suffice to replace {beam_sdk_url} with 'localhost'. Each TM will
then have its own worker_pool, which should be just fine.
Best,
Jan
[1]
https://github.com/apache/beam/blob/af59192ac95a564ca3d29b6385a6d1448f5a407d/sdks/python/apache_beam/runners/worker/worker_pool_main.py#L81
On 8/14/21 4:37 PM, Gorjan Todorovski wrote:
Hi!
I need help implementing a native Kubernetes Flink cluster that needs
to run batch jobs (run by TensorFlow Extended), but I am not sure I am
configuring it right as I have issues running jobs on more than one
task manager, while jobs run fine if there is only one TM.
I use the following parameters for the job:
|"--runner=FlinkRunner", "--parallelism=4",
f"--flink_master={flink_url}:8081", "--environment_type=EXTERNAL",
f"--environment_config={beam_sdk_url}:50000",
"--flink_submit_uber_jar", "--worker_harness_container_image=none", |
I have configured the Beam workers to run as side-cars to the TM
containers. I do this by configuring. task manager template for the
pods like this:
|kubernetes.pod-template-file.taskmanager|
it is pointing out to a template file with contents:
|kind: Pod metadata: name: taskmanager-pod-template spec:
#hostNetwork: true containers: - name: flink-main-container #image:
apache/flink:scala_2.12 env: - name: AWS_REGION value: "eu-central-1"
- name: S3_VERIFY_SSL value: "0" - name: PYTHONPATH value:
"/data/flink/src" args: ["taskmanager"] ports: - containerPort: 6122
#22 name: rpc - containerPort: 6125 name: query-state livenessProbe:
tcpSocket: port: 6122 #22 initialDelaySeconds: 30 periodSeconds: 60 -
name: beam-worker-pool env: - name: PYTHONPATH value:
"/data/flink/src" - name: AWS_REGION value: "eu-central-1" - name:
S3_VERIFY_SSL value: "0" image:
848221505146.dkr.ecr.eu-central-1.amazonaws.com/flink-workers
<http://848221505146.dkr.ecr.eu-central-1.amazonaws.com/flink-workers>
imagePullPolicy: Always args: ["--worker_pool"] ports: -
containerPort: 50000 name: pool livenessProbe: tcpSocket: port: 50000
initialDelaySeconds: 30 periodSeconds: 60 |
I have also created a kubernetes load balancer for the task managers,
so clients can connect on port 50000. So I use that address when
configuring:
|f"--environment_config={beam_sdk_url}:50000",|
the problem is as it looks like the Beam SDK harness on one task
manager wants to connect to the endpoint running on the other task
manager, but looks for it on localhost:
Log from beam-worker-pool on TM 2:
|2021/08/11 09:43:16 Failed to obtain provisioning information: failed
to dial server at localhost:33705 caused by: context deadline exceeded |
The provision endpoint on TM 1 is the one actually listening on the
port 33705, while this is looking for it on localhost, so cannot
connect to it.
Showing how I test this:
|............... TM 1: ======== $ kubectl logs
my-first-flink-cluster-taskmanager-1-1 -c beam-worker-pool 2021/08/12
09:10:34 Starting worker pool 1: python -m
apache_beam.runners.worker.worker_pool_main --service_port=50000
--container_executable=/opt/apache/beam/boot Starting worker with
command ['/opt/apache/beam/boot', '--id=1-1',
'--logging_endpoint=localhost:33383',
'--artifact_endpoint=localhost:43477',
'--provision_endpoint=localhost:40983',
'--control_endpoint=localhost:34793'] 2021/08/12 09:13:05 Failed to
obtain provisioning information: failed to dial server at
localhost:40983 caused by: context deadline exceeded TM 2: ========= $
kubectl logs my-first-flink-cluster-taskmanager-1-2 -c
beam-worker-pool 2021/08/12 09:10:33 Starting worker pool 1: python -m
apache_beam.runners.worker.worker_pool_main --service_port=50000
--container_executable=/opt/apache/beam/boot Starting worker with
command ['/opt/apache/beam/boot', '--id=1-1',
'--logging_endpoint=localhost:40497',
'--artifact_endpoint=localhost:36245',
'--provision_endpoint=localhost:32907',
'--control_endpoint=localhost:46083'] 2021/08/12 09:13:09 Failed to
obtain provisioning information: failed to dial server at
localhost:32907 caused by: context deadline exceeded Testing:
......................... TM 1: ============ $ kubectl exec -it
my-first-flink-cluster-taskmanager-1-1 -c beam-worker-pool -- bash
root@my-first-flink-cluster-taskmanager-1-1:/# curl localhost:40983
curl: (7) Failed to connect to localhost port 40983: Connection
refused root@my-first-flink-cluster-taskmanager-1-1:/# curl
localhost:32907 Warning: Binary output can mess up your terminal. Use
"--output -" to ... TM 2: =============
root@my-first-flink-cluster-taskmanager-1-2:/# curl localhost:32907
curl: (7) Failed to connect to localhost port 32907: Connection
refused root@my-first-flink-cluster-taskmanager-1-2:/# curl
localhost:40983 Warning: Binary output can mess up your terminal. Use
"--output -" to tell Warning: curl to output it to your terminal
anyway, or consider "--output |
Not sure how to fix this.
Thanks, Gorjan