This is an automated email from the ASF dual-hosted git repository.

dianfu pushed a commit to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.10 by this push:
     new 41e357a  [FLINK-15675][python][docs] Add exception and documentation that Python UDF is not supported in old Planner under batch mode
41e357a is described below

commit 41e357afc032ce89ae1bd1aed6e54de463c0c987
Author: hequn8128 <chenghe...@gmail.com>
AuthorDate: Mon Jan 20 10:57:43 2020 +0800

    [FLINK-15675][python][docs] Add exception and documentation that Python UDF is not supported in old Planner under batch mode
    
    This closes #10907
---
 docs/dev/table/functions/udfs.md                |  2 ++
 docs/dev/table/functions/udfs.zh.md             |  2 ++
 flink-python/pyflink/table/table_environment.py |  2 ++
 flink-python/pyflink/table/tests/test_udf.py    | 14 +++++++++++++-
 4 files changed, 19 insertions(+), 1 deletion(-)

diff --git a/docs/dev/table/functions/udfs.md b/docs/dev/table/functions/udfs.md
index 54d2a01..523cd61 100644
--- a/docs/dev/table/functions/udfs.md
+++ b/docs/dev/table/functions/udfs.md
@@ -144,6 +144,8 @@ $ python --version
 $ python -m pip install apache-beam==2.15.0
 {% endhighlight %}
 
+<span class="label label-info">Note</span> Currently, Python UDFs are supported in the Blink planner under both streaming and batch mode, while the old planner supports them only under streaming mode.
+
 It supports to use both Java/Scala scalar functions and Python scalar functions in Python Table API and SQL. In order to define a Python scalar function, one can extend the base class `ScalarFunction` in `pyflink.table.udf` and implement an evaluation method. The behavior of a Python scalar function is determined by the evaluation method. An evaluation method must be named `eval`. Evaluation method can also support variable arguments, such as `eval(*args)`.
 
 The following example shows how to define your own Java and Python hash code functions, register them in the TableEnvironment, and call them in a query. Note that you can configure your scalar function via a constructor before it is registered:
diff --git a/docs/dev/table/functions/udfs.zh.md b/docs/dev/table/functions/udfs.zh.md
index 1022af8..c7cd78e 100644
--- a/docs/dev/table/functions/udfs.zh.md
+++ b/docs/dev/table/functions/udfs.zh.md
@@ -144,6 +144,8 @@ $ python --version
 $ python -m pip install apache-beam==2.15.0
 {% endhighlight %}
 
+<span class="label label-info">Note</span> Currently, Python UDFs are supported in the Blink planner under both streaming and batch mode, while the old planner supports them only under streaming mode.
+
 It supports to use both Java/Scala scalar functions and Python scalar functions in Python Table API and SQL. In order to define a Python scalar function, one can extend the base class `ScalarFunction` in `pyflink.table.udf` and implement an evaluation method. The behavior of a Python scalar function is determined by the evaluation method. An evaluation method must be named `eval`. Evaluation method can also support variable arguments, such as `eval(*args)`.
 
 The following example shows how to define your own Java and Python hash code functions, register them in the TableEnvironment, and call them in a query. Note that you can configure your scalar function via a constructor before it is registered:
diff --git a/flink-python/pyflink/table/table_environment.py b/flink-python/pyflink/table/table_environment.py
index cdb29d5..4122ecd 100644
--- a/flink-python/pyflink/table/table_environment.py
+++ b/flink-python/pyflink/table/table_environment.py
@@ -743,6 +743,8 @@ class TableEnvironment(object):
         :param function: The python user-defined function to register.
         :type function: pyflink.table.udf.UserDefinedFunctionWrapper
         """
+        if not self._is_blink_planner and isinstance(self, BatchTableEnvironment):
+            raise Exception("Python UDF is not supported in old planner under batch mode!")
         self._j_tenv.registerFunction(name, function._judf(self._is_blink_planner,
                                                            self.get_config()._j_table_config))
 
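The guard added above rejects UDF registration when the old planner runs in batch mode. A self-contained sketch of that check follows — `TableEnvironmentSketch` and its boolean flags are hypothetical stand-ins for the real `TableEnvironment`/`BatchTableEnvironment` classes and the `isinstance` check the patch uses:

```python
# Self-contained sketch of the guard added in register_function above.
# TableEnvironmentSketch stands in for the real TableEnvironment; the flags
# model "which planner" and "which mode" instead of an isinstance check.

class TableEnvironmentSketch:
    def __init__(self, is_blink_planner, is_batch):
        self._is_blink_planner = is_blink_planner  # Blink vs. old planner
        self._is_batch = is_batch                  # batch vs. streaming mode
        self._functions = {}

    def register_function(self, name, function):
        # Same condition as the patch: old planner + batch mode is rejected.
        if not self._is_blink_planner and self._is_batch:
            raise Exception(
                "Python UDF is not supported in old planner under batch mode!")
        self._functions[name] = function  # stand-in for the JVM registration
```

Failing fast here, rather than deep inside the planner, gives users an actionable error at the point where the unsupported combination is first known.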
diff --git a/flink-python/pyflink/table/tests/test_udf.py b/flink-python/pyflink/table/tests/test_udf.py
index 795a826..77983f0 100644
--- a/flink-python/pyflink/table/tests/test_udf.py
+++ b/flink-python/pyflink/table/tests/test_udf.py
@@ -19,7 +19,8 @@ from pyflink.table import DataTypes
 from pyflink.table.udf import ScalarFunction, udf
 from pyflink.testing import source_sink_utils
 from pyflink.testing.test_case_utils import PyFlinkStreamTableTestCase, \
-    PyFlinkBlinkStreamTableTestCase, PyFlinkBlinkBatchTableTestCase
+    PyFlinkBlinkStreamTableTestCase, PyFlinkBlinkBatchTableTestCase, \
+    PyFlinkBatchTableTestCase
 
 
 class UserDefinedFunctionTests(object):
@@ -478,6 +479,17 @@ class PyFlinkStreamUserDefinedFunctionTests(UserDefinedFunctionTests,
     pass
 
 
+class PyFlinkBatchUserDefinedFunctionTests(PyFlinkBatchTableTestCase):
+
+    def test_invalid_register_udf(self):
+        self.assertRaises(
+            Exception,
+            lambda: self.t_env.register_function(
+                "add_one",
+                udf(lambda i: i + 1, DataTypes.BIGINT(), DataTypes.BIGINT()))
+        )
+
+
 class PyFlinkBlinkStreamUserDefinedFunctionTests(UserDefinedFunctionTests,
                                                  PyFlinkBlinkStreamTableTestCase):
     def test_deterministic(self):

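The new test above wraps the registration in a lambda so that `assertRaises` executes it inside the assertion. A runnable sketch of that pattern with `unittest` — the failing callable here is a hypothetical stand-in for `register_function` on an old-planner batch environment:

```python
import unittest

def register_on_old_batch_planner():
    # Hypothetical stand-in for register_function on an old-planner batch env.
    raise Exception("Python UDF is not supported in old planner under batch mode!")

class InvalidRegisterUdfSketch(unittest.TestCase):
    def test_invalid_register_udf(self):
        # assertRaises takes the expected exception type and a callable;
        # wrapping the call in a lambda (as the patch does) defers its
        # execution until assertRaises invokes it.
        self.assertRaises(Exception, lambda: register_on_old_batch_planner())
```

Without the lambda, the registration would run while building the argument list and the exception would escape before `assertRaises` could catch it.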