This is an automated email from the ASF dual-hosted git repository.

dianfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
     new 70bfb61  [FLINK-18218][python][e2e] Add PyFlink YARN per-job e2e tests
70bfb61 is described below

commit 70bfb61a48b1d1f5afce0ebc0f58a786e8013e29
Author: Wei Zhong <weizhong0...@gmail.com>
AuthorDate: Tue Jun 9 21:02:46 2020 +0800

    [FLINK-18218][python][e2e] Add PyFlink YARN per-job e2e tests

    This closes #12554.
---
 .../flink-python-test/python/python_job.py         | 13 +++--
 .../org/apache/flink/python/tests/util/AddOne.java | 29 +++++++++++
 .../test-scripts/test_pyflink.sh                   | 58 +++++++++++++++++++++-
 flink-python/dev/lint-python.sh                    |  4 +-
 4 files changed, 97 insertions(+), 7 deletions(-)

diff --git a/flink-end-to-end-tests/flink-python-test/python/python_job.py b/flink-end-to-end-tests/flink-python-test/python/python_job.py
index 8df2d12..a85e633 100644
--- a/flink-end-to-end-tests/flink-python-test/python/python_job.py
+++ b/flink-end-to-end-tests/flink-python-test/python/python_job.py
@@ -38,6 +38,11 @@ def word_count():
     env = ExecutionEnvironment.get_execution_environment()
     t_env = BatchTableEnvironment.create(env, t_config)
 
+    # used to test pipeline.jars and pipeline.classpaths
+    config_key = sys.argv[1]
+    config_value = sys.argv[2]
+    t_env.get_config().get_configuration().set_string(config_key, config_value)
+
     # register Results table in table environment
     tmp_dir = tempfile.gettempdir()
     result_path = tmp_dir + '/result'
@@ -55,7 +60,8 @@ def word_count():
     sink_ddl = """
         create table Results(
             word VARCHAR,
-            `count` BIGINT
+            `count` BIGINT,
+            `count_java` BIGINT
         ) with (
             'connector.type' = 'filesystem',
             'format.type' = 'csv',
@@ -65,12 +71,13 @@ def word_count():
     t_env.sql_update(sink_ddl)
 
     t_env.sql_update("create temporary system function add_one as 'add_one.add_one' language python")
+    t_env.register_java_function("add_one_java", "org.apache.flink.python.tests.util.AddOne")
 
     elements = [(word, 0) for word in content.split(" ")]
     t_env.from_elements(elements, ["word", "count"]) \
-        .select("word, add_one(count) as count") \
+        .select("word, add_one(count) as count, add_one_java(count) as count_java") \
         .group_by("word") \
-        .select("word, count(count) as count") \
+        .select("word, count(count) as count, count(count_java) as count_java") \
         .insert_into("Results")
 
     t_env.execute("word_count")
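Note: the job now reads the configuration key and value from its command-line
arguments, so one script can exercise both 'pipeline.jars' and
'pipeline.classpaths'. For readers unfamiliar with register_java_function, a
minimal sketch of calling a Java scalar function from the legacy PyFlink batch
Table API (illustrative element data; assumes the jar containing AddOne is
already on the job's classpath):

    from pyflink.dataset import ExecutionEnvironment
    from pyflink.table import BatchTableEnvironment, TableConfig

    env = ExecutionEnvironment.get_execution_environment()
    t_env = BatchTableEnvironment.create(env, TableConfig())
    # expose the Java class under a name callable from Table API expressions
    t_env.register_java_function(
        "add_one_java", "org.apache.flink.python.tests.util.AddOne")
    result = t_env.from_elements([("hello", 1)], ["word", "count"]) \
        .select("word, add_one_java(count) as count_java")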
diff --git a/flink-end-to-end-tests/flink-python-test/src/main/java/org/apache/flink/python/tests/util/AddOne.java b/flink-end-to-end-tests/flink-python-test/src/main/java/org/apache/flink/python/tests/util/AddOne.java
new file mode 100644
index 0000000..f0b0cc0
--- /dev/null
+++ b/flink-end-to-end-tests/flink-python-test/src/main/java/org/apache/flink/python/tests/util/AddOne.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.python.tests.util;
+
+import org.apache.flink.table.functions.ScalarFunction;
+
+/**
+ * Scalar UDF for testing.
+ */
+public class AddOne extends ScalarFunction {
+	public long eval(long input) {
+		return input + 1;
+	}
+}
diff --git a/flink-end-to-end-tests/test-scripts/test_pyflink.sh b/flink-end-to-end-tests/test-scripts/test_pyflink.sh
index ac521f1..1e58921 100755
--- a/flink-end-to-end-tests/test-scripts/test_pyflink.sh
+++ b/flink-end-to-end-tests/test-scripts/test_pyflink.sh
@@ -20,6 +20,7 @@ set -Eeuo pipefail
 
 CURRENT_DIR=`cd "$(dirname "$0")" && pwd -P`
 source "${CURRENT_DIR}"/common.sh
+source "${CURRENT_DIR}"/common_yarn_docker.sh
 
 cp -r "${FLINK_DIR}/conf" "${TEST_DATA_DIR}/conf"
 
@@ -65,13 +66,23 @@ REQUIREMENTS_PATH="${TEST_DATA_DIR}/requirements.txt"
 
 echo "scipy==1.4.1" > "${REQUIREMENTS_PATH}"
 
-echo "Test submitting python job:\n"
+echo "Test submitting python job with 'pipeline.jars':\n"
 PYFLINK_CLIENT_EXECUTABLE=${PYTHON_EXEC} "${FLINK_DIR}/bin/flink" run \
     -pyfs "${FLINK_PYTHON_TEST_DIR}/python/add_one.py" \
     -pyreq "${REQUIREMENTS_PATH}" \
     -pyarch "${TEST_DATA_DIR}/venv.zip" \
     -pyexec "venv.zip/.conda/bin/python" \
-    -py "${FLINK_PYTHON_TEST_DIR}/python/python_job.py"
+    -py "${FLINK_PYTHON_TEST_DIR}/python/python_job.py" \
+    pipeline.jars "file://${FLINK_PYTHON_TEST_DIR}/target/PythonUdfSqlJobExample.jar"
+
+echo "Test submitting python job with 'pipeline.classpaths':\n"
+PYFLINK_CLIENT_EXECUTABLE=${PYTHON_EXEC} "${FLINK_DIR}/bin/flink" run \
+    -pyfs "${FLINK_PYTHON_TEST_DIR}/python/add_one.py" \
+    -pyreq "${REQUIREMENTS_PATH}" \
+    -pyarch "${TEST_DATA_DIR}/venv.zip" \
+    -pyexec "venv.zip/.conda/bin/python" \
+    -py "${FLINK_PYTHON_TEST_DIR}/python/python_job.py" \
+    pipeline.classpaths "file://${FLINK_PYTHON_TEST_DIR}/target/PythonUdfSqlJobExample.jar"
 
 echo "Test blink stream python udf sql job:\n"
 PYFLINK_CLIENT_EXECUTABLE=${PYTHON_EXEC} "${FLINK_DIR}/bin/flink" run \
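Note: the two submissions above differ only in how the UDF jar reaches the
cluster. 'pipeline.jars' ships the jar with the job, while
'pipeline.classpaths' only adds a URL to the classpath, so the file must be
reachable from the cluster. Here they are passed as program arguments and
applied by python_job.py via set_string; the same options can be set directly
inside a PyFlink program, e.g. (a sketch with a hypothetical jar path):

    # ship the jar with the job submission...
    t_env.get_config().get_configuration().set_string(
        "pipeline.jars", "file:///tmp/PythonUdfSqlJobExample.jar")
    # ...or only reference it on the classpath without shipping it
    t_env.get_config().get_configuration().set_string(
        "pipeline.classpaths", "file:///tmp/PythonUdfSqlJobExample.jar")

The YARN section in the next hunk additionally appends
taskmanager.memory.task.off-heap.size: 100m to flink-conf.yaml, since Python
UDF worker memory is accounted as task off-heap memory in this Flink version.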
@@ -150,3 +161,46 @@ JOB_ID=$($FLINK_DIR/bin/sql-client.sh embedded \
 wait_job_terminal_state "$JOB_ID" "FINISHED"
 
 stop_cluster
+
+# test submitting on yarn
+start_hadoop_cluster_and_prepare_flink
+
+# copy test files
+docker cp "${FLINK_PYTHON_DIR}/dev/lint-python.sh" master:/tmp/
+docker cp "${FLINK_PYTHON_TEST_DIR}/target/PythonUdfSqlJobExample.jar" master:/tmp/
+docker cp "${FLINK_PYTHON_TEST_DIR}/python/add_one.py" master:/tmp/
+docker cp "${REQUIREMENTS_PATH}" master:/tmp/
+docker cp "${FLINK_PYTHON_TEST_DIR}/python/python_job.py" master:/tmp/
+PYFLINK_PACKAGE_FILE=$(basename "${FLINK_PYTHON_DIR}"/dist/apache-flink-*.tar.gz)
+docker cp "${FLINK_PYTHON_DIR}/dist/${PYFLINK_PACKAGE_FILE}" master:/tmp/
+
+# prepare environment
+docker exec master bash -c "
+/tmp/lint-python.sh -s miniconda
+source /tmp/.conda/bin/activate
+pip install /tmp/${PYFLINK_PACKAGE_FILE}
+conda install -y -q zip=3.0
+rm -rf /tmp/.conda/pkgs
+cd /tmp
+zip -q -r /tmp/venv.zip .conda
+echo \"taskmanager.memory.task.off-heap.size: 100m\" >> \"/home/hadoop-user/$FLINK_DIRNAME/conf/flink-conf.yaml\"
+"
+
+docker exec master bash -c "export HADOOP_CLASSPATH=\`hadoop classpath\` && \
+    export PYFLINK_CLIENT_EXECUTABLE=/tmp/.conda/bin/python && \
+    /home/hadoop-user/$FLINK_DIRNAME/bin/flink run -m yarn-cluster -ytm 1500 -yjm 1000 \
+    -pyfs /tmp/add_one.py \
+    -pyreq /tmp/requirements.txt \
+    -pyarch /tmp/venv.zip \
+    -pyexec venv.zip/.conda/bin/python \
+    /tmp/PythonUdfSqlJobExample.jar"
+
+docker exec master bash -c "export HADOOP_CLASSPATH=\`hadoop classpath\` && \
+    export PYFLINK_CLIENT_EXECUTABLE=/tmp/.conda/bin/python && \
+    /home/hadoop-user/$FLINK_DIRNAME/bin/flink run -m yarn-cluster -ytm 1500 -yjm 1000 \
+    -pyfs /tmp/add_one.py \
+    -pyreq /tmp/requirements.txt \
+    -pyarch /tmp/venv.zip \
+    -pyexec venv.zip/.conda/bin/python \
+    -py /tmp/python_job.py \
+    pipeline.jars file:/tmp/PythonUdfSqlJobExample.jar"
diff --git a/flink-python/dev/lint-python.sh b/flink-python/dev/lint-python.sh
index 1ad2a19..cb65ff8 100755
--- a/flink-python/dev/lint-python.sh
+++ b/flink-python/dev/lint-python.sh
@@ -28,10 +28,10 @@ function download() {
     local DOWNLOAD_STATUS=
     if hash "wget" 2>/dev/null; then
         # wget options differ across versions, so we turn off --show-progress
-        wget "$1" -O "$2" -q
+        wget "$1" -O "$2" -q -T20 -t3
        DOWNLOAD_STATUS="$?"
     else
-        curl "$1" -o "$2" --progress-bar
+        curl "$1" -o "$2" --progress-bar --connect-timeout 20 --retry 3
         DOWNLOAD_STATUS="$?"
     fi
     if [ $DOWNLOAD_STATUS -ne 0 ]; then
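Note: the download() change bounds wget and curl with a 20-second connect
timeout and 3 attempts, so a flaky mirror fails the build quickly instead of
hanging it. For illustration only (not part of the commit), the same
retry-with-timeout logic expressed in Python:

    import time
    import urllib.request

    def download(url, path, timeout=20, retries=3):
        # fetch url into path, retrying transient failures, mirroring the
        # wget -T20 -t3 / curl --connect-timeout 20 --retry 3 flags above
        for attempt in range(1, retries + 1):
            try:
                with urllib.request.urlopen(url, timeout=timeout) as resp, \
                        open(path, "wb") as out:
                    out.write(resp.read())
                return
            except OSError:
                if attempt == retries:
                    raise
                time.sleep(1)  # brief pause before the next attempt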