dianfu commented on a change in pull request #14389: URL: https://github.com/apache/flink/pull/14389#discussion_r544054227
########## File path: flink-python/pyflink/table/udf.py ########## @@ -647,3 +692,72 @@ def udaf(f: Union[Callable, UserDefinedFunction, Type] = None, else: return _create_udaf(f, input_types, result_type, accumulator_type, func_type, deterministic, name) + + +def udtaf(f: Union[Callable, UserDefinedFunction, Type] = None, + input_types: Union[List[DataType], DataType] = None, result_type: DataType = None, + accumulator_type: DataType = None, deterministic: bool = None, name: str = None, + func_type: str = 'general') -> Union[UserDefinedAggregateFunctionWrapper, Callable]: + """ + Helper method for creating a user-defined table aggregate function. + + Example: + :: + + >>> # The input_types is optional. + >>> class Top2(TableAggregateFunction): + ... def emit_value(self, accumulator): + ... yield Row(accumulator[0]) + ... yield Row(accumulator[1]) + ... + ... def create_accumulator(self): + ... return [None, None] + ... + ... def accumulate(self, accumulator, *args): + ... if args[0] is not None: + ... if accumulator[0] is None or args[0] > accumulator[0]: + ... accumulator[1] = accumulator[0] + ... accumulator[0] = args[0] + ... elif accumulator[1] is None or args[0] > accumulator[1]: + ... accumulator[1] = args[0] + ... + ... def retract(self, accumulator, *args): + ... accumulator[0] = accumulator[0] - 1 + ... + ... def merge(self, accumulator, accumulators): + ... for other_acc in accumulators: + ... self.accumulate(accumulator, other_acc[0]) + ... self.accumulate(accumulator, other_acc[1]) + ... + ... def get_accumulator_type(self): + ... return DataTypes.ARRAY(DataTypes.BIGINT()) + ... + ... def get_result_type(self): + ... return DataTypes.ROW( + ... [DataTypes.FIELD("a", DataTypes.BIGINT())]) + >>> top2 = udtaf(Top2()) + + :param f: user-defined table aggregate function. + :param input_types: optional, the input data types. + :param result_type: the result data type. + :param accumulator_type: optional, the accumulator data type. + :param deterministic: the determinism of the function's results. True if and only if a call to + this function is guaranteed to always return the same result given the + same parameters. (default True) + :param name: the function name. + :param func_type: the type of the python function, available value: general + (default: general) + :return: UserDefinedAggregateFunctionWrapper or function. + + .. versionadded:: 1.13.0 + """ + if func_type != 'general': + raise ValueError("The func_type must be one of 'general', got %s." Review comment: ```suggestion raise ValueError("The func_type must be 'general', got %s." ``` ########## File path: flink-python/pyflink/table/udf.py ########## @@ -647,3 +692,72 @@ def udaf(f: Union[Callable, UserDefinedFunction, Type] = None, else: return _create_udaf(f, input_types, result_type, accumulator_type, func_type, deterministic, name) + + +def udtaf(f: Union[Callable, UserDefinedFunction, Type] = None, Review comment: ```suggestion def udtaf(f: Union[Callable, TableAggregateFunction, Type] = None, ``` ########## File path: flink-python/pyflink/table/udf.py ########## @@ -647,3 +692,72 @@ def udaf(f: Union[Callable, UserDefinedFunction, Type] = None, else: return _create_udaf(f, input_types, result_type, accumulator_type, func_type, deterministic, name) + + +def udtaf(f: Union[Callable, UserDefinedFunction, Type] = None, + input_types: Union[List[DataType], DataType] = None, result_type: DataType = None, + accumulator_type: DataType = None, deterministic: bool = None, name: str = None, + func_type: str = 'general') -> Union[UserDefinedAggregateFunctionWrapper, Callable]: + """ + Helper method for creating a user-defined table aggregate function. + + Example: + :: + + >>> # The input_types is optional. + >>> class Top2(TableAggregateFunction): + ... def emit_value(self, accumulator): + ... yield Row(accumulator[0]) + ... yield Row(accumulator[1]) + ... + ... def create_accumulator(self): + ... return [None, None] + ... + ... def accumulate(self, accumulator, *args): + ... if args[0] is not None: + ... if accumulator[0] is None or args[0] > accumulator[0]: + ... accumulator[1] = accumulator[0] + ... accumulator[0] = args[0] + ... elif accumulator[1] is None or args[0] > accumulator[1]: + ... accumulator[1] = args[0] + ... + ... def retract(self, accumulator, *args): + ... accumulator[0] = accumulator[0] - 1 + ... + ... def merge(self, accumulator, accumulators): + ... for other_acc in accumulators: + ... self.accumulate(accumulator, other_acc[0]) + ... self.accumulate(accumulator, other_acc[1]) + ... + ... def get_accumulator_type(self): + ... return DataTypes.ARRAY(DataTypes.BIGINT()) + ... + ... def get_result_type(self): + ... return DataTypes.ROW( + ... [DataTypes.FIELD("a", DataTypes.BIGINT())]) + >>> top2 = udtaf(Top2()) + + :param f: user-defined table aggregate function. + :param input_types: optional, the input data types. + :param result_type: the result data type. + :param accumulator_type: optional, the accumulator data type. + :param deterministic: the determinism of the function's results. True if and only if a call to + this function is guaranteed to always return the same result given the + same parameters. (default True) + :param name: the function name. + :param func_type: the type of the python function, available value: general + (default: general) + :return: UserDefinedAggregateFunctionWrapper or function. + + .. versionadded:: 1.13.0 + """ + if func_type != 'general': + raise ValueError("The func_type must be one of 'general', got %s." Review comment: So it still doesn't support pandas? Could you create a ticket? ########## File path: flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/aggregate/PythonStreamGroupTableAggregateOperator.java ########## @@ -39,30 +37,19 @@ @VisibleForTesting protected static final String STREAM_GROUP_TABLE_AGGREGATE_URN = "flink:transform:stream_group_table_aggregate:v1"; - private final PythonAggregateFunctionInfo aggregateFunction; - - private final DataViewUtils.DataViewSpec[] dataViewSpecs; - public PythonStreamGroupTableAggregateOperator( Configuration config, RowType inputType, RowType outputType, - PythonAggregateFunctionInfo aggregateFunction, Review comment: What's the purpose of this change? ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org