[ 
https://issues.apache.org/jira/browse/FLINK-27776?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Anderson updated FLINK-27776:
-----------------------------------
    Summary: Throw exception when UDAF used in sliding window does not 
implement merge method in PyFlink  (was: Throws exception when udaf used in 
sliding window does not implement merge method in PyFlink)

> Throw exception when UDAF used in sliding window does not implement merge 
> method in PyFlink
> -------------------------------------------------------------------------------------------
>
>                 Key: FLINK-27776
>                 URL: https://issues.apache.org/jira/browse/FLINK-27776
>             Project: Flink
>          Issue Type: Improvement
>          Components: API / Python
>    Affects Versions: 1.15.0, 1.13.6, 1.14.4
>            Reporter: Huang Xingbo
>            Assignee: Huang Xingbo
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.16.0, 1.14.5, 1.15.1
>
>
> We use pane state to optimize the computation of window state, which 
> requires the UDAF to implement the merge method. However, because nothing 
> checks whether the UDAF implements merge, a user whose UDAF lacks it gets 
> results that do not match expectations, and no exception is raised. Below 
> is an example of a UDAF that implements the merge method:
> {code:python}
> from pyflink.table import DataTypes
> from pyflink.table.udf import AggregateFunction
>
> class SumAggregateFunction(AggregateFunction):
>     def get_value(self, accumulator):
>         # The running sum is stored in the first slot of the accumulator.
>         return accumulator[0]
>     def create_accumulator(self):
>         return [0]
>     def accumulate(self, accumulator, *args):
>         accumulator[0] = accumulator[0] + args[0]
>     def retract(self, accumulator, *args):
>         accumulator[0] = accumulator[0] - args[0]
>     def merge(self, accumulator, accumulators):
>         # Fold the other accumulators (e.g. per-pane sums) into this one.
>         for other_acc in accumulators:
>             accumulator[0] = accumulator[0] + other_acc[0]
>     def get_accumulator_type(self):
>         return DataTypes.ARRAY(DataTypes.BIGINT())
>     def get_result_type(self):
>         return DataTypes.BIGINT()
> {code}
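
The missing detection described above could look roughly like the sketch below. It is only an illustration, not the change made for this issue: the `AggregateFunction` class here is a hypothetical stand-in for `pyflink.table.udf.AggregateFunction` so the snippet runs without PyFlink, and `overrides_merge`, `check_sliding_window_udaf`, `SumWithMerge` and `SumWithoutMerge` are made-up names.

```python
class AggregateFunction:
    """Hypothetical stand-in for pyflink.table.udf.AggregateFunction."""

    def merge(self, accumulator, accumulators):
        raise NotImplementedError("merge is not implemented")


def overrides_merge(udaf):
    """Return True if the UDAF's class defines its own merge method."""
    return type(udaf).merge is not AggregateFunction.merge


def check_sliding_window_udaf(udaf):
    """Fail fast instead of silently producing unexpected window results."""
    if not overrides_merge(udaf):
        raise TypeError(
            f"{type(udaf).__name__} must implement merge() "
            "to be used in a sliding window"
        )


class SumWithMerge(AggregateFunction):
    def merge(self, accumulator, accumulators):
        # Fold the other accumulators into this one.
        for other_acc in accumulators:
            accumulator[0] += other_acc[0]


class SumWithoutMerge(AggregateFunction):
    # Inherits the base merge, so the check should reject it.
    pass
```

With this sketch, `check_sliding_window_udaf(SumWithMerge())` passes silently, while `check_sliding_window_udaf(SumWithoutMerge())` raises a `TypeError`, so the user sees the problem up front.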



--
This message was sent by Atlassian Jira
(v8.20.7#820007)
