Hi Gorjan,

The address of localhost is hard-coded in the Python worker pool (see [1]). There should be no need to set up a load balancer for the worker_pool. If you run it as another container in each TM pod, it should suffice to replace {beam_sdk_url} with 'localhost'. Each TM will then have its own worker_pool, which should be just fine.
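As a rough sketch of that change, assuming the job arguments are assembled in Python as in the quoted message: the helper name `build_beam_args` and the example host name passed as `flink_url` are hypothetical, and only the `--environment_config` value differs from the original list.

```python
from typing import List


def build_beam_args(flink_url: str, worker_pool_host: str = "localhost") -> List[str]:
    """Return the pipeline arguments from the quoted message.

    Hypothetical helper: the only change is that --environment_config
    points at the worker pool sidecar in the same pod (localhost)
    instead of a load balancer address.
    """
    return [
        "--runner=FlinkRunner",
        "--parallelism=4",
        f"--flink_master={flink_url}:8081",
        "--environment_type=EXTERNAL",
        # Each TM talks to the worker pool container in its own pod.
        f"--environment_config={worker_pool_host}:50000",
        "--flink_submit_uber_jar",
        "--worker_harness_container_image=none",
    ]


if __name__ == "__main__":
    # "my-flink-jobmanager" is a placeholder, not a name from this thread.
    for arg in build_beam_args("my-flink-jobmanager"):
        print(arg)
```

With this, every TM pod resolves `localhost:50000` to its own `beam-worker-pool` container, so no shared address is needed.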

Best,

 Jan

[1] https://github.com/apache/beam/blob/af59192ac95a564ca3d29b6385a6d1448f5a407d/sdks/python/apache_beam/runners/worker/worker_pool_main.py#L81

On 8/14/21 4:37 PM, Gorjan Todorovski wrote:
Hi!

I need help setting up a native Kubernetes Flink cluster that runs batch jobs (submitted by TensorFlow Extended). I am not sure I am configuring it correctly: jobs run fine when there is only one task manager (TM), but I have issues running them on more than one.

I use the following parameters for the job:

|"--runner=FlinkRunner",
"--parallelism=4",
f"--flink_master={flink_url}:8081",
"--environment_type=EXTERNAL",
f"--environment_config={beam_sdk_url}:50000",
"--flink_submit_uber_jar",
"--worker_harness_container_image=none",|

I have configured the Beam workers to run as sidecars to the TM containers. I do this by configuring a task manager template for the pods like this:

|kubernetes.pod-template-file.taskmanager|

It points to a template file with the following contents:

|kind: Pod
metadata:
  name: taskmanager-pod-template
spec:
  #hostNetwork: true
  containers:
    - name: flink-main-container
      #image: apache/flink:scala_2.12
      env:
        - name: AWS_REGION
          value: "eu-central-1"
        - name: S3_VERIFY_SSL
          value: "0"
        - name: PYTHONPATH
          value: "/data/flink/src"
      args: ["taskmanager"]
      ports:
        - containerPort: 6122 #22
          name: rpc
        - containerPort: 6125
          name: query-state
      livenessProbe:
        tcpSocket:
          port: 6122 #22
        initialDelaySeconds: 30
        periodSeconds: 60
    - name: beam-worker-pool
      env:
        - name: PYTHONPATH
          value: "/data/flink/src"
        - name: AWS_REGION
          value: "eu-central-1"
        - name: S3_VERIFY_SSL
          value: "0"
      image: 848221505146.dkr.ecr.eu-central-1.amazonaws.com/flink-workers
      imagePullPolicy: Always
      args: ["--worker_pool"]
      ports:
        - containerPort: 50000
          name: pool
      livenessProbe:
        tcpSocket:
          port: 50000
        initialDelaySeconds: 30
        periodSeconds: 60
|

I have also created a Kubernetes load balancer for the task managers, so that clients can connect on port 50000. I use that address when configuring:

|f"--environment_config={beam_sdk_url}:50000",|

The problem is that the Beam SDK harness on one task manager apparently wants to connect to an endpoint running on the other task manager, but looks for it on localhost:

Log from beam-worker-pool on TM 2:

|2021/08/11 09:43:16 Failed to obtain provisioning information: failed to dial server at localhost:33705 caused by: context deadline exceeded |

The provision endpoint on TM 1 is the one actually listening on port 33705, but the harness on TM 2 looks for it on localhost and so cannot connect to it.

Here is how I tested this:

|...............
TM 1:
========
$ kubectl logs my-first-flink-cluster-taskmanager-1-1 -c beam-worker-pool
2021/08/12 09:10:34 Starting worker pool 1: python -m apache_beam.runners.worker.worker_pool_main --service_port=50000 --container_executable=/opt/apache/beam/boot
Starting worker with command ['/opt/apache/beam/boot', '--id=1-1', '--logging_endpoint=localhost:33383', '--artifact_endpoint=localhost:43477', '--provision_endpoint=localhost:40983', '--control_endpoint=localhost:34793']
2021/08/12 09:13:05 Failed to obtain provisioning information: failed to dial server at localhost:40983 caused by: context deadline exceeded

TM 2:
=========
$ kubectl logs my-first-flink-cluster-taskmanager-1-2 -c beam-worker-pool
2021/08/12 09:10:33 Starting worker pool 1: python -m apache_beam.runners.worker.worker_pool_main --service_port=50000 --container_executable=/opt/apache/beam/boot
Starting worker with command ['/opt/apache/beam/boot', '--id=1-1', '--logging_endpoint=localhost:40497', '--artifact_endpoint=localhost:36245', '--provision_endpoint=localhost:32907', '--control_endpoint=localhost:46083']
2021/08/12 09:13:09 Failed to obtain provisioning information: failed to dial server at localhost:32907 caused by: context deadline exceeded

Testing:
.........................
TM 1:
============
$ kubectl exec -it my-first-flink-cluster-taskmanager-1-1 -c beam-worker-pool -- bash
root@my-first-flink-cluster-taskmanager-1-1:/# curl localhost:40983
curl: (7) Failed to connect to localhost port 40983: Connection refused
root@my-first-flink-cluster-taskmanager-1-1:/# curl localhost:32907
Warning: Binary output can mess up your terminal. Use "--output -" to ...

TM 2:
=============
root@my-first-flink-cluster-taskmanager-1-2:/# curl localhost:32907
curl: (7) Failed to connect to localhost port 32907: Connection refused
root@my-first-flink-cluster-taskmanager-1-2:/# curl localhost:40983
Warning: Binary output can mess up your terminal. Use "--output -" to tell
Warning: curl to output it to your terminal anyway, or consider "--output |
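The same reachability check can be sketched in Python instead of curl, which avoids the binary-output warning because it only attempts a TCP connection. The function name `port_is_open` is hypothetical, and the ports in the usage part are simply the two provision endpoint ports from the logs above. It is meant to be run inside a `beam-worker-pool` container.

```python
import socket


def port_is_open(host: str, port: int, timeout: float = 2.0) -> bool:
    """Return True if a TCP connection to host:port can be established."""
    try:
        # create_connection raises OSError on refusal or timeout.
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False


if __name__ == "__main__":
    # Provision endpoint ports taken from the two worker pool logs.
    for port in (40983, 32907):
        state = "open" if port_is_open("localhost", port) else "closed"
        print(f"localhost:{port} is {state}")
```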

Not sure how to fix this.

Thanks, Gorjan

