Huang Xingbo created FLINK-20507:
------------------------------------

             Summary: Support Aggregate Operation in Python Table API
                 Key: FLINK-20507
                 URL: https://issues.apache.org/jira/browse/FLINK-20507
             Project: Flink
          Issue Type: Sub-task
          Components: API / Python
            Reporter: Huang Xingbo
             Fix For: 1.13.0


Support Python UDAFs (both general and Pandas) for the aggregate operation in the Python Table API.

The usage:
{code:python}
from pyflink.common import Row
from pyflink.table import DataTypes
from pyflink.table.udf import udaf

t = ...  # type: Table, table schema: [a: String, b: Int, c: Int]

# aggregate General Python UDAF
# (GeneralPythonAggregateFunction is a placeholder for a user-defined AggregateFunction subclass)
t_env.create_temporary_function("agg", GeneralPythonAggregateFunction())
t.group_by(t.c).select("agg(a)")

# aggregate Pandas UDAF
mean_max_udaf = udaf(lambda a: Row(a.mean(), a.max()),
                     result_type=DataTypes.ROW(
                         [DataTypes.FIELD("a", DataTypes.FLOAT()),
                          DataTypes.FIELD("b", DataTypes.INT())]),
                     func_type="pandas")
t.group_by(t.a).aggregate(mean_max_udaf(t.b).alias("d", "f")).select("a, d, f")
{code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
