This is an automated email from the ASF dual-hosted git repository.

dianfu pushed a commit to branch release-1.17
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.17 by this push:
     new 9c5ca059080 [FLINK-31707][python] Fix Pandas UDAF to support accepting constant string as inputs
9c5ca059080 is described below

commit 9c5ca0590806932e4e8f9d3f942f0a2a5442fe2d
Author: Dian Fu <dia...@apache.org>
AuthorDate: Mon Apr 3 19:41:29 2023 +0800

    [FLINK-31707][python] Fix Pandas UDAF to support accepting constant string as inputs
    
    This closes #22332.
---
 flink-python/pyflink/fn_execution/coders.py          |  5 +++--
 flink-python/pyflink/table/tests/test_pandas_udaf.py | 15 ++++++++++++---
 2 files changed, 15 insertions(+), 5 deletions(-)

diff --git a/flink-python/pyflink/fn_execution/coders.py b/flink-python/pyflink/fn_execution/coders.py
index 0f4a822ca6c..9b86e98a031 100644
--- a/flink-python/pyflink/fn_execution/coders.py
+++ b/flink-python/pyflink/fn_execution/coders.py
@@ -41,8 +41,7 @@ from pyflink.common.typeinfo import TypeInformation, 
BasicTypeInfo, BasicType, D
 from pyflink.table.types import TinyIntType, SmallIntType, IntType, 
BigIntType, BooleanType, \
     FloatType, DoubleType, VarCharType, VarBinaryType, DecimalType, DateType, 
TimeType, \
     LocalZonedTimestampType, RowType, RowField, to_arrow_type, TimestampType, 
ArrayType, MapType, \
-    BinaryType, NullType
-
+    BinaryType, NullType, CharType
 
 __all__ = ['FlattenRowCoder', 'RowCoder', 'BigIntCoder', 'TinyIntCoder', 
'BooleanCoder',
            'SmallIntCoder', 'IntCoder', 'FloatCoder', 'DoubleCoder', 
'BinaryCoder', 'CharCoder',
@@ -131,6 +130,8 @@ class LengthPrefixBaseCoder(ABC):
             return FloatType(field_type.nullable)
         elif field_type.type_name == flink_fn_execution_pb2.Schema.DOUBLE:
             return DoubleType(field_type.nullable)
+        elif field_type.type_name == flink_fn_execution_pb2.Schema.CHAR:
+            return CharType(field_type.char_info.length, field_type.nullable)
         elif field_type.type_name == flink_fn_execution_pb2.Schema.VARCHAR:
             return VarCharType(field_type.var_char_info.length, 
field_type.nullable)
         elif field_type.type_name == flink_fn_execution_pb2.Schema.BINARY:
diff --git a/flink-python/pyflink/table/tests/test_pandas_udaf.py b/flink-python/pyflink/table/tests/test_pandas_udaf.py
index 1791add00a4..650efcd966c 100644
--- a/flink-python/pyflink/table/tests/test_pandas_udaf.py
+++ b/flink-python/pyflink/table/tests/test_pandas_udaf.py
@@ -61,7 +61,9 @@ class BatchPandasUDAFITTests(PyFlinkBatchTableTestCase):
         sink_table_ddl = f"""
             CREATE TABLE {sink_table}(
                 a TINYINT,
-                b FLOAT,c ROW<a INT, b INT>
+                b FLOAT,
+                c ROW<a INT, b INT>,
+                d STRING
             ) WITH ('connector'='test-sink')
         """
         self.t_env.execute_sql(sink_table_ddl)
@@ -74,14 +76,21 @@ class BatchPandasUDAFITTests(PyFlinkBatchTableTestCase):
                             [DataTypes.FIELD("a", DataTypes.INT()),
                              DataTypes.FIELD("b", DataTypes.INT())]),
                         func_type="pandas")
+
+        @udaf(result_type=DataTypes.STRING(), func_type="pandas")
+        def multiply_udaf(a, b):
+            return len(a) * b[0]
+
         t.group_by(t.a) \
-            .select(t.a, mean_udaf(add(t.b)), max_udaf(substract(t.c))) \
+            .select(t.a, mean_udaf(add(t.b)), max_udaf(substract(t.c)), 
multiply_udaf(t.b, 'abc')) \
             .execute_insert(sink_table) \
             .wait()
         actual = source_sink_utils.results()
         self.assert_equals(
             actual,
-            ["+I[1, 6.0, +I[5, 2]]", "+I[2, 3.0, +I[3, 2]]", "+I[3, 3.0, +I[2, 
2]]"])
+            ["+I[1, 6.0, +I[5, 2], abcabcabc]",
+             "+I[2, 3.0, +I[3, 2], abcabc]",
+             "+I[3, 3.0, +I[2, 2], abc]"])
 
     def test_group_aggregate_without_keys(self):
         t = self.t_env.from_elements(

Reply via email to