This is an automated email from the ASF dual-hosted git repository.

dianfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 70bfb61  [FLINK-18218][python][e2e] Add PyFlink YARN per-job e2e tests
70bfb61 is described below

commit 70bfb61a48b1d1f5afce0ebc0f58a786e8013e29
Author: Wei Zhong <weizhong0...@gmail.com>
AuthorDate: Tue Jun 9 21:02:46 2020 +0800

    [FLINK-18218][python][e2e] Add PyFlink YARN per-job e2e tests
    
    This closes #12554.
---
 .../flink-python-test/python/python_job.py         | 13 +++--
 .../org/apache/flink/python/tests/util/AddOne.java | 29 +++++++++++
 .../test-scripts/test_pyflink.sh                   | 58 +++++++++++++++++++++-
 flink-python/dev/lint-python.sh                    |  4 +-
 4 files changed, 97 insertions(+), 7 deletions(-)

diff --git a/flink-end-to-end-tests/flink-python-test/python/python_job.py b/flink-end-to-end-tests/flink-python-test/python/python_job.py
index 8df2d12..a85e633 100644
--- a/flink-end-to-end-tests/flink-python-test/python/python_job.py
+++ b/flink-end-to-end-tests/flink-python-test/python/python_job.py
@@ -38,6 +38,11 @@ def word_count():
     env = ExecutionEnvironment.get_execution_environment()
     t_env = BatchTableEnvironment.create(env, t_config)
 
+    # used to test pipeline.jars and pipeline.classpaths
+    config_key = sys.argv[1]
+    config_value = sys.argv[2]
+    t_env.get_config().get_configuration().set_string(config_key, config_value)
+
     # register Results table in table environment
     tmp_dir = tempfile.gettempdir()
     result_path = tmp_dir + '/result'
@@ -55,7 +60,8 @@ def word_count():
     sink_ddl = """
         create table Results(
             word VARCHAR,
-            `count` BIGINT
+            `count` BIGINT,
+            `count_java` BIGINT
         ) with (
             'connector.type' = 'filesystem',
             'format.type' = 'csv',
@@ -65,12 +71,13 @@ def word_count():
     t_env.sql_update(sink_ddl)
 
     t_env.sql_update("create temporary system function add_one as 
'add_one.add_one' language python")
+    t_env.register_java_function("add_one_java", "org.apache.flink.python.tests.util.AddOne")
 
     elements = [(word, 0) for word in content.split(" ")]
     t_env.from_elements(elements, ["word", "count"]) \
-        .select("word, add_one(count) as count") \
+        .select("word, add_one(count) as count, add_one_java(count) as 
count_java") \
         .group_by("word") \
-        .select("word, count(count) as count") \
+        .select("word, count(count) as count, count(count_java) as 
count_java") \
         .insert_into("Results")
 
     t_env.execute("word_count")
diff --git a/flink-end-to-end-tests/flink-python-test/src/main/java/org/apache/flink/python/tests/util/AddOne.java b/flink-end-to-end-tests/flink-python-test/src/main/java/org/apache/flink/python/tests/util/AddOne.java
new file mode 100644
index 0000000..f0b0cc0
--- /dev/null
+++ b/flink-end-to-end-tests/flink-python-test/src/main/java/org/apache/flink/python/tests/util/AddOne.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.python.tests.util;
+
+import org.apache.flink.table.functions.ScalarFunction;
+
+/**
+ * Java UDF for testing.
+ */
+public class AddOne extends ScalarFunction {
+       public long eval(long input) {
+               return input + 1;
+       }
+}
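
For reference, calling a Java ScalarFunction such as AddOne from PyFlink
follows the register-then-use pattern shown in python_job.py above; a minimal
sketch, with a placeholder jar path:

    from pyflink.dataset import ExecutionEnvironment
    from pyflink.table import BatchTableEnvironment, TableConfig

    env = ExecutionEnvironment.get_execution_environment()
    t_env = BatchTableEnvironment.create(env, TableConfig())

    # ship the jar containing AddOne with the job (placeholder path)
    t_env.get_config().get_configuration().set_string(
        "pipeline.jars", "file:///path/to/PythonUdfSqlJobExample.jar")

    # expose the Java UDF under a name callable from Table API expressions
    t_env.register_java_function(
        "add_one_java", "org.apache.flink.python.tests.util.AddOne")

    t = t_env.from_elements([("hello", 1)], ["word", "count"]) \
        .select("word, add_one_java(count) as count_java")
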
diff --git a/flink-end-to-end-tests/test-scripts/test_pyflink.sh b/flink-end-to-end-tests/test-scripts/test_pyflink.sh
index ac521f1..1e58921 100755
--- a/flink-end-to-end-tests/test-scripts/test_pyflink.sh
+++ b/flink-end-to-end-tests/test-scripts/test_pyflink.sh
@@ -20,6 +20,7 @@
 set -Eeuo pipefail
 CURRENT_DIR=`cd "$(dirname "$0")" && pwd -P`
 source "${CURRENT_DIR}"/common.sh
+source "${CURRENT_DIR}"/common_yarn_docker.sh
 
 cp -r "${FLINK_DIR}/conf" "${TEST_DATA_DIR}/conf"
 
@@ -65,13 +66,23 @@ REQUIREMENTS_PATH="${TEST_DATA_DIR}/requirements.txt"
 
 echo "scipy==1.4.1" > "${REQUIREMENTS_PATH}"
 
-echo "Test submitting python job:\n"
+echo "Test submitting python job with 'pipeline.jars':\n"
 PYFLINK_CLIENT_EXECUTABLE=${PYTHON_EXEC} "${FLINK_DIR}/bin/flink" run \
     -pyfs "${FLINK_PYTHON_TEST_DIR}/python/add_one.py" \
     -pyreq "${REQUIREMENTS_PATH}" \
     -pyarch "${TEST_DATA_DIR}/venv.zip" \
     -pyexec "venv.zip/.conda/bin/python" \
-    -py "${FLINK_PYTHON_TEST_DIR}/python/python_job.py"
+    -py "${FLINK_PYTHON_TEST_DIR}/python/python_job.py" \
+    pipeline.jars "file://${FLINK_PYTHON_TEST_DIR}/target/PythonUdfSqlJobExample.jar"
+
+echo "Test submitting python job with 'pipeline.classpaths':\n"
+PYFLINK_CLIENT_EXECUTABLE=${PYTHON_EXEC} "${FLINK_DIR}/bin/flink" run \
+    -pyfs "${FLINK_PYTHON_TEST_DIR}/python/add_one.py" \
+    -pyreq "${REQUIREMENTS_PATH}" \
+    -pyarch "${TEST_DATA_DIR}/venv.zip" \
+    -pyexec "venv.zip/.conda/bin/python" \
+    -py "${FLINK_PYTHON_TEST_DIR}/python/python_job.py" \
+    pipeline.classpaths "file://${FLINK_PYTHON_TEST_DIR}/target/PythonUdfSqlJobExample.jar"
 
 echo "Test blink stream python udf sql job:\n"
 PYFLINK_CLIENT_EXECUTABLE=${PYTHON_EXEC} "${FLINK_DIR}/bin/flink" run \
@@ -150,3 +161,46 @@ JOB_ID=$($FLINK_DIR/bin/sql-client.sh embedded \
 wait_job_terminal_state "$JOB_ID" "FINISHED"
 
 stop_cluster
+
+# test submitting on yarn
+start_hadoop_cluster_and_prepare_flink
+
+# copy test files
+docker cp "${FLINK_PYTHON_DIR}/dev/lint-python.sh" master:/tmp/
+docker cp "${FLINK_PYTHON_TEST_DIR}/target/PythonUdfSqlJobExample.jar" 
master:/tmp/
+docker cp "${FLINK_PYTHON_TEST_DIR}/python/add_one.py" master:/tmp/
+docker cp "${REQUIREMENTS_PATH}" master:/tmp/
+docker cp "${FLINK_PYTHON_TEST_DIR}/python/python_job.py" master:/tmp/
+PYFLINK_PACKAGE_FILE=$(basename "${FLINK_PYTHON_DIR}"/dist/apache-flink-*.tar.gz)
+docker cp "${FLINK_PYTHON_DIR}/dist/${PYFLINK_PACKAGE_FILE}" master:/tmp/
+
+# prepare environment
+docker exec master bash -c "
+/tmp/lint-python.sh -s miniconda
+source /tmp/.conda/bin/activate
+pip install /tmp/${PYFLINK_PACKAGE_FILE}
+conda install -y -q zip=3.0
+rm -rf /tmp/.conda/pkgs
+cd /tmp
+zip -q -r /tmp/venv.zip .conda
+echo \"taskmanager.memory.task.off-heap.size: 100m\" >> 
\"/home/hadoop-user/$FLINK_DIRNAME/conf/flink-conf.yaml\"
+"
+
+docker exec master bash -c "export HADOOP_CLASSPATH=\`hadoop classpath\` && \
+    export PYFLINK_CLIENT_EXECUTABLE=/tmp/.conda/bin/python && \
+    /home/hadoop-user/$FLINK_DIRNAME/bin/flink run -m yarn-cluster -ytm 1500 -yjm 1000 \
+    -pyfs /tmp/add_one.py \
+    -pyreq /tmp/requirements.txt \
+    -pyarch /tmp/venv.zip \
+    -pyexec venv.zip/.conda/bin/python \
+    /tmp/PythonUdfSqlJobExample.jar"
+
+docker exec master bash -c "export HADOOP_CLASSPATH=\`hadoop classpath\` && \
+    export PYFLINK_CLIENT_EXECUTABLE=/tmp/.conda/bin/python && \
+    /home/hadoop-user/$FLINK_DIRNAME/bin/flink run -m yarn-cluster -ytm 1500 -yjm 1000 \
+    -pyfs /tmp/add_one.py \
+    -pyreq /tmp/requirements.txt \
+    -pyarch /tmp/venv.zip \
+    -pyexec venv.zip/.conda/bin/python \
+    -py /tmp/python_job.py \
+    pipeline.jars file:/tmp/PythonUdfSqlJobExample.jar"
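
The flink run flags used above (-pyfs, -pyreq, -pyarch, -pyexec) also have
programmatic counterparts in the PyFlink Table API; a hedged sketch of the
equivalent in-code setup, reusing the test's file locations:

    from pyflink.dataset import ExecutionEnvironment
    from pyflink.table import BatchTableEnvironment, TableConfig

    env = ExecutionEnvironment.get_execution_environment()
    t_env = BatchTableEnvironment.create(env, TableConfig())

    t_env.add_python_file("/tmp/add_one.py")                # ~ -pyfs
    t_env.set_python_requirements("/tmp/requirements.txt")  # ~ -pyreq
    t_env.add_python_archive("/tmp/venv.zip")               # ~ -pyarch
    t_env.get_config().get_configuration().set_string(      # ~ -pyexec
        "python.executable", "venv.zip/.conda/bin/python")
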
diff --git a/flink-python/dev/lint-python.sh b/flink-python/dev/lint-python.sh
index 1ad2a19..cb65ff8 100755
--- a/flink-python/dev/lint-python.sh
+++ b/flink-python/dev/lint-python.sh
@@ -28,10 +28,10 @@ function download() {
     local DOWNLOAD_STATUS=
     if hash "wget" 2>/dev/null; then
         # wget option support differs across versions, so we turn off --show-progress
-        wget "$1" -O "$2" -q
+        wget "$1" -O "$2" -q -T20 -t3
         DOWNLOAD_STATUS="$?"
     else
-        curl "$1" -o "$2" --progress-bar
+        curl "$1" -o "$2" --progress-bar --connect-timeout 20 --retry 3
         DOWNLOAD_STATUS="$?"
     fi
     if [ $DOWNLOAD_STATUS -ne 0 ]; then
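
The new flags bound both downloaders: wget gets a 20-second timeout and 3
tries (-T20 -t3), curl a 20-second connect timeout and 3 retries
(--connect-timeout 20 --retry 3), so a hung mirror fails fast instead of
stalling CI. The same timeout-and-retry semantics, sketched in Python purely
for illustration:

    import time
    import urllib.request

    def download(url, path, timeout=20, retries=3):
        """Fetch url into path; raise after `retries` failed attempts."""
        for attempt in range(1, retries + 1):
            try:
                with urllib.request.urlopen(url, timeout=timeout) as resp, \
                        open(path, "wb") as out:
                    out.write(resp.read())
                return
            except OSError:
                if attempt == retries:
                    raise
                time.sleep(1)  # brief pause before the next attempt
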
