[ https://issues.apache.org/jira/browse/FLINK-17480?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17187596#comment-17187596 ]
Shuiqiang Chen edited comment on FLINK-17480 at 8/31/20, 9:45 AM:
------------------------------------------------------------------

Hi [~felixzheng], I have managed to run Python Flink jobs on Kubernetes in native session/application mode with the master branch of Flink these days. The approaches are as follows:

1. Native session mode

An image with Flink, Python and PyFlink installed is required to start a Python Flink session cluster. It can be built as an extension of the official Flink image, as below:

{code:java}
FROM flink

# install miniconda to prepare a python environment
ENV LANG=C.UTF-8 LC_ALL=C.UTF-8
ENV PATH /opt/conda/bin:$PATH

RUN apt-get update --fix-missing && \
    apt-get install -y wget bzip2 ca-certificates curl git && \
    apt-get clean && \
    rm -rf /var/lib/apt/lists/*

RUN wget --quiet https://repo.anaconda.com/miniconda/Miniconda3-4.5.11-Linux-x86_64.sh -O ~/miniconda.sh && \
    /bin/bash ~/miniconda.sh -b -p /opt/conda && \
    rm ~/miniconda.sh && \
    /opt/conda/bin/conda clean -tipsy && \
    ln -s /opt/conda/etc/profile.d/conda.sh /etc/profile.d/conda.sh && \
    echo ". /opt/conda/etc/profile.d/conda.sh" >> ~/.bashrc && \
    echo "conda activate base" >> ~/.bashrc

# install python flink
RUN pip install apache-flink
{code}

Assuming the built image is tagged `pyflink:latest`, users can start a PyFlink session cluster by executing the following command:

{code:java}
./bin/kubernetes-session.sh \
  -Dkubernetes.container.image=pyflink:latest \
  -Dkubernetes.cluster-id=my-flink-session \
  -Dtaskmanager.memory.process.size=4096m \
  -Dkubernetes.taskmanager.cpu=2 \
  -Dtaskmanager.numberOfTaskSlots=4 \
  -Dresourcemanager.taskmanager-timeout=3600000
{code}

Once the session cluster is up, users can submit a Python Flink job:

{code:java}
./bin/flink run -m jm-host:port -py <PATH-TO-YOUR-PYTHON-CODES-DIRECTORY>/my_python_flink_job.py
{code}
2. Native application mode

Application mode requires users to create a single image containing their job and the Flink runtime. Based on the image with the Python and PyFlink environment built in the session mode section, users can build a Flink application image like:

{code:java}
FROM pyflink:latest

COPY /path/of/python/codes /opt/my_python_codes

# if there are third-party python dependencies, install them when building the image
COPY /path/to/requirements.txt /opt/requirements.txt
RUN pip install -r /opt/requirements.txt

# if the job requires external java dependencies, build them into the image as well
RUN mkdir -p $FLINK_HOME/usrlib
COPY /path/of/external/jar/dependencies $FLINK_HOME/usrlib/
{code}

With the application image prepared and tagged as `pyflink:app`, users can submit a Python Flink application as below:

{code:java}
./bin/flink run-application -p 8 -t kubernetes-application \
  -Dkubernetes.cluster-id=my-flink-app \
  -Dtaskmanager.memory.process.size=4096m \
  -Dkubernetes.taskmanager.cpu=2 \
  -Dtaskmanager.numberOfTaskSlots=4 \
  -Dkubernetes.container.image=pyflink:app \
  -Dpython.files=/opt/my_python_codes \
  -c org.apache.flink.client.python.PythonDriver \
  local:///opt/flink/usrlib/flink-python_2.11-1.12-SNAPSHOT.jar \
  -pym <ENTRY_MODULE_NAME> (or -py /opt/my_python_codes/xxx.py)
{code}

Note that users must specify the following arguments:
* `-Dpython.files=/opt/my_python_codes`: the path of the python codes in the image.
* `-c org.apache.flink.client.python.PythonDriver`: the java entry main class that executes the python codes; it is a constant, but it must be specified.
* `local:///opt/flink/usrlib/flink-python_2.11-1.12-SNAPSHOT.jar`: the jar of the flink-python module; it is provided by Flink internally, but it must be specified.

The above is an overall illustration of how to run PyFlink jobs on Kubernetes in native mode. There is still some room to improve the ease of use; maybe we can do that in the future.
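The required arguments above can be made explicit with a small helper that assembles the `run-application` command line. This is only a sketch; the cluster id, image tag and paths are the example values from the discussion, and `build_run_application_cmd` is a hypothetical helper, not part of Flink:

```python
# Sketch: assemble the `flink run-application` command for a PyFlink job.
# Cluster id, image and paths are example values from the discussion above.
import shlex

# The java entry class and the flink-python jar are fixed requirements.
PYTHON_DRIVER = "org.apache.flink.client.python.PythonDriver"
FLINK_PYTHON_JAR = "local:///opt/flink/usrlib/flink-python_2.11-1.12-SNAPSHOT.jar"


def build_run_application_cmd(cluster_id, image, python_files, entry_module,
                              parallelism=8):
    """Return the argv list for submitting a PyFlink application."""
    return ["./bin/flink", "run-application",
            "-p", str(parallelism),
            "-t", "kubernetes-application",
            f"-Dkubernetes.cluster-id={cluster_id}",
            f"-Dkubernetes.container.image={image}",
            f"-Dpython.files={python_files}",   # python codes inside the image
            "-c", PYTHON_DRIVER,                # constant, but must be specified
            FLINK_PYTHON_JAR,                   # provided by Flink, but must be specified
            "-pym", entry_module]               # or "-py", "/opt/my_python_codes/xxx.py"


cmd = build_run_application_cmd("my-flink-app", "pyflink:app",
                                "/opt/my_python_codes", "my_python_flink_job")
print(" ".join(shlex.quote(c) for c in cmd))
```

The same list could be handed to `subprocess.run` from a deployment script instead of being printed.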
Best,
Shuiqiang
> Support running PyFlink on Kubernetes
> -------------------------------------
>
>                 Key: FLINK-17480
>                 URL: https://issues.apache.org/jira/browse/FLINK-17480
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Deployment / Kubernetes
>            Reporter: Canbin Zheng
>            Priority: Major
>
> This is the umbrella issue for running PyFlink on Kubernetes in native mode.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)