Re: Fails to deploy a python pipeline to a flink cluster

2024-05-11 Thread XQ Hu via user
Do you still have the same issue? I tried to follow your setup.sh to
reproduce this but somehow I am stuck at the word_len step. I saw you also
tried to use `print(kafka_kv)` to debug it. I am not sure about your
current status.

On Fri, May 10, 2024 at 9:18 AM Jaehyeon Kim  wrote:

> Hello,
>
> I'm playing with deploying a Python pipeline to a Flink cluster on
> Kubernetes via the Flink Kubernetes operator. The pipeline simply calculates
> average word lengths in a fixed time window of 5 seconds, and it works with
> the embedded Flink cluster.
>
> First, I created a k8s cluster (v1.25.3) on minikube and a Docker image
> named beam-python-example:1.17 using the following Dockerfile - the full
> details can be checked in
> https://github.com/jaehyeon-kim/beam-demos/blob/feature/beam-deploy/beam-deploy/beam/Dockerfile
>
> The Java SDK is used for the SDK harness of the Kafka IO's expansion
> service, while the job server is used to execute the Python pipeline in the
> Flink operator.
>
> FROM flink:1.17
> ...
> ## add java SDK and job server
> COPY --from=apache/beam_java8_sdk:2.56.0 /opt/apache/beam/ /opt/apache/beam/
>
> COPY --from=apache/beam_flink1.17_job_server:2.56.0 \
>   /opt/apache/beam/jars/beam-runners-flink-job-server.jar /opt/apache/beam/jars/beam-runners-flink-job-server.jar
>
> RUN chown -R flink:flink /opt/apache/beam
>
> ## install python 3.10.13
> RUN apt-get update -y && \
>   apt-get install -y build-essential libssl-dev zlib1g-dev libbz2-dev libffi-dev liblzma-dev && \
>   wget https://www.python.org/ftp/python/${PYTHON_VERSION}/Python-${PYTHON_VERSION}.tgz && \
> ...
> ## install apache beam 2.56.0
> RUN pip3 install apache-beam==${BEAM_VERSION}
>
> ## copy pipeline source
> RUN mkdir /opt/flink/app
> COPY word_len.py /opt/flink/app/
>
> Then the pipeline is deployed using the following manifest - the full
> details can be found in
> https://github.com/jaehyeon-kim/beam-demos/blob/feature/beam-deploy/beam-deploy/beam/word_len.yml
>
> apiVersion: flink.apache.org/v1beta1
> kind: FlinkDeployment
> metadata:
>   name: beam-word-len
> spec:
>   image: beam-python-example:1.17
>   imagePullPolicy: Never
>   flinkVersion: v1_17
>   flinkConfiguration:
>     taskmanager.numberOfTaskSlots: "5"
>   serviceAccount: flink
>   podTemplate:
>     spec:
>       containers:
>         - name: flink-main-container
>           env:
>             - name: BOOTSTRAP_SERVERS
>               value: demo-cluster-kafka-bootstrap:9092
> ...
>   jobManager:
>     resource:
>       memory: "2048m"
>       cpu: 1
>   taskManager:
>     replicas: 2
>     resource:
>       memory: "2048m"
>       cpu: 1
>     podTemplate:
>       spec:
>         containers:
>           - name: python-worker-harness
>             image: apache/beam_python3.10_sdk:2.56.0
>             imagePullPolicy: Never
>             args: ["--worker_pool"]
>             ports:
>               - containerPort: 5
>   job:
>     jarURI: local:///opt/apache/beam/jars/beam-runners-flink-job-server.jar
>     entryClass: org.apache.beam.runners.flink.FlinkPortableClientEntryPoint
>     args:
>       - "--driver-cmd"
>       - "python /opt/flink/app/word_len.py --deploy"
>     parallelism: 3
>     upgradeMode: stateless
>
> Here is the pipeline source - the full details can be found in
> https://github.com/jaehyeon-kim/beam-demos/blob/feature/beam-deploy/beam-deploy/beam/word_len.py
>
> When I add the --deploy flag, the Python SDK harness is set to EXTERNAL
> and its config is set to localhost:5 - I believe it'll point to the
> sidecar container of the task manager. For the Kafka IO, the expansion
> service's SDK harness is configured as PROCESS and the command points to
> the Java SDK that is added in the beam-python-example:1.17 image.
>
> ...
> def run(args=None):
>     parser = argparse.ArgumentParser(description="Beam pipeline arguments")
>     parser.add_argument(
>         "--runner", default="FlinkRunner", help="Apache Beam runner"
>     )
>     parser.add_argument(
>         "--deploy",
>         action="store_true",
>         default=False,
>         help="Flag to indicate whether to use an own local cluster",
>     )
>     opts, _ = parser.parse_known_args(args)
>
>     pipeline_opts = {
>         "runner": opts.runner,
>         "job_name": "avg-word-length-beam",
>         "streaming": True,
>         "environment_type": "EXTERNAL" if opts.deploy is True else "LOOPBACK",
>         "checkpointing_interval": "6",
>     }
>
>     expansion_service = None
>     if pipeline_opts["environment_type"] == "EXTERNAL":
>         pipeline_opts = {
>             **pipeline_opts,
>             **{
>                 "environment_config": "localhost:5",
>                 "flink_submit_uber_jar": True,
>             },
>         }
>         expansion_service = kafka.default_io_expansion_service(
>             append_args=[
>                 "--defaultEnvironmentType=PROCESS",
>                 '--defaultEnvironmentConfig={"command":"/opt/apache/beam/boot"}',
>

Re: Fails to deploy a python pipeline to a flink cluster

2024-05-11 Thread Jaehyeon Kim
Hi XQ

I haven't changed anything and the issue persists on my end. The print
statement is called only when self.verbose is True and, by default, it is False.

BTW, do you have any idea about the error message? I haven't seen such an error before.

Cheers,
Jaehyeon

On Sun, 12 May 2024, 12:15 am XQ Hu via user,  wrote:

> Do you still have the same issue? I tried to follow your setup.sh to
> reproduce this but somehow I am stuck at the word_len step. I saw you also
> tried to use `print(kafka_kv)` to debug it. I am not sure about your
> current status.
>
> On Fri, May 10, 2024 at 9:18 AM Jaehyeon Kim  wrote:
>
>> Hello,
>>
>> I'm playing with deploying a python pipeline to a flink cluster on
>> kubernetes via flink kubernetes operator. The pipeline simply calculates
>> average word lengths in a fixed time window of 5 seconds and it works with
>> the embedded flink cluster.
>>
>> First, I created a k8s cluster (v1.25.3) on minikube and a docker image
>> named beam-python-example:1.17 created using the following docker file -
>> the full details can be checked in
>> https://github.com/jaehyeon-kim/beam-demos/blob/feature/beam-deploy/beam-deploy/beam/Dockerfile
>>
>> The java sdk is used for the sdk harness of the kafka io's expansion
>> service while the job server is used to execute the python pipeline in the
>> flink operator.
>>
>> FROM flink:1.17
>> ...
>> ## add java SDK and job server
>> COPY --from=apache/beam_java8_sdk:2.56.0 /opt/apache/beam/
>> /opt/apache/beam/
>>
>> COPY --from=apache/beam_flink1.17_job_server:2.56.0  \
>>   /opt/apache/beam/jars/beam-runners-flink-job-server.jar
>> /opt/apache/beam/jars/beam-runners-flink-job-server.jar
>>
>> RUN chown -R flink:flink /opt/apache/beam
>>
>> ## install python 3.10.13
>> RUN apt-get update -y && \
>>   apt-get install -y build-essential libssl-dev zlib1g-dev libbz2-dev
>> libffi-dev liblzma-dev && \
>>   wget https://www.python.org/ftp/python/${PYTHON_VERSION}/Python-
>> ${PYTHON_VERSION}.tgz && \
>> ...
>> ## install apache beam 2.56.0
>> RUN pip3 install apache-beam==${BEAM_VERSION}
>>
>> ## copy pipeline source
>> RUN mkdir /opt/flink/app
>> COPY word_len.py /opt/flink/app/
>>
>> Then the pipeline is deployed using the following manifest - the full
>> details can be found in
>> https://github.com/jaehyeon-kim/beam-demos/blob/feature/beam-deploy/beam-deploy/beam/word_len.yml
>>
>> apiVersion: flink.apache.org/v1beta1
>> kind: FlinkDeployment
>> metadata:
>>   name: beam-word-len
>> spec:
>>   image: beam-python-example:1.17
>>   imagePullPolicy: Never
>>   flinkVersion: v1_17
>>   flinkConfiguration:
>> taskmanager.numberOfTaskSlots: "5"
>>   serviceAccount: flink
>>   podTemplate:
>> spec:
>>   containers:
>> - name: flink-main-container
>>   env:
>> - name: BOOTSTRAP_SERVERS
>>   value: demo-cluster-kafka-bootstrap:9092
>> ...
>>   jobManager:
>> resource:
>>   memory: "2048m"
>>   cpu: 1
>>   taskManager:
>> replicas: 2
>> resource:
>>   memory: "2048m"
>>   cpu: 1
>> podTemplate:
>>   spec:
>> containers:
>>   - name: python-worker-harness
>> image: apache/beam_python3.10_sdk:2.56.0
>> imagePullPolicy: Never
>> args: ["--worker_pool"]
>> ports:
>>   - containerPort: 5
>>
>>   job:
>> jarURI:
>> local:///opt/apache/beam/jars/beam-runners-flink-job-server.jar
>> entryClass:
>> org.apache.beam.runners.flink.FlinkPortableClientEntryPoint
>> args:
>>   - "--driver-cmd"
>>   - "python /opt/flink/app/word_len.py --deploy"
>> parallelism: 3
>> upgradeMode: stateless
>>
>> Here is the pipeline source - the full details can be found in
>> https://github.com/jaehyeon-kim/beam-demos/blob/feature/beam-deploy/beam-deploy/beam/word_len.py
>>
>> When I add the --deploy flag, the python sdk harness is set to EXTERNAL
>> and its config is set to localhost:5 - I believe it'll point to the
>> side car container of the task manager. For the kafka io, the expansion
>> service's sdk harness is configured as PROCESS and the command points to
>> the java sdk that is added in the beam-python-example:1.17 image.
>>
>> ...
>> def run(args=None):
>> parser = argparse.ArgumentParser(description="Beam pipeline
>> arguments")
>> parser.add_argument("--runner", default="FlinkRunner", help="Apache
>> Beam runner")
>> parser.add_argument(
>> "--deploy",
>> action="store_true",
>> default="Flag to indicate whether to use an own local cluster",
>> )
>> opts, _ = parser.parse_known_args(args)
>>
>> pipeline_opts = {
>> "runner": opts.runner,
>> "job_name": "avg-word-length-beam",
>> "streaming": True,
>> "environment_type": "EXTERNAL" if opts.deploy is True else
>> "LOOPBACK",
>> "checkpointing_interval": "6",
>> }
>>
>> expansion_service = None
>> if pipeline_opts["environment_type"] == "EX