This is an automated email from the ASF dual-hosted git repository.

dianfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new bba028a  [FLINK-20858][python][table-planner-blink] Port 
StreamExecPythonGroupWindowAggregate and BatchExecPythonGroupWindowAggregate to 
Java (#14665)
bba028a is described below

commit bba028a38f056b7c111944a7a32307c3e812857f
Author: HuangXingBo <hxbks...@gmail.com>
AuthorDate: Mon Jan 18 20:21:46 2021 +0800

    [FLINK-20858][python][table-planner-blink] Port 
StreamExecPythonGroupWindowAggregate and BatchExecPythonGroupWindowAggregate to 
Java (#14665)
---
 .../batch/BatchExecPythonGroupWindowAggregate.java | 216 +++++++++++++
 .../StreamExecPythonGroupWindowAggregate.java      | 342 +++++++++++++++++++++
 .../BatchExecPythonGroupWindowAggregate.scala      | 179 -----------
 .../StreamExecPythonGroupWindowAggregate.scala     | 270 ----------------
 .../BatchPhysicalPythonGroupWindowAggregate.scala  |   2 +-
 5 files changed, 559 insertions(+), 450 deletions(-)

diff --git 
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecPythonGroupWindowAggregate.java
 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecPythonGroupWindowAggregate.java
new file mode 100644
index 0000000..299ed9c
--- /dev/null
+++ 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecPythonGroupWindowAggregate.java
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.batch;
+
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.memory.ManagedMemoryUseCase;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.transformations.OneInputTransformation;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.functions.python.PythonFunctionInfo;
+import org.apache.flink.table.planner.calcite.FlinkRelBuilder;
+import org.apache.flink.table.planner.codegen.agg.batch.WindowCodeGenerator;
+import org.apache.flink.table.planner.delegation.PlannerBase;
+import org.apache.flink.table.planner.expressions.PlannerRowtimeAttribute;
+import org.apache.flink.table.planner.expressions.PlannerWindowEnd;
+import org.apache.flink.table.planner.expressions.PlannerWindowProperty;
+import org.apache.flink.table.planner.expressions.PlannerWindowStart;
+import org.apache.flink.table.planner.plan.logical.LogicalWindow;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
+import 
org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecPythonAggregate;
+import org.apache.flink.table.planner.plan.nodes.exec.utils.CommonPythonUtil;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.logical.RowType;
+
+import org.apache.calcite.rel.core.AggregateCall;
+
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.util.Arrays;
+
/** Batch {@link ExecNode} for group window aggregate (Python user defined aggregate function). */
public class BatchExecPythonGroupWindowAggregate extends CommonExecPythonAggregate
        implements BatchExecNode<RowData> {

    // The operator lives in the flink-python module; it is loaded reflectively so the
    // planner does not take a hard compile-time dependency on the Python runtime classes.
    private static final String ARROW_PYTHON_GROUP_WINDOW_AGGREGATE_FUNCTION_OPERATOR_NAME =
            "org.apache.flink.table.runtime.operators.python.aggregate.arrow.batch."
                    + "BatchArrowPythonGroupWindowAggregateFunctionOperator";

    // Field indices of the grouping keys in the input row.
    private final int[] grouping;
    // Field indices including auxiliary grouping fields (passed to the operator alongside
    // {@link #grouping}).
    private final int[] groupingSet;
    // The aggregate calls to translate into Python UDAF function infos.
    private final AggregateCall[] aggCalls;
    // The logical window definition (tumbling/sliding) this aggregate runs over.
    private final LogicalWindow window;
    // Index of the time attribute field in the input row.
    private final int inputTimeFieldIndex;
    // Named window properties (window_start / window_end / rowtime) requested in the output.
    private final FlinkRelBuilder.PlannerNamedWindowProperty[] namedWindowProperties;

    public BatchExecPythonGroupWindowAggregate(
            int[] grouping,
            int[] groupingSet,
            AggregateCall[] aggCalls,
            LogicalWindow window,
            int inputTimeFieldIndex,
            FlinkRelBuilder.PlannerNamedWindowProperty[] namedWindowProperties,
            ExecEdge inputEdge,
            RowType outputType,
            String description) {
        super(inputEdge, outputType, description);
        this.grouping = grouping;
        this.groupingSet = groupingSet;
        this.aggCalls = aggCalls;
        this.window = window;
        this.inputTimeFieldIndex = inputTimeFieldIndex;
        this.namedWindowProperties = namedWindowProperties;
    }

    @Override
    protected Transformation<RowData> translateToPlanInternal(PlannerBase planner) {
        final ExecNode<RowData> inputNode = (ExecNode<RowData>) getInputNodes().get(0);
        final Transformation<RowData> inputTransform = inputNode.translateToPlan(planner);
        final RowType inputRowType = (RowType) inputNode.getOutputType();
        final RowType outputRowType = InternalTypeInfo.of(getOutputType()).toRowType();

        // f0 = window size, f1 = slide size (both in the unit produced by the code generator).
        final Tuple2<Long, Long> windowSizeAndSlideSize = WindowCodeGenerator.getWindowDef(window);
        final TableConfig tableConfig = planner.getTableConfig();
        final Configuration config =
                CommonPythonUtil.getMergedConfig(planner.getExecEnv(), tableConfig);
        int groupBufferLimitSize =
                config.getInteger(ExecutionConfigOptions.TABLE_EXEC_WINDOW_AGG_BUFFER_SIZE_LIMIT);

        OneInputTransformation<RowData, RowData> transform =
                createPythonOneInputTransformation(
                        inputTransform,
                        inputRowType,
                        outputRowType,
                        groupBufferLimitSize,
                        windowSizeAndSlideSize.f0,
                        windowSizeAndSlideSize.f1,
                        config);
        // Declare managed memory for the Python worker when it is configured to use it.
        if (CommonPythonUtil.isPythonWorkerUsingManagedMemory(config)) {
            transform.declareManagedMemoryUseCaseAtSlotScope(ManagedMemoryUseCase.PYTHON);
        }
        return transform;
    }

    /**
     * Builds the {@link OneInputTransformation} wrapping the reflectively created Arrow Python
     * group window aggregate operator. The transformation inherits the input's parallelism.
     */
    private OneInputTransformation<RowData, RowData> createPythonOneInputTransformation(
            Transformation<RowData> inputTransform,
            RowType inputRowType,
            RowType outputRowType,
            int maxLimitSize,
            long windowSize,
            long slideSize,
            Configuration config) {
        // Encode each named window property as an int understood by the runtime operator:
        // 0 = window start, 1 = window end, 2 = rowtime attribute.
        int[] namePropertyTypeArray =
                Arrays.stream(namedWindowProperties)
                        .mapToInt(
                                p -> {
                                    PlannerWindowProperty property = p.property();
                                    if (property instanceof PlannerWindowStart) {
                                        return 0;
                                    }
                                    if (property instanceof PlannerWindowEnd) {
                                        return 1;
                                    }
                                    if (property instanceof PlannerRowtimeAttribute) {
                                        return 2;
                                    }
                                    throw new TableException("Unexpected property " + property);
                                })
                        .toArray();
        // f0 = input field offsets consumed by the Python UDAFs, f1 = the function infos.
        Tuple2<int[], PythonFunctionInfo[]> aggInfos =
                extractPythonAggregateFunctionInfosFromAggregateCall(aggCalls);
        int[] pythonUdafInputOffsets = aggInfos.f0;
        PythonFunctionInfo[] pythonFunctionInfos = aggInfos.f1;
        OneInputStreamOperator<RowData, RowData> pythonOperator =
                getPythonGroupWindowAggregateFunctionOperator(
                        config,
                        inputRowType,
                        outputRowType,
                        maxLimitSize,
                        windowSize,
                        slideSize,
                        namePropertyTypeArray,
                        pythonUdafInputOffsets,
                        pythonFunctionInfos);
        return new OneInputTransformation<>(
                inputTransform,
                getDesc(),
                pythonOperator,
                InternalTypeInfo.of(outputRowType),
                inputTransform.getParallelism());
    }

    /**
     * Instantiates {@code BatchArrowPythonGroupWindowAggregateFunctionOperator} via reflection.
     *
     * <p>The constructor argument order below must stay in sync with the operator's constructor
     * in flink-python; there is no compile-time check because the class is resolved at runtime.
     *
     * @throws TableException if the operator class has no matching constructor or cannot be
     *     instantiated.
     */
    @SuppressWarnings("unchecked")
    private OneInputStreamOperator<RowData, RowData> getPythonGroupWindowAggregateFunctionOperator(
            Configuration config,
            RowType inputRowType,
            RowType outputRowType,
            int maxLimitSize,
            long windowSize,
            long slideSize,
            int[] namePropertyTypeArray,
            int[] udafInputOffsets,
            PythonFunctionInfo[] pythonFunctionInfos) {
        Class<?> clazz =
                CommonPythonUtil.loadClass(
                        ARROW_PYTHON_GROUP_WINDOW_AGGREGATE_FUNCTION_OPERATOR_NAME);
        try {
            Constructor<?> ctor =
                    clazz.getConstructor(
                            Configuration.class,
                            PythonFunctionInfo[].class,
                            RowType.class,
                            RowType.class,
                            int.class,
                            int.class,
                            long.class,
                            long.class,
                            int[].class,
                            int[].class,
                            int[].class,
                            int[].class);
            return (OneInputStreamOperator<RowData, RowData>)
                    ctor.newInstance(
                            config,
                            pythonFunctionInfos,
                            inputRowType,
                            outputRowType,
                            inputTimeFieldIndex,
                            maxLimitSize,
                            windowSize,
                            slideSize,
                            namePropertyTypeArray,
                            grouping,
                            groupingSet,
                            udafInputOffsets);
        } catch (NoSuchMethodException
                | InstantiationException
                | IllegalAccessException
                | InvocationTargetException e) {
            throw new TableException(
                    "Python BatchArrowPythonGroupWindowAggregateFunctionOperator constructed failed.",
                    e);
        }
    }
}
diff --git 
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecPythonGroupWindowAggregate.java
 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecPythonGroupWindowAggregate.java
new file mode 100644
index 0000000..dae2eb2
--- /dev/null
+++ 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecPythonGroupWindowAggregate.java
@@ -0,0 +1,342 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.stream;
+
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.memory.ManagedMemoryUseCase;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.transformations.OneInputTransformation;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.expressions.FieldReferenceExpression;
+import org.apache.flink.table.expressions.ValueLiteralExpression;
+import org.apache.flink.table.functions.python.PythonFunctionInfo;
+import org.apache.flink.table.planner.calcite.FlinkRelBuilder;
+import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
+import org.apache.flink.table.planner.delegation.PlannerBase;
+import org.apache.flink.table.planner.expressions.PlannerProctimeAttribute;
+import org.apache.flink.table.planner.expressions.PlannerRowtimeAttribute;
+import org.apache.flink.table.planner.expressions.PlannerWindowEnd;
+import org.apache.flink.table.planner.expressions.PlannerWindowProperty;
+import org.apache.flink.table.planner.expressions.PlannerWindowStart;
+import org.apache.flink.table.planner.plan.logical.LogicalWindow;
+import org.apache.flink.table.planner.plan.logical.SlidingGroupWindow;
+import org.apache.flink.table.planner.plan.logical.TumblingGroupWindow;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
+import 
org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecPythonAggregate;
+import org.apache.flink.table.planner.plan.nodes.exec.utils.CommonPythonUtil;
+import org.apache.flink.table.planner.plan.utils.KeySelectorUtil;
+import org.apache.flink.table.planner.plan.utils.WindowEmitStrategy;
+import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
+import 
org.apache.flink.table.runtime.operators.window.assigners.CountSlidingWindowAssigner;
+import 
org.apache.flink.table.runtime.operators.window.assigners.CountTumblingWindowAssigner;
+import 
org.apache.flink.table.runtime.operators.window.assigners.SlidingWindowAssigner;
+import 
org.apache.flink.table.runtime.operators.window.assigners.TumblingWindowAssigner;
+import 
org.apache.flink.table.runtime.operators.window.assigners.WindowAssigner;
+import 
org.apache.flink.table.runtime.operators.window.triggers.ElementTriggers;
+import 
org.apache.flink.table.runtime.operators.window.triggers.EventTimeTriggers;
+import 
org.apache.flink.table.runtime.operators.window.triggers.ProcessingTimeTriggers;
+import org.apache.flink.table.runtime.operators.window.triggers.Trigger;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.logical.RowType;
+
+import org.apache.calcite.rel.core.AggregateCall;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.util.Arrays;
+
+import static 
org.apache.flink.table.planner.plan.utils.AggregateUtil.hasRowIntervalType;
+import static 
org.apache.flink.table.planner.plan.utils.AggregateUtil.hasTimeIntervalType;
+import static 
org.apache.flink.table.planner.plan.utils.AggregateUtil.isProctimeAttribute;
+import static 
org.apache.flink.table.planner.plan.utils.AggregateUtil.isRowtimeAttribute;
+import static 
org.apache.flink.table.planner.plan.utils.AggregateUtil.timeFieldIndex;
+import static 
org.apache.flink.table.planner.plan.utils.AggregateUtil.toDuration;
+import static org.apache.flink.table.planner.plan.utils.AggregateUtil.toLong;
+
+/** Stream [[ExecNode]] for group widow aggregate (Python user defined 
aggregate function). */
+public class StreamExecPythonGroupWindowAggregate extends 
CommonExecPythonAggregate
+        implements StreamExecNode<RowData> {
+    private static final Logger LOGGER =
+            
LoggerFactory.getLogger(StreamExecPythonGroupWindowAggregate.class);
+
+    private static final String 
ARROW_STREAM_PYTHON_GROUP_WINDOW_AGGREGATE_FUNCTION_OPERATOR_NAME =
+            
"org.apache.flink.table.runtime.operators.python.aggregate.arrow.stream."
+                    + "StreamArrowPythonGroupWindowAggregateFunctionOperator";
+
+    private final int[] grouping;
+    private final AggregateCall[] aggCalls;
+    private final LogicalWindow window;
+    private final FlinkRelBuilder.PlannerNamedWindowProperty[] 
namedWindowProperties;
+    private final WindowEmitStrategy emitStrategy;
+
+    public StreamExecPythonGroupWindowAggregate(
+            int[] grouping,
+            AggregateCall[] aggCalls,
+            LogicalWindow window,
+            FlinkRelBuilder.PlannerNamedWindowProperty[] namedWindowProperties,
+            WindowEmitStrategy emitStrategy,
+            ExecEdge inputEdge,
+            RowType outputType,
+            String description) {
+        super(inputEdge, outputType, description);
+        this.grouping = grouping;
+        this.aggCalls = aggCalls;
+        this.window = window;
+        this.namedWindowProperties = namedWindowProperties;
+        this.emitStrategy = emitStrategy;
+    }
+
+    @Override
+    protected Transformation<RowData> translateToPlanInternal(PlannerBase 
planner) {
+        final boolean isCountWindow;
+        if (window instanceof TumblingGroupWindow) {
+            isCountWindow = hasRowIntervalType(((TumblingGroupWindow) 
window).size());
+        } else if (window instanceof SlidingGroupWindow) {
+            isCountWindow = hasRowIntervalType(((SlidingGroupWindow) 
window).size());
+        } else {
+            isCountWindow = false;
+        }
+
+        final TableConfig tableConfig = planner.getTableConfig();
+        if (isCountWindow
+                && grouping.length > 0
+                && tableConfig.getMinIdleStateRetentionTime() < 0) {
+            LOGGER.warn(
+                    "No state retention interval configured for a query which 
accumulates state."
+                            + " Please provide a query configuration with 
valid retention interval to"
+                            + " prevent excessive state size. You may specify 
a retention time of 0 to"
+                            + " not clean up the state.");
+        }
+
+        final ExecNode<RowData> inputNode = (ExecNode<RowData>) 
getInputNodes().get(0);
+        final Transformation<RowData> inputTransform = 
inputNode.translateToPlan(planner);
+        final RowType inputRowType = (RowType) inputNode.getOutputType();
+        final RowType outputRowType = 
InternalTypeInfo.of(getOutputType()).toRowType();
+
+        final int inputTimeFieldIndex;
+        if (isRowtimeAttribute(window.timeAttribute())) {
+            inputTimeFieldIndex =
+                    timeFieldIndex(
+                            
FlinkTypeFactory.INSTANCE().buildRelNodeRowType(inputRowType),
+                            planner.getRelBuilder(),
+                            window.timeAttribute());
+            if (inputTimeFieldIndex < 0) {
+                throw new TableException(
+                        "Group window must defined on a time attribute, "
+                                + "but the time attribute can't be found.\n"
+                                + "This should never happen. Please file an 
issue.");
+            }
+        } else {
+            inputTimeFieldIndex = -1;
+        }
+        Tuple2<WindowAssigner<?>, Trigger<?>> windowAssignerAndTrigger =
+                generateWindowAssignerAndTrigger();
+        WindowAssigner<?> windowAssigner = windowAssignerAndTrigger.f0;
+        Trigger<?> trigger = windowAssignerAndTrigger.f1;
+        Configuration config = 
CommonPythonUtil.getMergedConfig(planner.getExecEnv(), tableConfig);
+        OneInputTransformation<RowData, RowData> transform =
+                createPythonStreamWindowGroupOneInputTransformation(
+                        inputTransform,
+                        inputRowType,
+                        outputRowType,
+                        inputTimeFieldIndex,
+                        windowAssigner,
+                        trigger,
+                        emitStrategy.getAllowLateness(),
+                        config);
+        if (inputsContainSingleton()) {
+            transform.setParallelism(1);
+            transform.setMaxParallelism(1);
+        }
+
+        if (CommonPythonUtil.isPythonWorkerUsingManagedMemory(config)) {
+            
transform.declareManagedMemoryUseCaseAtSlotScope(ManagedMemoryUseCase.PYTHON);
+        }
+        // set KeyType and Selector for state
+        final RowDataKeySelector selector =
+                KeySelectorUtil.getRowDataSelector(grouping, 
InternalTypeInfo.of(inputRowType));
+        transform.setStateKeySelector(selector);
+        transform.setStateKeyType(selector.getProducedType());
+        return transform;
+    }
+
+    private Tuple2<WindowAssigner<?>, Trigger<?>> 
generateWindowAssignerAndTrigger() {
+        WindowAssigner<?> windowAssiger;
+        Trigger<?> trigger;
+        if (window instanceof TumblingGroupWindow) {
+            TumblingGroupWindow tumblingWindow = (TumblingGroupWindow) window;
+            FieldReferenceExpression timeField = tumblingWindow.timeField();
+            ValueLiteralExpression size = tumblingWindow.size();
+            if (isProctimeAttribute(timeField) && hasTimeIntervalType(size)) {
+                windowAssiger = 
TumblingWindowAssigner.of(toDuration(size)).withProcessingTime();
+                trigger = ProcessingTimeTriggers.afterEndOfWindow();
+            } else if (isRowtimeAttribute(timeField) && 
hasTimeIntervalType(size)) {
+                windowAssiger = 
TumblingWindowAssigner.of(toDuration(size)).withEventTime();
+                trigger = EventTimeTriggers.afterEndOfWindow();
+            } else if (isProctimeAttribute(timeField) && 
hasRowIntervalType(size)) {
+                windowAssiger = CountTumblingWindowAssigner.of(toLong(size));
+                trigger = ElementTriggers.count(toLong(size));
+            } else {
+                // TODO: EventTimeTumblingGroupWindow should sort the stream 
on event time
+                // before applying the  windowing logic. Otherwise, this would 
be the same as a
+                // ProcessingTimeTumblingGroupWindow
+                throw new UnsupportedOperationException(
+                        "Event-time grouping windows on row intervals are 
currently not supported.");
+            }
+        } else if (window instanceof SlidingGroupWindow) {
+            SlidingGroupWindow slidingWindow = (SlidingGroupWindow) window;
+            FieldReferenceExpression timeField = slidingWindow.timeField();
+            ValueLiteralExpression size = slidingWindow.size();
+            ValueLiteralExpression slide = slidingWindow.slide();
+            if (isProctimeAttribute(timeField) && hasTimeIntervalType(size)) {
+                windowAssiger = SlidingWindowAssigner.of(toDuration(size), 
toDuration(slide));
+                trigger = ProcessingTimeTriggers.afterEndOfWindow();
+            } else if (isRowtimeAttribute(timeField) && 
hasTimeIntervalType(size)) {
+                windowAssiger = SlidingWindowAssigner.of(toDuration(size), 
toDuration(slide));
+                trigger = EventTimeTriggers.afterEndOfWindow();
+            } else if (isProctimeAttribute(timeField) && 
hasRowIntervalType(size)) {
+                windowAssiger = CountSlidingWindowAssigner.of(toLong(size), 
toLong(slide));
+                trigger = ElementTriggers.count(toLong(size));
+            } else {
+                // TODO: EventTimeTumblingGroupWindow should sort the stream 
on event time
+                // before applying the  windowing logic. Otherwise, this would 
be the same as a
+                // ProcessingTimeTumblingGroupWindow
+                throw new UnsupportedOperationException(
+                        "Event-time grouping windows on row intervals are 
currently not supported.");
+            }
+        } else {
+            throw new TableException("Unsupported window: " + 
window.toString());
+        }
+        return Tuple2.of(windowAssiger, trigger);
+    }
+
+    private OneInputTransformation<RowData, RowData>
+            createPythonStreamWindowGroupOneInputTransformation(
+                    Transformation<RowData> inputTransform,
+                    RowType inputRowType,
+                    RowType outputRowType,
+                    int inputTimeFieldIndex,
+                    WindowAssigner<?> windowAssigner,
+                    Trigger<?> trigger,
+                    long allowance,
+                    Configuration config) {
+        int[] namePropertyTypeArray =
+                Arrays.stream(namedWindowProperties)
+                        .mapToInt(
+                                p -> {
+                                    PlannerWindowProperty property = 
p.property();
+                                    if (property instanceof 
PlannerWindowStart) {
+                                        return 0;
+                                    }
+                                    if (property instanceof PlannerWindowEnd) {
+                                        return 1;
+                                    }
+                                    if (property instanceof 
PlannerRowtimeAttribute) {
+                                        return 2;
+                                    }
+                                    if (property instanceof 
PlannerProctimeAttribute) {
+                                        return 3;
+                                    }
+                                    throw new TableException("Unexpected 
property " + property);
+                                })
+                        .toArray();
+
+        Tuple2<int[], PythonFunctionInfo[]> aggInfos =
+                extractPythonAggregateFunctionInfosFromAggregateCall(aggCalls);
+        int[] pythonUdafInputOffsets = aggInfos.f0;
+        PythonFunctionInfo[] pythonFunctionInfos = aggInfos.f1;
+        OneInputStreamOperator<RowData, RowData> pythonOperator =
+                getPythonStreamGroupWindowAggregateFunctionOperator(
+                        config,
+                        inputRowType,
+                        outputRowType,
+                        windowAssigner,
+                        trigger,
+                        allowance,
+                        inputTimeFieldIndex,
+                        namePropertyTypeArray,
+                        pythonUdafInputOffsets,
+                        pythonFunctionInfos);
+        return new OneInputTransformation<>(
+                inputTransform,
+                getDesc(),
+                pythonOperator,
+                InternalTypeInfo.of(outputRowType),
+                inputTransform.getParallelism());
+    }
+
+    @SuppressWarnings("unchecked")
+    private OneInputStreamOperator<RowData, RowData>
+            getPythonStreamGroupWindowAggregateFunctionOperator(
+                    Configuration config,
+                    RowType inputRowType,
+                    RowType outputRowType,
+                    WindowAssigner<?> windowAssigner,
+                    Trigger<?> trigger,
+                    long allowance,
+                    int inputTimeFieldIndex,
+                    int[] namedProperties,
+                    int[] udafInputOffsets,
+                    PythonFunctionInfo[] pythonFunctionInfos) {
+        Class clazz =
+                CommonPythonUtil.loadClass(
+                        
ARROW_STREAM_PYTHON_GROUP_WINDOW_AGGREGATE_FUNCTION_OPERATOR_NAME);
+        try {
+            Constructor<OneInputStreamOperator<RowData, RowData>> ctor =
+                    clazz.getConstructor(
+                            Configuration.class,
+                            PythonFunctionInfo[].class,
+                            RowType.class,
+                            RowType.class,
+                            int.class,
+                            WindowAssigner.class,
+                            Trigger.class,
+                            long.class,
+                            int[].class,
+                            int[].class,
+                            int[].class);
+            return ctor.newInstance(
+                    config,
+                    pythonFunctionInfos,
+                    inputRowType,
+                    outputRowType,
+                    inputTimeFieldIndex,
+                    windowAssigner,
+                    trigger,
+                    allowance,
+                    namedProperties,
+                    grouping,
+                    udafInputOffsets);
+        } catch (NoSuchMethodException
+                | IllegalAccessException
+                | InstantiationException
+                | InvocationTargetException e) {
+            throw new TableException(
+                    "Python 
StreamArrowPythonGroupWindowAggregateFunctionOperator constructed failed.",
+                    e);
+        }
+    }
+}
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecPythonGroupWindowAggregate.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecPythonGroupWindowAggregate.scala
deleted file mode 100644
index 35f450c..0000000
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecPythonGroupWindowAggregate.scala
+++ /dev/null
@@ -1,179 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.planner.plan.nodes.exec.batch
-
-import org.apache.flink.api.dag.Transformation
-import org.apache.flink.configuration.Configuration
-import org.apache.flink.core.memory.ManagedMemoryUseCase
-import org.apache.flink.streaming.api.operators.OneInputStreamOperator
-import org.apache.flink.streaming.api.transformations.OneInputTransformation
-import org.apache.flink.table.api.config.ExecutionConfigOptions
-import org.apache.flink.table.data.RowData
-import org.apache.flink.table.functions.python.PythonFunctionInfo
-import 
org.apache.flink.table.planner.calcite.FlinkRelBuilder.PlannerNamedWindowProperty
-import org.apache.flink.table.planner.codegen.agg.batch.WindowCodeGenerator
-import org.apache.flink.table.planner.delegation.PlannerBase
-import org.apache.flink.table.planner.expressions.{PlannerRowtimeAttribute, 
PlannerWindowEnd, PlannerWindowStart}
-import org.apache.flink.table.planner.plan.logical.LogicalWindow
-import 
org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecPythonGroupWindowAggregate.ARROW_PYTHON_GROUP_WINDOW_AGGREGATE_FUNCTION_OPERATOR_NAME
-import 
org.apache.flink.table.planner.plan.nodes.exec.common.CommonPythonAggregate
-import org.apache.flink.table.planner.plan.nodes.exec.{ExecEdge, ExecNode, 
ExecNodeBase}
-import org.apache.flink.table.runtime.typeutils.InternalTypeInfo
-import org.apache.flink.table.types.logical.RowType
-
-import org.apache.calcite.rel.core.AggregateCall
-
-import java.util.Collections
-
-/**
- * Batch [[ExecNode]] for group widow aggregate (Python user defined aggregate 
function).
- *
- * <p>Note: This class can't be ported to Java,
- * because java class can't extend scala interface with default implementation.
- * FLINK-20858 will port this class to Java.
- */
-class BatchExecPythonGroupWindowAggregate(
-    grouping: Array[Int],
-    auxGrouping: Array[Int],
-    aggCalls: Array[AggregateCall],
-    window: LogicalWindow,
-    inputTimeFieldIndex: Int,
-    namedWindowProperties: Array[PlannerNamedWindowProperty],
-    inputEdge: ExecEdge,
-    outputType: RowType,
-    description: String)
-  extends ExecNodeBase[RowData](Collections.singletonList(inputEdge), 
outputType, description)
-  with BatchExecNode[RowData]
-  with CommonPythonAggregate {
-
-  override protected def translateToPlanInternal(planner: PlannerBase): 
Transformation[RowData] = {
-    val inputNode = getInputNodes.get(0).asInstanceOf[ExecNode[RowData]]
-    val inputTransform = inputNode.translateToPlan(planner)
-
-    val windowSizeAndSlideSize = WindowCodeGenerator.getWindowDef(window)
-
-    val groupBufferLimitSize = 
planner.getTableConfig.getConfiguration.getInteger(
-      ExecutionConfigOptions.TABLE_EXEC_WINDOW_AGG_BUFFER_SIZE_LIMIT)
-
-    val config = getConfig(planner.getExecEnv, planner.getTableConfig)
-    val transform = createPythonOneInputTransformation(
-      inputTransform,
-      inputNode.getOutputType.asInstanceOf[RowType],
-      outputType,
-      inputTimeFieldIndex,
-      groupBufferLimitSize,
-      windowSizeAndSlideSize.f0,
-      windowSizeAndSlideSize.f1,
-      config)
-
-    if (isPythonWorkerUsingManagedMemory(config)) {
-      
transform.declareManagedMemoryUseCaseAtSlotScope(ManagedMemoryUseCase.PYTHON)
-    }
-    transform
-  }
-
-  private[this] def createPythonOneInputTransformation(
-      inputTransform: Transformation[RowData],
-      inputRowType: RowType,
-      outputRowType: RowType,
-      inputTimeFieldIndex: Int,
-      maxLimitSize: Int,
-      windowSize: Long,
-      slideSize: Long,
-      config: Configuration): OneInputTransformation[RowData, RowData] = {
-    val namePropertyTypeArray = namedWindowProperties.map {
-      case PlannerNamedWindowProperty(_, p) => p match {
-        case PlannerWindowStart(_) => 0
-        case PlannerWindowEnd(_) => 1
-        case PlannerRowtimeAttribute(_) => 2
-      }
-    }.toArray
-
-    val (pythonUdafInputOffsets, pythonFunctionInfos) =
-      extractPythonAggregateFunctionInfosFromAggregateCall(aggCalls)
-
-    val pythonOperator = getPythonGroupWindowAggregateFunctionOperator(
-      config,
-      inputRowType,
-      outputRowType,
-      inputTimeFieldIndex,
-      maxLimitSize,
-      windowSize,
-      slideSize,
-      namePropertyTypeArray,
-      pythonUdafInputOffsets,
-      pythonFunctionInfos)
-
-    new OneInputTransformation(
-      inputTransform,
-      "BatchExecPythonGroupWindowAggregate",
-      pythonOperator,
-      InternalTypeInfo.of(outputRowType),
-      inputTransform.getParallelism)
-  }
-
-  private[this] def getPythonGroupWindowAggregateFunctionOperator(
-      config: Configuration,
-      inputRowType: RowType,
-      outputRowType: RowType,
-      inputTimeFieldIndex: Int,
-      maxLimitSize: Int,
-      windowSize: Long,
-      slideSize: Long,
-      namedWindowProperties: Array[Int],
-      udafInputOffsets: Array[Int],
-      pythonFunctionInfos: Array[PythonFunctionInfo]): 
OneInputStreamOperator[RowData, RowData] = {
-    val clazz = 
loadClass(ARROW_PYTHON_GROUP_WINDOW_AGGREGATE_FUNCTION_OPERATOR_NAME)
-
-    val ctor = clazz.getConstructor(
-      classOf[Configuration],
-      classOf[Array[PythonFunctionInfo]],
-      classOf[RowType],
-      classOf[RowType],
-      classOf[Int],
-      classOf[Int],
-      classOf[Long],
-      classOf[Long],
-      classOf[Array[Int]],
-      classOf[Array[Int]],
-      classOf[Array[Int]],
-      classOf[Array[Int]])
-
-    ctor.newInstance(
-      config,
-      pythonFunctionInfos,
-      inputRowType,
-      outputRowType,
-      Integer.valueOf(inputTimeFieldIndex),
-      Integer.valueOf(maxLimitSize),
-      java.lang.Long.valueOf(windowSize),
-      java.lang.Long.valueOf(slideSize),
-      namedWindowProperties,
-      grouping,
-      grouping ++ auxGrouping,
-      udafInputOffsets)
-      .asInstanceOf[OneInputStreamOperator[RowData, RowData]]
-  }
-}
-
-object BatchExecPythonGroupWindowAggregate {
-  val ARROW_PYTHON_GROUP_WINDOW_AGGREGATE_FUNCTION_OPERATOR_NAME: String =
-    "org.apache.flink.table.runtime.operators.python.aggregate.arrow.batch." +
-      "BatchArrowPythonGroupWindowAggregateFunctionOperator"
-}
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecPythonGroupWindowAggregate.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecPythonGroupWindowAggregate.scala
deleted file mode 100644
index 9f08aaa..0000000
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecPythonGroupWindowAggregate.scala
+++ /dev/null
@@ -1,270 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.planner.plan.nodes.exec.stream
-
-import org.apache.flink.api.dag.Transformation
-import org.apache.flink.configuration.Configuration
-import org.apache.flink.core.memory.ManagedMemoryUseCase
-import org.apache.flink.streaming.api.operators.OneInputStreamOperator
-import org.apache.flink.streaming.api.transformations.OneInputTransformation
-import org.apache.flink.table.api.TableException
-import org.apache.flink.table.data.RowData
-import org.apache.flink.table.functions.python.PythonFunctionInfo
-import 
org.apache.flink.table.planner.calcite.FlinkRelBuilder.PlannerNamedWindowProperty
-import org.apache.flink.table.planner.calcite.FlinkTypeFactory
-import org.apache.flink.table.planner.delegation.PlannerBase
-import org.apache.flink.table.planner.expressions.{PlannerProctimeAttribute, 
PlannerRowtimeAttribute, PlannerWindowEnd, PlannerWindowStart}
-import org.apache.flink.table.planner.plan.logical.{LogicalWindow, 
SlidingGroupWindow, TumblingGroupWindow}
-import 
org.apache.flink.table.planner.plan.nodes.exec.common.CommonPythonAggregate
-import 
org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecPythonGroupWindowAggregate.ARROW_STREAM_PYTHON_GROUP_WINDOW_AGGREGATE_FUNCTION_OPERATOR_NAME
-import org.apache.flink.table.planner.plan.nodes.exec.{ExecEdge, ExecNode, 
ExecNodeBase}
-import 
org.apache.flink.table.planner.plan.utils.AggregateUtil.{hasRowIntervalType, 
hasTimeIntervalType, isProctimeAttribute, isRowtimeAttribute, timeFieldIndex, 
toDuration, toLong}
-import org.apache.flink.table.planner.plan.utils.{KeySelectorUtil, 
WindowEmitStrategy}
-import org.apache.flink.table.planner.utils.Logging
-import 
org.apache.flink.table.runtime.operators.window.assigners.{CountSlidingWindowAssigner,
 CountTumblingWindowAssigner, SlidingWindowAssigner, TumblingWindowAssigner, 
WindowAssigner}
-import 
org.apache.flink.table.runtime.operators.window.triggers.{ElementTriggers, 
EventTimeTriggers, ProcessingTimeTriggers, Trigger}
-import org.apache.flink.table.runtime.typeutils.InternalTypeInfo
-import org.apache.flink.table.types.logical.RowType
-
-import org.apache.calcite.rel.core.AggregateCall
-
-import java.util.Collections
-
-/**
- * Stream [[ExecNode]] for group widow aggregate (Python user defined 
aggregate function).
- *
- * <p>Note: This class can't be ported to Java,
- * because java class can't extend scala interface with default implementation.
- * FLINK-20858 will port this class to Java.
- */
-class StreamExecPythonGroupWindowAggregate(
-    grouping: Array[Int],
-    aggCalls: Array[AggregateCall],
-    window: LogicalWindow,
-    namedWindowProperties: Array[PlannerNamedWindowProperty],
-    emitStrategy: WindowEmitStrategy,
-    inputEdge: ExecEdge,
-    outputType: RowType,
-    description: String)
-  extends ExecNodeBase[RowData](Collections.singletonList(inputEdge), 
outputType, description)
-  with StreamExecNode[RowData]
-  with CommonPythonAggregate
-  with Logging {
-
-  override def translateToPlanInternal(planner: PlannerBase): 
Transformation[RowData] = {
-    val isCountWindow = window match {
-      case TumblingGroupWindow(_, _, size) if hasRowIntervalType(size) => true
-      case SlidingGroupWindow(_, _, size, _) if hasRowIntervalType(size) => 
true
-      case _ => false
-    }
-
-    val config = planner.getTableConfig
-    if (isCountWindow && grouping.length > 0 && 
config.getMinIdleStateRetentionTime < 0) {
-      LOG.warn(
-        "No state retention interval configured for a query which accumulates 
state. " +
-          "Please provide a query configuration with valid retention interval 
to prevent " +
-          "excessive state size. You may specify a retention time of 0 to not 
clean up the state.")
-    }
-
-    val inputNode = getInputNodes.get(0).asInstanceOf[ExecNode[RowData]]
-    val inputRowType = inputNode.getOutputType.asInstanceOf[RowType]
-
-    val inputTimeFieldIndex = if (isRowtimeAttribute(window.timeAttribute)) {
-      val timeIndex = timeFieldIndex(
-        FlinkTypeFactory.INSTANCE.buildRelNodeRowType(inputRowType),
-        planner.getRelBuilder,
-        window.timeAttribute)
-      if (timeIndex < 0) {
-        throw new TableException(
-          s"Group window PythonAggregate must defined on a time attribute, " +
-            "but the time attribute can't be found.\n" +
-            "This should never happen. Please file an issue.")
-      }
-      timeIndex
-    } else {
-      -1
-    }
-
-    val inputTransform = inputNode.translateToPlan(planner)
-    val (windowAssigner, trigger) = generateWindowAssignerAndTrigger()
-    val mergedConfig = getConfig(planner.getExecEnv, planner.getTableConfig)
-    val transform = createPythonStreamWindowGroupOneInputTransformation(
-      inputTransform,
-      inputNode.getOutputType.asInstanceOf[RowType],
-      outputType,
-      inputTimeFieldIndex,
-      windowAssigner,
-      trigger,
-      emitStrategy.getAllowLateness,
-      mergedConfig)
-
-    if (inputsContainSingleton()) {
-      transform.setParallelism(1)
-      transform.setMaxParallelism(1)
-    }
-
-    // set KeyType and Selector for state
-    val inputRowTypeInfo = 
inputTransform.getOutputType.asInstanceOf[InternalTypeInfo[RowData]]
-    val selector = KeySelectorUtil.getRowDataSelector(grouping, 
inputRowTypeInfo)
-    transform.setStateKeySelector(selector)
-    transform.setStateKeyType(selector.getProducedType)
-
-    if (isPythonWorkerUsingManagedMemory(mergedConfig)) {
-      
transform.declareManagedMemoryUseCaseAtSlotScope(ManagedMemoryUseCase.PYTHON)
-    }
-    transform
-  }
-
-  private[this] def generateWindowAssignerAndTrigger(): (WindowAssigner[_], 
Trigger[_]) = {
-    window match {
-      case TumblingGroupWindow(_, timeField, size)
-        if isProctimeAttribute(timeField) && hasTimeIntervalType(size) =>
-        (TumblingWindowAssigner.of(toDuration(size)).withProcessingTime(),
-          ProcessingTimeTriggers.afterEndOfWindow())
-
-      case TumblingGroupWindow(_, timeField, size)
-        if isRowtimeAttribute(timeField) && hasTimeIntervalType(size) =>
-        (TumblingWindowAssigner.of(toDuration(size)).withEventTime(),
-          EventTimeTriggers.afterEndOfWindow())
-
-      case TumblingGroupWindow(_, timeField, size)
-        if isProctimeAttribute(timeField) && hasRowIntervalType(size) =>
-        (CountTumblingWindowAssigner.of(toLong(size)),
-          ElementTriggers.count(toLong(size)))
-
-      case TumblingGroupWindow(_, _, _) =>
-        // TODO: EventTimeTumblingGroupWindow should sort the stream on event 
time
-        // before applying the  windowing logic. Otherwise, this would be the 
same as a
-        // ProcessingTimeTumblingGroupWindow
-        throw new UnsupportedOperationException(
-          "Event-time grouping windows on row intervals are currently not 
supported.")
-
-      case SlidingGroupWindow(_, timeField, size, slide)
-        if isProctimeAttribute(timeField) && hasTimeIntervalType(size) =>
-        (SlidingWindowAssigner.of(toDuration(size), toDuration(slide)),
-          ProcessingTimeTriggers.afterEndOfWindow())
-
-      case SlidingGroupWindow(_, timeField, size, slide)
-        if isRowtimeAttribute(timeField) && hasTimeIntervalType(size) =>
-        (SlidingWindowAssigner.of(toDuration(size), toDuration(slide)),
-          EventTimeTriggers.afterEndOfWindow())
-
-      case SlidingGroupWindow(_, timeField, size, slide)
-        if isProctimeAttribute(timeField) && hasRowIntervalType(size) =>
-        (CountSlidingWindowAssigner.of(toLong(size), toLong(slide)),
-          ElementTriggers.count(toLong(size)))
-
-      case SlidingGroupWindow(_, _, _, _) =>
-        // TODO: EventTimeTumblingGroupWindow should sort the stream on event 
time
-        // before applying the  windowing logic. Otherwise, this would be the 
same as a
-        // ProcessingTimeTumblingGroupWindow
-        throw new UnsupportedOperationException(
-          "Event-time grouping windows on row intervals are currently not 
supported.")
-    }
-  }
-
-  private[this] def createPythonStreamWindowGroupOneInputTransformation(
-      inputTransform: Transformation[RowData],
-      inputRowType: RowType,
-      outputRowType: RowType,
-      inputTimeFieldIndex: Int,
-      windowAssigner: WindowAssigner[_],
-      trigger: Trigger[_],
-      allowance: Long,
-      config: Configuration): OneInputTransformation[RowData, RowData] = {
-
-    val namePropertyTypeArray = namedWindowProperties
-      .map {
-        case PlannerNamedWindowProperty(_, p) => p match {
-          case PlannerWindowStart(_) => 0
-          case PlannerWindowEnd(_) => 1
-          case PlannerRowtimeAttribute(_) => 2
-          case PlannerProctimeAttribute(_) => 3
-        }
-      }.toArray
-
-    val (pythonUdafInputOffsets, pythonFunctionInfos) =
-      extractPythonAggregateFunctionInfosFromAggregateCall(aggCalls)
-    val pythonOperator = getPythonStreamGroupWindowAggregateFunctionOperator(
-      config,
-      inputRowType,
-      outputRowType,
-      windowAssigner,
-      trigger,
-      allowance,
-      inputTimeFieldIndex,
-      namePropertyTypeArray,
-      pythonUdafInputOffsets,
-      pythonFunctionInfos)
-
-    new OneInputTransformation(
-      inputTransform,
-      "StreamExecPythonGroupWindowAggregate",
-      pythonOperator,
-      InternalTypeInfo.of(outputRowType),
-      inputTransform.getParallelism)
-  }
-
-  private[this] def getPythonStreamGroupWindowAggregateFunctionOperator(
-      config: Configuration,
-      inputRowType: RowType,
-      outputRowType: RowType,
-      windowAssigner: WindowAssigner[_],
-      trigger: Trigger[_],
-      allowance: Long,
-      inputTimeFieldIndex: Int,
-      namedProperties: Array[Int],
-      udafInputOffsets: Array[Int],
-      pythonFunctionInfos: Array[PythonFunctionInfo]): 
OneInputStreamOperator[RowData, RowData] = {
-    val clazz = 
loadClass(ARROW_STREAM_PYTHON_GROUP_WINDOW_AGGREGATE_FUNCTION_OPERATOR_NAME)
-
-    val ctor = clazz.getConstructor(
-      classOf[Configuration],
-      classOf[Array[PythonFunctionInfo]],
-      classOf[RowType],
-      classOf[RowType],
-      classOf[Int],
-      classOf[WindowAssigner[_]],
-      classOf[Trigger[_]],
-      classOf[Long],
-      classOf[Array[Int]],
-      classOf[Array[Int]],
-      classOf[Array[Int]])
-
-    ctor.newInstance(
-      config,
-      pythonFunctionInfos,
-      inputRowType,
-      outputRowType,
-      Integer.valueOf(inputTimeFieldIndex),
-      windowAssigner,
-      trigger,
-      java.lang.Long.valueOf(allowance),
-      namedProperties,
-      grouping,
-      udafInputOffsets)
-      .asInstanceOf[OneInputStreamOperator[RowData, RowData]]
-  }
-}
-
-object StreamExecPythonGroupWindowAggregate {
-  val ARROW_STREAM_PYTHON_GROUP_WINDOW_AGGREGATE_FUNCTION_OPERATOR_NAME: 
String =
-    "org.apache.flink.table.runtime.operators.python.aggregate.arrow.stream." +
-      "StreamArrowPythonGroupWindowAggregateFunctionOperator"
-}
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalPythonGroupWindowAggregate.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalPythonGroupWindowAggregate.scala
index a6e8c21..2ae4e47 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalPythonGroupWindowAggregate.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalPythonGroupWindowAggregate.scala
@@ -100,7 +100,7 @@ class BatchPhysicalPythonGroupWindowAggregate(
   override def translateToExecNode(): ExecNode[_] = {
     new BatchExecPythonGroupWindowAggregate(
       grouping,
-      auxGrouping,
+      grouping ++ auxGrouping,
       aggCalls.toArray,
       window,
       inputTimeFieldIndex,

Reply via email to