This is an automated email from the ASF dual-hosted git repository.

dianfu pushed a commit to branch release-1.17 in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.17 by this push: new 9c5ca059080 [FLINK-31707][python] Fix Pandas UDAF to support accepting constant string as inputs 9c5ca059080 is described below commit 9c5ca0590806932e4e8f9d3f942f0a2a5442fe2d Author: Dian Fu <dia...@apache.org> AuthorDate: Mon Apr 3 19:41:29 2023 +0800 [FLINK-31707][python] Fix Pandas UDAF to support accepting constant string as inputs This closes #22332. --- flink-python/pyflink/fn_execution/coders.py | 5 +++-- flink-python/pyflink/table/tests/test_pandas_udaf.py | 15 ++++++++++++--- 2 files changed, 15 insertions(+), 5 deletions(-) diff --git a/flink-python/pyflink/fn_execution/coders.py b/flink-python/pyflink/fn_execution/coders.py index 0f4a822ca6c..9b86e98a031 100644 --- a/flink-python/pyflink/fn_execution/coders.py +++ b/flink-python/pyflink/fn_execution/coders.py @@ -41,8 +41,7 @@ from pyflink.common.typeinfo import TypeInformation, BasicTypeInfo, BasicType, D from pyflink.table.types import TinyIntType, SmallIntType, IntType, BigIntType, BooleanType, \ FloatType, DoubleType, VarCharType, VarBinaryType, DecimalType, DateType, TimeType, \ LocalZonedTimestampType, RowType, RowField, to_arrow_type, TimestampType, ArrayType, MapType, \ - BinaryType, NullType - + BinaryType, NullType, CharType __all__ = ['FlattenRowCoder', 'RowCoder', 'BigIntCoder', 'TinyIntCoder', 'BooleanCoder', 'SmallIntCoder', 'IntCoder', 'FloatCoder', 'DoubleCoder', 'BinaryCoder', 'CharCoder', @@ -131,6 +130,8 @@ class LengthPrefixBaseCoder(ABC): return FloatType(field_type.nullable) elif field_type.type_name == flink_fn_execution_pb2.Schema.DOUBLE: return DoubleType(field_type.nullable) + elif field_type.type_name == flink_fn_execution_pb2.Schema.CHAR: + return CharType(field_type.char_info.length, field_type.nullable) elif field_type.type_name == flink_fn_execution_pb2.Schema.VARCHAR: return VarCharType(field_type.var_char_info.length, field_type.nullable) elif field_type.type_name == 
flink_fn_execution_pb2.Schema.BINARY: diff --git a/flink-python/pyflink/table/tests/test_pandas_udaf.py b/flink-python/pyflink/table/tests/test_pandas_udaf.py index 1791add00a4..650efcd966c 100644 --- a/flink-python/pyflink/table/tests/test_pandas_udaf.py +++ b/flink-python/pyflink/table/tests/test_pandas_udaf.py @@ -61,7 +61,9 @@ class BatchPandasUDAFITTests(PyFlinkBatchTableTestCase): sink_table_ddl = f""" CREATE TABLE {sink_table}( a TINYINT, - b FLOAT,c ROW<a INT, b INT> + b FLOAT, + c ROW<a INT, b INT>, + d STRING ) WITH ('connector'='test-sink') """ self.t_env.execute_sql(sink_table_ddl) @@ -74,14 +76,21 @@ class BatchPandasUDAFITTests(PyFlinkBatchTableTestCase): [DataTypes.FIELD("a", DataTypes.INT()), DataTypes.FIELD("b", DataTypes.INT())]), func_type="pandas") + + @udaf(result_type=DataTypes.STRING(), func_type="pandas") + def multiply_udaf(a, b): + return len(a) * b[0] + t.group_by(t.a) \ - .select(t.a, mean_udaf(add(t.b)), max_udaf(substract(t.c))) \ + .select(t.a, mean_udaf(add(t.b)), max_udaf(substract(t.c)), multiply_udaf(t.b, 'abc')) \ .execute_insert(sink_table) \ .wait() actual = source_sink_utils.results() self.assert_equals( actual, - ["+I[1, 6.0, +I[5, 2]]", "+I[2, 3.0, +I[3, 2]]", "+I[3, 3.0, +I[2, 2]]"]) + ["+I[1, 6.0, +I[5, 2], abcabcabc]", + "+I[2, 3.0, +I[3, 2], abcabc]", + "+I[3, 3.0, +I[2, 2], abc]"]) def test_group_aggregate_without_keys(self): t = self.t_env.from_elements(