This is an automated email from the ASF dual-hosted git repository.

dianfu pushed a commit to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.10 by this push:
     new 41e357a  [FLINK-15675][python][docs] Add exception and documentation that Python UDF is not supported in old Planner under batch mode
41e357a is described below

commit 41e357afc032ce89ae1bd1aed6e54de463c0c987
Author: hequn8128 <chenghe...@gmail.com>
AuthorDate: Mon Jan 20 10:57:43 2020 +0800

    [FLINK-15675][python][docs] Add exception and documentation that Python UDF is not supported in old Planner under batch mode

    This closes #10907
---
 docs/dev/table/functions/udfs.md                |  2 ++
 docs/dev/table/functions/udfs.zh.md             |  2 ++
 flink-python/pyflink/table/table_environment.py |  2 ++
 flink-python/pyflink/table/tests/test_udf.py    | 14 +++++++++++++-
 4 files changed, 19 insertions(+), 1 deletion(-)

diff --git a/docs/dev/table/functions/udfs.md b/docs/dev/table/functions/udfs.md
index 54d2a01..523cd61 100644
--- a/docs/dev/table/functions/udfs.md
+++ b/docs/dev/table/functions/udfs.md
@@ -144,6 +144,8 @@ $ python --version
 $ python -m pip install apache-beam==2.15.0
 {% endhighlight %}
 
+<span class="label label-info">Note</span> Currently, Python UDF is supported in Blink planner both under streaming and batch mode while is only supported under streaming mode in old planner.
+
 It supports to use both Java/Scala scalar functions and Python scalar functions in Python Table API and SQL. In order to define a Python scalar function, one can extend the base class `ScalarFunction` in `pyflink.table.udf` and implement an evaluation method. The behavior of a Python scalar function is determined by the evaluation method. An evaluation method must be named `eval`. Evaluation method can also support variable arguments, such as `eval(*args)`. The following example shows how to define your own Java and Python hash code functions, register them in the TableEnvironment, and call them in a query.
 Note that you can configure your scalar function via a constructor before it is registered:

diff --git a/docs/dev/table/functions/udfs.zh.md b/docs/dev/table/functions/udfs.zh.md
index 1022af8..c7cd78e 100644
--- a/docs/dev/table/functions/udfs.zh.md
+++ b/docs/dev/table/functions/udfs.zh.md
@@ -144,6 +144,8 @@ $ python --version
 $ python -m pip install apache-beam==2.15.0
 {% endhighlight %}
 
+<span class="label label-info">Note</span> Currently, Python UDF is supported in Blink planner both under streaming and batch mode while is only supported under streaming mode in old planner.
+
 It supports to use both Java/Scala scalar functions and Python scalar functions in Python Table API and SQL. In order to define a Python scalar function, one can extend the base class `ScalarFunction` in `pyflink.table.udf` and implement an evaluation method. The behavior of a Python scalar function is determined by the evaluation method. An evaluation method must be named `eval`. Evaluation method can also support variable arguments, such as `eval(*args)`. The following example shows how to define your own Java and Python hash code functions, register them in the TableEnvironment, and call them in a query.
 Note that you can configure your scalar function via a constructor before it is registered:

diff --git a/flink-python/pyflink/table/table_environment.py b/flink-python/pyflink/table/table_environment.py
index cdb29d5..4122ecd 100644
--- a/flink-python/pyflink/table/table_environment.py
+++ b/flink-python/pyflink/table/table_environment.py
@@ -743,6 +743,8 @@ class TableEnvironment(object):
         :param function: The python user-defined function to register.
         :type function: pyflink.table.udf.UserDefinedFunctionWrapper
         """
+        if not self._is_blink_planner and isinstance(self, BatchTableEnvironment):
+            raise Exception("Python UDF is not supported in old planner under batch mode!")
         self._j_tenv.registerFunction(name, function._judf(self._is_blink_planner,
                                                            self.get_config()._j_table_config))

diff --git a/flink-python/pyflink/table/tests/test_udf.py b/flink-python/pyflink/table/tests/test_udf.py
index 795a826..77983f0 100644
--- a/flink-python/pyflink/table/tests/test_udf.py
+++ b/flink-python/pyflink/table/tests/test_udf.py
@@ -19,7 +19,8 @@ from pyflink.table import DataTypes
 from pyflink.table.udf import ScalarFunction, udf
 from pyflink.testing import source_sink_utils
 from pyflink.testing.test_case_utils import PyFlinkStreamTableTestCase, \
-    PyFlinkBlinkStreamTableTestCase, PyFlinkBlinkBatchTableTestCase
+    PyFlinkBlinkStreamTableTestCase, PyFlinkBlinkBatchTableTestCase, \
+    PyFlinkBatchTableTestCase
 
 
 class UserDefinedFunctionTests(object):
@@ -478,6 +479,17 @@ class PyFlinkStreamUserDefinedFunctionTests(UserDefinedFunctionTests,
     pass
 
 
+class PyFlinkBatchUserDefinedFunctionTests(PyFlinkBatchTableTestCase):
+
+    def test_invalid_register_udf(self):
+        self.assertRaises(
+            Exception,
+            lambda: self.t_env.register_function(
+                "add_one",
+                udf(lambda i: i + 1, DataTypes.BIGINT(), DataTypes.BIGINT()))
+        )
+
+
 class PyFlinkBlinkStreamUserDefinedFunctionTests(UserDefinedFunctionTests,
                                                  PyFlinkBlinkStreamTableTestCase):
     def test_deterministic(self):
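The core of this patch is the guard in `TableEnvironment.register_function`: when the environment is a batch environment on the old (non-Blink) planner, registering a Python UDF raises immediately instead of failing later at runtime. Below is a minimal pure-Python sketch of that guard logic, with no PyFlink dependency: the class and attribute names (`TableEnvironment`, `BatchTableEnvironment`, `_is_blink_planner`) mirror the diff, while everything else (the constructor signature, the `_functions` dict) is a stand-in for illustration only.

```python
# Minimal sketch of the planner/mode guard added by this patch.
# Only the names mirroring the diff are taken from the source;
# the rest is hypothetical scaffolding, not the real PyFlink API.

class TableEnvironment:
    def __init__(self, is_blink_planner):
        self._is_blink_planner = is_blink_planner
        self._functions = {}  # stand-in for the real JVM-side registry

    def register_function(self, name, function):
        # The check from the patch: old planner + batch mode is rejected
        # up front with a clear error message.
        if not self._is_blink_planner and isinstance(self, BatchTableEnvironment):
            raise Exception("Python UDF is not supported in old planner under batch mode!")
        self._functions[name] = function


class BatchTableEnvironment(TableEnvironment):
    pass


class StreamTableEnvironment(TableEnvironment):
    pass
```

With this shape, old-planner batch environments fail fast while Blink batch and any streaming environment register the function normally, which is exactly the matrix the new `PyFlinkBatchUserDefinedFunctionTests.test_invalid_register_udf` test exercises from the rejected side.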