WeiZhong94 commented on a change in pull request #14775:
URL: https://github.com/apache/flink/pull/14775#discussion_r568352971



##########
File path: 
flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/aggregate/AbstractPythonStreamAggregateOperator.java
##########
@@ -76,8 +61,7 @@
  */

Review comment:
       The class Javadoc needs to be updated to reflect this change.

##########
File path: flink-python/pyflink/proto/flink-fn-execution.proto
##########
@@ -147,6 +147,30 @@ message UserDefinedAggregateFunction {
   bool takes_row_as_input = 6;
 }
 
+message GroupWindow {
+  enum WindowType {
+    TUMBLING_GROUP_WINDOW = 0;
+    SLIDING_GROUP_WINDOW = 1;
+    SESSION_GROUP_WINDOW = 2;
+  }
+
+  WindowType window_type = 1;
+
+  bool is_time_window = 2;
+
+  int64 window_slide = 3;
+
+  int64 window_size = 4;
+

Review comment:
       Currently the gap of a session window is stored in the `window_size` 
field. I think we should add a separate `window_gap` field to make this easier 
to understand.

##########
File path: 
flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/PythonStreamGroupWindowAggregateOperatorTest.java
##########
@@ -0,0 +1,1129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.python.aggregate;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.memory.DataInputDeserializer;
+import org.apache.flink.core.memory.DataOutputSerializer;
+import org.apache.flink.python.PythonFunctionRunner;
+import org.apache.flink.python.PythonOptions;
+import org.apache.flink.streaming.api.operators.InternalTimer;
+import org.apache.flink.streaming.api.operators.InternalTimerServiceImpl;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.Triggerable;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.data.UpdatableRowData;
+import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.data.util.RowDataUtil;
+import org.apache.flink.table.data.utils.JoinedRowData;
+import org.apache.flink.table.expressions.FieldReferenceExpression;
+import org.apache.flink.table.functions.python.PythonAggregateFunctionInfo;
+import org.apache.flink.table.planner.codegen.CodeGeneratorContext;
+import org.apache.flink.table.planner.codegen.ProjectionCodeGenerator;
+import org.apache.flink.table.planner.expressions.PlannerWindowReference;
+import org.apache.flink.table.planner.plan.logical.LogicalWindow;
+import org.apache.flink.table.planner.plan.logical.SlidingGroupWindow;
+import org.apache.flink.table.planner.typeutils.DataViewUtils;
+import org.apache.flink.table.runtime.generated.GeneratedProjection;
+import org.apache.flink.table.runtime.generated.Projection;
+import 
org.apache.flink.table.runtime.operators.python.scalar.PythonScalarFunctionOperatorTestBase;
+import org.apache.flink.table.runtime.operators.window.TimeWindow;
+import 
org.apache.flink.table.runtime.operators.window.assigners.SlidingWindowAssigner;
+import 
org.apache.flink.table.runtime.operators.window.assigners.WindowAssigner;
+import 
org.apache.flink.table.runtime.utils.PassThroughStreamGroupWindowAggregatePythonFunctionRunner;
+import org.apache.flink.table.runtime.utils.PythonTestUtils;
+import org.apache.flink.table.types.AtomicDataType;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.TimestampKind;
+import org.apache.flink.table.types.logical.TimestampType;
+import org.apache.flink.table.types.logical.VarCharType;
+import org.apache.flink.types.RowKind;
+
+import org.junit.Test;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import scala.Some;
+
+import static 
org.apache.flink.table.expressions.ApiExpressionUtils.intervalOfMillis;
+
+/**
+ * Test for {@link PythonStreamGroupWindowAggregateOperator}. These test that:
+ *
+ * <ul>
+ *   <li>Retraction flag is handled correctly
+ *   <li>FinishBundle is called when checkpoint is encountered
+ *   <li>FinishBundle is called when bundled element count reach to max bundle 
size
+ *   <li>FinishBundle is called when bundled time reach to max bundle time
+ *   <li>Watermarks are buffered and only sent to downstream when 
finishedBundle is triggered
+ * </ul>
+ */
+public class PythonStreamGroupWindowAggregateOperatorTest
+        extends AbstractPythonStreamAggregateOperatorTest {
+    @Test
+    public void testGroupWindowAggregateFunction() throws Exception {
+        OneInputStreamOperatorTestHarness<RowData, RowData> testHarness =
+                getTestHarness(new Configuration());
+        long initialTime = 0L;
+        ConcurrentLinkedQueue<Object> expectedOutput = new 
ConcurrentLinkedQueue<>();
+        testHarness.open();
+        testHarness.processElement(
+                new StreamRecord<>(newRow(true, "c1", "c2", 0L, 0L), 
initialTime + 1));
+        testHarness.processElement(
+                new StreamRecord<>(newRow(true, "c1", "c4", 1L, 6000L), 
initialTime + 2));
+        testHarness.processElement(
+                new StreamRecord<>(newRow(true, "c1", "c6", 2L, 10000L), 
initialTime + 3));
+        testHarness.processElement(
+                new StreamRecord<>(newRow(true, "c2", "c8", 3L, 0L), 
initialTime + 4));
+        testHarness.processElement(
+                new StreamRecord<>(newRow(true, "c3", "c8", 3L, 0L), 
initialTime + 5));
+        testHarness.processElement(
+                new StreamRecord<>(newRow(false, "c3", "c8", 3L, 0L), 
initialTime + 6));
+        testHarness.processWatermark(Long.MAX_VALUE);
+        testHarness.close();
+
+        expectedOutput.add(

Review comment:
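       It would be better to extract a utility method so that each of these 
stream records can be created in one line, e.g. a helper that takes the key, 
count, and window start/end and returns the wrapped `StreamRecord`.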

##########
File path: 
flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/PythonStreamGroupWindowAggregateOperatorTest.java
##########
@@ -0,0 +1,1129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.python.aggregate;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.memory.DataInputDeserializer;
+import org.apache.flink.core.memory.DataOutputSerializer;
+import org.apache.flink.python.PythonFunctionRunner;
+import org.apache.flink.python.PythonOptions;
+import org.apache.flink.streaming.api.operators.InternalTimer;
+import org.apache.flink.streaming.api.operators.InternalTimerServiceImpl;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.Triggerable;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.data.UpdatableRowData;
+import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.data.util.RowDataUtil;
+import org.apache.flink.table.data.utils.JoinedRowData;
+import org.apache.flink.table.expressions.FieldReferenceExpression;
+import org.apache.flink.table.functions.python.PythonAggregateFunctionInfo;
+import org.apache.flink.table.planner.codegen.CodeGeneratorContext;
+import org.apache.flink.table.planner.codegen.ProjectionCodeGenerator;
+import org.apache.flink.table.planner.expressions.PlannerWindowReference;
+import org.apache.flink.table.planner.plan.logical.LogicalWindow;
+import org.apache.flink.table.planner.plan.logical.SlidingGroupWindow;
+import org.apache.flink.table.planner.typeutils.DataViewUtils;
+import org.apache.flink.table.runtime.generated.GeneratedProjection;
+import org.apache.flink.table.runtime.generated.Projection;
+import 
org.apache.flink.table.runtime.operators.python.scalar.PythonScalarFunctionOperatorTestBase;
+import org.apache.flink.table.runtime.operators.window.TimeWindow;
+import 
org.apache.flink.table.runtime.operators.window.assigners.SlidingWindowAssigner;
+import 
org.apache.flink.table.runtime.operators.window.assigners.WindowAssigner;
+import 
org.apache.flink.table.runtime.utils.PassThroughStreamGroupWindowAggregatePythonFunctionRunner;
+import org.apache.flink.table.runtime.utils.PythonTestUtils;
+import org.apache.flink.table.types.AtomicDataType;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.TimestampKind;
+import org.apache.flink.table.types.logical.TimestampType;
+import org.apache.flink.table.types.logical.VarCharType;
+import org.apache.flink.types.RowKind;
+
+import org.junit.Test;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import scala.Some;
+
+import static 
org.apache.flink.table.expressions.ApiExpressionUtils.intervalOfMillis;
+
+/**
+ * Test for {@link PythonStreamGroupWindowAggregateOperator}. These test that:
+ *
+ * <ul>
+ *   <li>Retraction flag is handled correctly
+ *   <li>FinishBundle is called when checkpoint is encountered
+ *   <li>FinishBundle is called when bundled element count reach to max bundle 
size
+ *   <li>FinishBundle is called when bundled time reach to max bundle time
+ *   <li>Watermarks are buffered and only sent to downstream when 
finishedBundle is triggered
+ * </ul>
+ */
+public class PythonStreamGroupWindowAggregateOperatorTest
+        extends AbstractPythonStreamAggregateOperatorTest {
+    @Test
+    public void testGroupWindowAggregateFunction() throws Exception {
+        OneInputStreamOperatorTestHarness<RowData, RowData> testHarness =
+                getTestHarness(new Configuration());
+        long initialTime = 0L;
+        ConcurrentLinkedQueue<Object> expectedOutput = new 
ConcurrentLinkedQueue<>();
+        testHarness.open();
+        testHarness.processElement(
+                new StreamRecord<>(newRow(true, "c1", "c2", 0L, 0L), 
initialTime + 1));
+        testHarness.processElement(
+                new StreamRecord<>(newRow(true, "c1", "c4", 1L, 6000L), 
initialTime + 2));
+        testHarness.processElement(
+                new StreamRecord<>(newRow(true, "c1", "c6", 2L, 10000L), 
initialTime + 3));
+        testHarness.processElement(
+                new StreamRecord<>(newRow(true, "c2", "c8", 3L, 0L), 
initialTime + 4));
+        testHarness.processElement(
+                new StreamRecord<>(newRow(true, "c3", "c8", 3L, 0L), 
initialTime + 5));
+        testHarness.processElement(
+                new StreamRecord<>(newRow(false, "c3", "c8", 3L, 0L), 
initialTime + 6));
+        testHarness.processWatermark(Long.MAX_VALUE);
+        testHarness.close();
+
+        expectedOutput.add(
+                new StreamRecord<>(
+                        newRow(
+                                true,
+                                "c1",
+                                0L,
+                                TimestampData.fromEpochMillis(-5000L),
+                                TimestampData.fromEpochMillis(5000L))));
+
+        expectedOutput.add(
+                new StreamRecord<>(
+                        newRow(
+                                true,
+                                "state_cleanup_triggered: c1 : 
TimeWindow{start=-5000, end=5000}",
+                                0L,
+                                TimestampData.fromEpochMillis(-5000L),
+                                TimestampData.fromEpochMillis(5000L))));
+
+        expectedOutput.add(
+                new StreamRecord<>(
+                        newRow(
+                                true,
+                                "state_cleanup_triggered: c3 : 
TimeWindow{start=-5000, end=5000}",
+                                0L,
+                                TimestampData.fromEpochMillis(-5000L),
+                                TimestampData.fromEpochMillis(5000L))));
+
+        expectedOutput.add(
+                new StreamRecord<>(
+                        newRow(
+                                true,
+                                "c2",
+                                3L,
+                                TimestampData.fromEpochMillis(-5000L),
+                                TimestampData.fromEpochMillis(5000L))));
+
+        expectedOutput.add(
+                new StreamRecord<>(
+                        newRow(
+                                true,
+                                "state_cleanup_triggered: c2 : 
TimeWindow{start=-5000, end=5000}",
+                                0L,
+                                TimestampData.fromEpochMillis(-5000L),
+                                TimestampData.fromEpochMillis(5000L))));
+
+        expectedOutput.add(
+                new StreamRecord<>(
+                        newRow(
+                                true,
+                                "c1",
+                                0L,
+                                TimestampData.fromEpochMillis(0L),
+                                TimestampData.fromEpochMillis(10000L))));
+
+        expectedOutput.add(
+                new StreamRecord<>(
+                        newRow(
+                                true,
+                                "state_cleanup_triggered: c1 : 
TimeWindow{start=0, end=10000}",
+                                0L,
+                                TimestampData.fromEpochMillis(0L),
+                                TimestampData.fromEpochMillis(10000L))));
+
+        expectedOutput.add(
+                new StreamRecord<>(
+                        newRow(
+                                true,
+                                "c2",
+                                3L,
+                                TimestampData.fromEpochMillis(0L),
+                                TimestampData.fromEpochMillis(10000L))));
+
+        expectedOutput.add(
+                new StreamRecord<>(
+                        newRow(
+                                true,
+                                "state_cleanup_triggered: c2 : 
TimeWindow{start=0, end=10000}",
+                                0L,
+                                TimestampData.fromEpochMillis(0L),
+                                TimestampData.fromEpochMillis(10000L))));
+
+        expectedOutput.add(
+                new StreamRecord<>(
+                        newRow(
+                                true,
+                                "state_cleanup_triggered: c3 : 
TimeWindow{start=0, end=10000}",
+                                0L,
+                                TimestampData.fromEpochMillis(0L),
+                                TimestampData.fromEpochMillis(10000L))));
+
+        expectedOutput.add(
+                new StreamRecord<>(
+                        newRow(
+                                true,
+                                "c1",
+                                1L,
+                                TimestampData.fromEpochMillis(5000L),
+                                TimestampData.fromEpochMillis(15000L))));
+        expectedOutput.add(
+                new StreamRecord<>(
+                        newRow(
+                                true,
+                                "state_cleanup_triggered: c1 : 
TimeWindow{start=5000, end=15000}",
+                                0L,
+                                TimestampData.fromEpochMillis(5000L),
+                                TimestampData.fromEpochMillis(15000L))));
+
+        expectedOutput.add(
+                new StreamRecord<>(
+                        newRow(
+                                true,
+                                "c1",
+                                2L,
+                                TimestampData.fromEpochMillis(10000L),
+                                TimestampData.fromEpochMillis(20000L))));
+
+        expectedOutput.add(
+                new StreamRecord<>(
+                        newRow(
+                                true,
+                                "state_cleanup_triggered: c1 : 
TimeWindow{start=10000, end=20000}",
+                                0L,
+                                TimestampData.fromEpochMillis(10000L),
+                                TimestampData.fromEpochMillis(20000L))));
+
+        expectedOutput.add(new Watermark(Long.MAX_VALUE));
+
+        assertOutputEquals("Output was not correct.", expectedOutput, 
testHarness.getOutput());
+    }
+
+    @Test
+    public void testFinishBundleTriggeredOnCheckpoint() throws Exception {
+        Configuration conf = new Configuration();
+        conf.setInteger(PythonOptions.MAX_BUNDLE_SIZE, 10);
+        OneInputStreamOperatorTestHarness<RowData, RowData> testHarness = 
getTestHarness(conf);
+
+        long initialTime = 0L;
+        ConcurrentLinkedQueue<Object> expectedOutput = new 
ConcurrentLinkedQueue<>();
+
+        testHarness.open();
+
+        testHarness.processElement(
+                new StreamRecord<>(newRow(true, "c1", "c2", 0L, 0L), 
initialTime + 1));
+        testHarness.processElement(
+                new StreamRecord<>(newRow(true, "c1", "c4", 1L, 6000L), 
initialTime + 2));
+        testHarness.processElement(
+                new StreamRecord<>(newRow(true, "c1", "c6", 2L, 10000L), 
initialTime + 3));
+        testHarness.processElement(
+                new StreamRecord<>(newRow(true, "c2", "c8", 3L, 0L), 
initialTime + 4));
+        testHarness.processWatermark(new Watermark(10000L));
+        // checkpoint trigger finishBundle
+        testHarness.prepareSnapshotPreBarrier(0L);
+
+        expectedOutput.add(
+                new StreamRecord<>(
+                        newRow(
+                                true,
+                                "c1",
+                                0L,
+                                TimestampData.fromEpochMillis(-5000L),
+                                TimestampData.fromEpochMillis(5000L))));
+
+        expectedOutput.add(
+                new StreamRecord<>(
+                        newRow(
+                                true,
+                                "state_cleanup_triggered: c1 : 
TimeWindow{start=-5000, end=5000}",
+                                0L,
+                                TimestampData.fromEpochMillis(-5000L),
+                                TimestampData.fromEpochMillis(5000L))));
+
+        expectedOutput.add(
+                new StreamRecord<>(
+                        newRow(
+                                true,
+                                "c2",
+                                3L,
+                                TimestampData.fromEpochMillis(-5000L),
+                                TimestampData.fromEpochMillis(5000L))));
+
+        expectedOutput.add(
+                new StreamRecord<>(
+                        newRow(
+                                true,
+                                "state_cleanup_triggered: c2 : 
TimeWindow{start=-5000, end=5000}",
+                                0L,
+                                TimestampData.fromEpochMillis(-5000L),
+                                TimestampData.fromEpochMillis(5000L))));
+
+        expectedOutput.add(
+                new StreamRecord<>(
+                        newRow(
+                                true,
+                                "c2",
+                                3L,
+                                TimestampData.fromEpochMillis(0L),
+                                TimestampData.fromEpochMillis(10000L))));
+        expectedOutput.add(
+                new StreamRecord<>(
+                        newRow(
+                                true,
+                                "state_cleanup_triggered: c2 : 
TimeWindow{start=0, end=10000}",
+                                0L,
+                                TimestampData.fromEpochMillis(0L),
+                                TimestampData.fromEpochMillis(10000L))));
+
+        expectedOutput.add(
+                new StreamRecord<>(
+                        newRow(
+                                true,
+                                "c1",
+                                0L,
+                                TimestampData.fromEpochMillis(0L),
+                                TimestampData.fromEpochMillis(10000L))));
+
+        expectedOutput.add(
+                new StreamRecord<>(
+                        newRow(
+                                true,
+                                "state_cleanup_triggered: c1 : 
TimeWindow{start=0, end=10000}",
+                                0L,
+                                TimestampData.fromEpochMillis(0L),
+                                TimestampData.fromEpochMillis(10000L))));
+
+        expectedOutput.add(new Watermark(10000L));
+
+        assertOutputEquals("Output was not correct.", expectedOutput, 
testHarness.getOutput());
+
+        testHarness.processWatermark(20000L);
+
+        testHarness.close();
+
+        expectedOutput.add(
+                new StreamRecord<>(
+                        newRow(
+                                true,
+                                "c1",
+                                1L,
+                                TimestampData.fromEpochMillis(5000L),
+                                TimestampData.fromEpochMillis(15000L))));
+
+        expectedOutput.add(
+                new StreamRecord<>(
+                        newRow(
+                                true,
+                                "state_cleanup_triggered: c1 : 
TimeWindow{start=5000, end=15000}",
+                                0L,
+                                TimestampData.fromEpochMillis(5000L),
+                                TimestampData.fromEpochMillis(15000L))));
+
+        expectedOutput.add(
+                new StreamRecord<>(
+                        newRow(
+                                true,
+                                "c1",
+                                2L,
+                                TimestampData.fromEpochMillis(10000L),
+                                TimestampData.fromEpochMillis(20000L))));
+        expectedOutput.add(
+                new StreamRecord<>(
+                        newRow(
+                                true,
+                                "state_cleanup_triggered: c1 : 
TimeWindow{start=10000, end=20000}",
+                                0L,
+                                TimestampData.fromEpochMillis(10000L),
+                                TimestampData.fromEpochMillis(20000L))));
+
+        expectedOutput.add(new Watermark(20000L));
+
+        assertOutputEquals("Output was not correct.", expectedOutput, 
testHarness.getOutput());
+    }
+
+    @Test
+    public void testFinishBundleTriggeredByCount() throws Exception {
+        Configuration conf = new Configuration();
+        conf.setInteger(PythonOptions.MAX_BUNDLE_SIZE, 4);
+        OneInputStreamOperatorTestHarness<RowData, RowData> testHarness = 
getTestHarness(conf);
+
+        long initialTime = 0L;
+        ConcurrentLinkedQueue<Object> expectedOutput = new 
ConcurrentLinkedQueue<>();
+
+        testHarness.open();
+
+        testHarness.processElement(
+                new StreamRecord<>(newRow(true, "c1", "c2", 0L, 0L), 
initialTime + 1));
+        testHarness.processElement(
+                new StreamRecord<>(newRow(true, "c1", "c4", 1L, 6000L), 
initialTime + 2));
+        testHarness.processElement(
+                new StreamRecord<>(newRow(true, "c1", "c6", 2L, 10000L), 
initialTime + 3));
+        testHarness.processElement(
+                new StreamRecord<>(newRow(true, "c2", "c8", 3L, 0L), 
initialTime + 4));
+
+        testHarness.processWatermark(new Watermark(10000L));
+
+        expectedOutput.add(
+                new StreamRecord<>(
+                        newRow(
+                                true,
+                                "c1",
+                                0L,
+                                TimestampData.fromEpochMillis(-5000L),
+                                TimestampData.fromEpochMillis(5000L))));
+
+        expectedOutput.add(
+                new StreamRecord<>(
+                        newRow(
+                                true,
+                                "state_cleanup_triggered: c1 : 
TimeWindow{start=-5000, end=5000}",
+                                0L,
+                                TimestampData.fromEpochMillis(-5000L),
+                                TimestampData.fromEpochMillis(5000L))));
+
+        expectedOutput.add(
+                new StreamRecord<>(
+                        newRow(
+                                true,
+                                "c2",
+                                3L,
+                                TimestampData.fromEpochMillis(-5000L),
+                                TimestampData.fromEpochMillis(5000L))));
+
+        expectedOutput.add(
+                new StreamRecord<>(
+                        newRow(
+                                true,
+                                "state_cleanup_triggered: c2 : 
TimeWindow{start=-5000, end=5000}",
+                                0L,
+                                TimestampData.fromEpochMillis(-5000L),
+                                TimestampData.fromEpochMillis(5000L))));
+
+        expectedOutput.add(
+                new StreamRecord<>(
+                        newRow(
+                                true,
+                                "c2",
+                                3L,
+                                TimestampData.fromEpochMillis(0L),
+                                TimestampData.fromEpochMillis(10000L))));
+
+        expectedOutput.add(
+                new StreamRecord<>(
+                        newRow(
+                                true,
+                                "state_cleanup_triggered: c2 : 
TimeWindow{start=0, end=10000}",
+                                0L,
+                                TimestampData.fromEpochMillis(0L),
+                                TimestampData.fromEpochMillis(10000L))));
+
+        expectedOutput.add(
+                new StreamRecord<>(
+                        newRow(
+                                true,
+                                "c1",
+                                0L,
+                                TimestampData.fromEpochMillis(0L),
+                                TimestampData.fromEpochMillis(10000L))));
+
+        expectedOutput.add(
+                new StreamRecord<>(
+                        newRow(
+                                true,
+                                "state_cleanup_triggered: c1 : 
TimeWindow{start=0, end=10000}",
+                                0L,
+                                TimestampData.fromEpochMillis(0L),
+                                TimestampData.fromEpochMillis(10000L))));
+
+        expectedOutput.add(new Watermark(10000L));
+
+        assertOutputEquals("Output was not correct.", expectedOutput, 
testHarness.getOutput());
+
+        testHarness.processWatermark(20000L);
+        testHarness.close();
+
+        expectedOutput.add(
+                new StreamRecord<>(
+                        newRow(
+                                true,
+                                "c1",
+                                1L,
+                                TimestampData.fromEpochMillis(5000L),
+                                TimestampData.fromEpochMillis(15000L))));
+
+        expectedOutput.add(
+                new StreamRecord<>(
+                        newRow(
+                                true,
+                                "state_cleanup_triggered: c1 : 
TimeWindow{start=5000, end=15000}",
+                                0L,
+                                TimestampData.fromEpochMillis(5000L),
+                                TimestampData.fromEpochMillis(15000L))));
+
+        expectedOutput.add(
+                new StreamRecord<>(
+                        newRow(
+                                true,
+                                "c1",
+                                2L,
+                                TimestampData.fromEpochMillis(10000L),
+                                TimestampData.fromEpochMillis(20000L))));
+
+        expectedOutput.add(
+                new StreamRecord<>(
+                        newRow(
+                                true,
+                                "state_cleanup_triggered: c1 : 
TimeWindow{start=10000, end=20000}",
+                                0L,
+                                TimestampData.fromEpochMillis(10000L),
+                                TimestampData.fromEpochMillis(20000L))));
+
+        expectedOutput.add(new Watermark(20000L));
+
+        assertOutputEquals("Output was not correct.", expectedOutput, 
testHarness.getOutput());
+    }
+
+    @Test
+    public void testFinishBundleTriggeredByTime() throws Exception {
+        Configuration conf = new Configuration();
+        conf.setInteger(PythonOptions.MAX_BUNDLE_SIZE, 10);
+        conf.setLong(PythonOptions.MAX_BUNDLE_TIME_MILLS, 1000L);
+        OneInputStreamOperatorTestHarness<RowData, RowData> testHarness = 
getTestHarness(conf);
+
+        long initialTime = 0L;
+        ConcurrentLinkedQueue<Object> expectedOutput = new 
ConcurrentLinkedQueue<>();
+
+        testHarness.open();
+
+        testHarness.processElement(
+                new StreamRecord<>(newRow(true, "c1", "c2", 0L, 0L), 
initialTime + 1));
+        testHarness.processElement(
+                new StreamRecord<>(newRow(true, "c1", "c4", 1L, 6000L), 
initialTime + 2));
+        testHarness.processElement(
+                new StreamRecord<>(newRow(true, "c1", "c6", 2L, 10000L), 
initialTime + 3));
+        testHarness.processElement(
+                new StreamRecord<>(newRow(true, "c2", "c8", 3L, 0L), 
initialTime + 4));
+        testHarness.processWatermark(new Watermark(20000L));
+        assertOutputEquals(
+                "FinishBundle should not be triggered.", expectedOutput, 
testHarness.getOutput());
+
+        testHarness.setProcessingTime(1000L);
+        expectedOutput.add(
+                new StreamRecord<>(
+                        newRow(
+                                true,
+                                "c1",
+                                0L,
+                                TimestampData.fromEpochMillis(-5000L),
+                                TimestampData.fromEpochMillis(5000L))));
+
+        expectedOutput.add(
+                new StreamRecord<>(
+                        newRow(
+                                true,
+                                "state_cleanup_triggered: c1 : 
TimeWindow{start=-5000, end=5000}",
+                                0L,
+                                TimestampData.fromEpochMillis(-5000L),
+                                TimestampData.fromEpochMillis(5000L))));
+
+        expectedOutput.add(
+                new StreamRecord<>(
+                        newRow(
+                                true,
+                                "c2",
+                                3L,
+                                TimestampData.fromEpochMillis(-5000L),
+                                TimestampData.fromEpochMillis(5000L))));
+
+        expectedOutput.add(
+                new StreamRecord<>(
+                        newRow(
+                                true,
+                                "state_cleanup_triggered: c2 : 
TimeWindow{start=-5000, end=5000}",
+                                0L,
+                                TimestampData.fromEpochMillis(-5000L),
+                                TimestampData.fromEpochMillis(5000L))));
+
+        expectedOutput.add(
+                new StreamRecord<>(
+                        newRow(
+                                true,
+                                "c2",
+                                3L,
+                                TimestampData.fromEpochMillis(0L),
+                                TimestampData.fromEpochMillis(10000L))));
+
+        expectedOutput.add(
+                new StreamRecord<>(
+                        newRow(
+                                true,
+                                "state_cleanup_triggered: c2 : 
TimeWindow{start=0, end=10000}",
+                                0L,
+                                TimestampData.fromEpochMillis(0L),
+                                TimestampData.fromEpochMillis(10000L))));
+
+        expectedOutput.add(
+                new StreamRecord<>(
+                        newRow(
+                                true,
+                                "c1",
+                                0L,
+                                TimestampData.fromEpochMillis(0L),
+                                TimestampData.fromEpochMillis(10000L))));
+
+        expectedOutput.add(
+                new StreamRecord<>(
+                        newRow(
+                                true,
+                                "state_cleanup_triggered: c1 : 
TimeWindow{start=0, end=10000}",
+                                0L,
+                                TimestampData.fromEpochMillis(0L),
+                                TimestampData.fromEpochMillis(10000L))));
+
+        expectedOutput.add(
+                new StreamRecord<>(
+                        newRow(
+                                true,
+                                "c1",
+                                1L,
+                                TimestampData.fromEpochMillis(5000L),
+                                TimestampData.fromEpochMillis(15000L))));
+
+        expectedOutput.add(
+                new StreamRecord<>(
+                        newRow(
+                                true,
+                                "state_cleanup_triggered: c1 : 
TimeWindow{start=5000, end=15000}",
+                                0L,
+                                TimestampData.fromEpochMillis(5000L),
+                                TimestampData.fromEpochMillis(15000L))));
+
+        expectedOutput.add(
+                new StreamRecord<>(
+                        newRow(
+                                true,
+                                "c1",
+                                2L,
+                                TimestampData.fromEpochMillis(10000L),
+                                TimestampData.fromEpochMillis(20000L))));
+
+        expectedOutput.add(
+                new StreamRecord<>(
+                        newRow(
+                                true,
+                                "state_cleanup_triggered: c1 : 
TimeWindow{start=10000, end=20000}",
+                                0L,
+                                TimestampData.fromEpochMillis(10000L),
+                                TimestampData.fromEpochMillis(20000L))));
+
+        expectedOutput.add(new Watermark(20000L));
+
+        assertOutputEquals("Output was not correct.", expectedOutput, 
testHarness.getOutput());
+
+        testHarness.close();
+    }
+
+    @Override
+    public LogicalType[] getOutputLogicalType() {
+        return new LogicalType[] {
+            DataTypes.STRING().getLogicalType(), 
DataTypes.BIGINT().getLogicalType()
+        };
+    }
+
+    @Override
+    public RowType getInputType() {
+        return new RowType(
+                Arrays.asList(
+                        new RowType.RowField("f1", new VarCharType()),
+                        new RowType.RowField("f2", new VarCharType()),
+                        new RowType.RowField("f3", new BigIntType()),
+                        new RowType.RowField("rowTime", new BigIntType())));
+    }
+
+    @Override
+    public RowType getOutputType() {
+        return new RowType(
+                Arrays.asList(
+                        new RowType.RowField("f1", new VarCharType()),
+                        new RowType.RowField("f2", new BigIntType()),
+                        new RowType.RowField("windowStart", new 
TimestampType(3)),
+                        new RowType.RowField("windowEnd", new 
TimestampType(3))));
+    }
+
+    @Override
+    OneInputStreamOperator getTestOperator(Configuration config) {
+        long size = 10000L;
+        long slide = 5000L;
+        SlidingWindowAssigner windowAssigner =
+                SlidingWindowAssigner.of(Duration.ofMillis(size), 
Duration.ofMillis(slide))
+                        .withEventTime();
+        PlannerWindowReference windowRef =
+                new PlannerWindowReference("w$", new Some<>(new 
TimestampType(3)));
+        LogicalWindow window =
+                new SlidingGroupWindow(
+                        windowRef,
+                        new FieldReferenceExpression(
+                                "rowtime",
+                                new AtomicDataType(
+                                        new TimestampType(true, 
TimestampKind.ROWTIME, 3)),
+                                0,
+                                3),
+                        intervalOfMillis(size),
+                        intervalOfMillis(slide));
+        return new PassThroughPythonStreamGroupWindowAggregateOperator(
+                config,
+                getInputType(),
+                getOutputType(),
+                new PythonAggregateFunctionInfo[] {
+                    new PythonAggregateFunctionInfo(
+                            
PythonScalarFunctionOperatorTestBase.DummyPythonFunction.INSTANCE,
+                            new Integer[] {2},
+                            -1,
+                            false)
+                },
+                getGrouping(),
+                -1,
+                false,
+                false,
+                3,
+                windowAssigner,
+                window,
+                0,
+                new int[] {0, 1});
+    }
+
+    /** PassThroughPythonStreamGroupWindowAggregateOperator. */
+    public static class PassThroughPythonStreamGroupWindowAggregateOperator<K>

Review comment:
       It would be better to extract this class into an independent Java file, 
since it is so large.

##########
File path: 
flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/aggregate/PythonStreamGroupWindowAggregateOperator.java
##########
@@ -0,0 +1,342 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.python.aggregate;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.fnexecution.v1.FlinkFnApi;
+import org.apache.flink.streaming.api.operators.InternalTimer;
+import org.apache.flink.streaming.api.operators.InternalTimerService;
+import org.apache.flink.streaming.api.operators.Triggerable;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.UpdatableRowData;
+import org.apache.flink.table.expressions.ValueLiteralExpression;
+import org.apache.flink.table.functions.python.PythonAggregateFunctionInfo;
+import org.apache.flink.table.planner.plan.logical.LogicalWindow;
+import org.apache.flink.table.planner.plan.logical.SessionGroupWindow;
+import org.apache.flink.table.planner.plan.logical.SlidingGroupWindow;
+import org.apache.flink.table.planner.plan.logical.TumblingGroupWindow;
+import org.apache.flink.table.planner.plan.utils.AggregateUtil;
+import org.apache.flink.table.planner.typeutils.DataViewUtils;
+import org.apache.flink.table.runtime.operators.window.CountWindow;
+import org.apache.flink.table.runtime.operators.window.TimeWindow;
+import org.apache.flink.table.runtime.operators.window.Window;
+import 
org.apache.flink.table.runtime.operators.window.assigners.WindowAssigner;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.TinyIntType;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/** The Python Group Window AggregateFunction operator for the blink planner. 
*/
+@Internal
+public class PythonStreamGroupWindowAggregateOperator<K, W extends Window>
+        extends AbstractPythonStreamAggregateOperator implements 
Triggerable<K, W> {
+
+    private static final long serialVersionUID = 1L;
+
+    @VisibleForTesting
+    static final String STREAM_GROUP_WINDOW_AGGREGATE_URN =
+            "flink:transform:stream_group_window_aggregate:v1";
+
+    @VisibleForTesting static final byte REGISTER_EVENT_TIMER = 0;
+
+    @VisibleForTesting static final byte REGISTER_PROCESSING_TIMER = 1;
+
+    @VisibleForTesting static final byte DELETE_EVENT_TIMER = 2;
+
+    @VisibleForTesting static final byte DELETE_PROCESSING_TIMER = 4;
+
+    /** True if the count(*) agg is inserted by the planner. */
+    private final boolean countStarInserted;
+
+    /** The row time index of the input data. */
+    @VisibleForTesting final int inputTimeFieldIndex;
+
+    /**
+     * The allowed lateness for elements. This is used for:
+     *
+     * <ul>
+     *   <li>Deciding if an element should be dropped from a window due to 
lateness.
+     *   <li>Clearing the state of a window if the system time passes the 
{@code window.maxTimestamp
+     *       + allowedLateness} landmark.
+     * </ul>
+     */
+    @VisibleForTesting final long allowedLateness;
+
+    /**
+     * The Infos of the Window. 0 -> start of the Window. 1 -> end of the 
Window. 2 -> row time of
+     * the Window. 3 -> proc time of the Window.
+     */
+    private final int[] namedProperties;
+
+    /** A {@link WindowAssigner} assigns zero or more {@link Window Windows} 
to an element. */
+    @VisibleForTesting final WindowAssigner<W> windowAssigner;
+
+    /** Window Type includes Tumble window, Sliding window and Session Window. 
*/
+    private transient FlinkFnApi.GroupWindow.WindowType windowType;
+
+    /** Whether it is a row time window. */
+    private transient boolean isRowTime;
+
+    /** Whether it is a Time Window. */
+    private transient boolean isTimeWindow;
+
+    /** Window size. */
+    private transient long size;
+
+    /** Window slide. */
+    private transient long slide;
+
+    /** For serializing the window in checkpoints. */
+    @VisibleForTesting transient TypeSerializer<W> windowSerializer;
+
+    /** Interface for working with time and timers. */
+    private transient InternalTimerService<W> internalTimerService;
+
+    private UpdatableRowData reuseTimerData;
+
+    private int timerDataLength;
+
+    private int keyLength;
+
+    public PythonStreamGroupWindowAggregateOperator(
+            Configuration config,
+            RowType inputType,
+            RowType outputType,
+            PythonAggregateFunctionInfo[] aggregateFunctions,
+            DataViewUtils.DataViewSpec[][] dataViewSpecs,
+            int[] grouping,
+            int indexOfCountStar,
+            boolean generateUpdateBefore,
+            boolean countStarInserted,
+            int inputTimeFieldIndex,
+            WindowAssigner<W> windowAssigner,
+            LogicalWindow window,
+            long allowedLateness,
+            int[] namedProperties) {
+        super(
+                config,
+                inputType,
+                outputType,
+                aggregateFunctions,
+                dataViewSpecs,
+                grouping,
+                indexOfCountStar,
+                generateUpdateBefore);
+        this.countStarInserted = countStarInserted;
+        this.inputTimeFieldIndex = inputTimeFieldIndex;
+        this.windowAssigner = windowAssigner;
+        this.allowedLateness = allowedLateness;
+        this.namedProperties = namedProperties;
+        buildWindow(window);
+    }
+
+    @Override
+    public void open() throws Exception {
+        windowSerializer = windowAssigner.getWindowSerializer(new 
ExecutionConfig());
+        internalTimerService = getInternalTimerService("window-timers", 
windowSerializer, this);
+        if (isTimeWindow) {

Review comment:
       How about adding some explanation about the structure of the timer data?

##########
File path: 
flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/PythonStreamGroupWindowAggregateOperatorTest.java
##########
@@ -0,0 +1,1129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.python.aggregate;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.memory.DataInputDeserializer;
+import org.apache.flink.core.memory.DataOutputSerializer;
+import org.apache.flink.python.PythonFunctionRunner;
+import org.apache.flink.python.PythonOptions;
+import org.apache.flink.streaming.api.operators.InternalTimer;
+import org.apache.flink.streaming.api.operators.InternalTimerServiceImpl;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.Triggerable;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.data.UpdatableRowData;
+import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.data.util.RowDataUtil;
+import org.apache.flink.table.data.utils.JoinedRowData;
+import org.apache.flink.table.expressions.FieldReferenceExpression;
+import org.apache.flink.table.functions.python.PythonAggregateFunctionInfo;
+import org.apache.flink.table.planner.codegen.CodeGeneratorContext;
+import org.apache.flink.table.planner.codegen.ProjectionCodeGenerator;
+import org.apache.flink.table.planner.expressions.PlannerWindowReference;
+import org.apache.flink.table.planner.plan.logical.LogicalWindow;
+import org.apache.flink.table.planner.plan.logical.SlidingGroupWindow;
+import org.apache.flink.table.planner.typeutils.DataViewUtils;
+import org.apache.flink.table.runtime.generated.GeneratedProjection;
+import org.apache.flink.table.runtime.generated.Projection;
+import 
org.apache.flink.table.runtime.operators.python.scalar.PythonScalarFunctionOperatorTestBase;
+import org.apache.flink.table.runtime.operators.window.TimeWindow;
+import 
org.apache.flink.table.runtime.operators.window.assigners.SlidingWindowAssigner;
+import 
org.apache.flink.table.runtime.operators.window.assigners.WindowAssigner;
+import 
org.apache.flink.table.runtime.utils.PassThroughStreamGroupWindowAggregatePythonFunctionRunner;
+import org.apache.flink.table.runtime.utils.PythonTestUtils;
+import org.apache.flink.table.types.AtomicDataType;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.TimestampKind;
+import org.apache.flink.table.types.logical.TimestampType;
+import org.apache.flink.table.types.logical.VarCharType;
+import org.apache.flink.types.RowKind;
+
+import org.junit.Test;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import scala.Some;
+
+import static 
org.apache.flink.table.expressions.ApiExpressionUtils.intervalOfMillis;
+
+/**
+ * Test for {@link PythonStreamGroupWindowAggregateOperator}. These test that:
+ *
+ * <ul>
+ *   <li>Retraction flag is handled correctly
+ *   <li>FinishBundle is called when checkpoint is encountered
+ *   <li>FinishBundle is called when bundled element count reach to max bundle 
size
+ *   <li>FinishBundle is called when bundled time reach to max bundle time
+ *   <li>Watermarks are buffered and only sent to downstream when 
finishedBundle is triggered
+ * </ul>
+ */
+public class PythonStreamGroupWindowAggregateOperatorTest
+        extends AbstractPythonStreamAggregateOperatorTest {
+    @Test
+    public void testGroupWindowAggregateFunction() throws Exception {
+        OneInputStreamOperatorTestHarness<RowData, RowData> testHarness =
+                getTestHarness(new Configuration());
+        long initialTime = 0L;
+        ConcurrentLinkedQueue<Object> expectedOutput = new 
ConcurrentLinkedQueue<>();
+        testHarness.open();
+        testHarness.processElement(

Review comment:
       It would be better to extract a utility method that creates these 
stream records in one line.

##########
File path: 
flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/aggregate/PythonStreamGroupWindowAggregateOperator.java
##########
@@ -0,0 +1,342 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.python.aggregate;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.fnexecution.v1.FlinkFnApi;
+import org.apache.flink.streaming.api.operators.InternalTimer;
+import org.apache.flink.streaming.api.operators.InternalTimerService;
+import org.apache.flink.streaming.api.operators.Triggerable;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.UpdatableRowData;
+import org.apache.flink.table.expressions.ValueLiteralExpression;
+import org.apache.flink.table.functions.python.PythonAggregateFunctionInfo;
+import org.apache.flink.table.planner.plan.logical.LogicalWindow;
+import org.apache.flink.table.planner.plan.logical.SessionGroupWindow;
+import org.apache.flink.table.planner.plan.logical.SlidingGroupWindow;
+import org.apache.flink.table.planner.plan.logical.TumblingGroupWindow;
+import org.apache.flink.table.planner.plan.utils.AggregateUtil;
+import org.apache.flink.table.planner.typeutils.DataViewUtils;
+import org.apache.flink.table.runtime.operators.window.CountWindow;
+import org.apache.flink.table.runtime.operators.window.TimeWindow;
+import org.apache.flink.table.runtime.operators.window.Window;
+import 
org.apache.flink.table.runtime.operators.window.assigners.WindowAssigner;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.TinyIntType;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/** The Python Group Window AggregateFunction operator for the blink planner. 
*/
+@Internal
+public class PythonStreamGroupWindowAggregateOperator<K, W extends Window>
+        extends AbstractPythonStreamAggregateOperator implements 
Triggerable<K, W> {
+
+    private static final long serialVersionUID = 1L;
+
+    @VisibleForTesting
+    static final String STREAM_GROUP_WINDOW_AGGREGATE_URN =
+            "flink:transform:stream_group_window_aggregate:v1";
+
+    @VisibleForTesting static final byte REGISTER_EVENT_TIMER = 0;
+
+    @VisibleForTesting static final byte REGISTER_PROCESSING_TIMER = 1;
+
+    @VisibleForTesting static final byte DELETE_EVENT_TIMER = 2;
+
+    @VisibleForTesting static final byte DELETE_PROCESSING_TIMER = 4;
+
+    /** True if the count(*) agg is inserted by the planner. */
+    private final boolean countStarInserted;
+
+    /** The row time index of the input data. */
+    @VisibleForTesting final int inputTimeFieldIndex;
+
+    /**
+     * The allowed lateness for elements. This is used for:
+     *
+     * <ul>
+     *   <li>Deciding if an element should be dropped from a window due to 
lateness.
+     *   <li>Clearing the state of a window if the system time passes the 
{@code window.maxTimestamp
+     *       + allowedLateness} landmark.
+     * </ul>
+     */
+    @VisibleForTesting final long allowedLateness;

Review comment:
       If we do not plan to support early fire/late fire in the first version 
of the Python group window aggregate, the `allowedLateness` field is 
unnecessary.

##########
File path: 
flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/aggregate/PythonStreamGroupWindowAggregateOperator.java
##########
@@ -0,0 +1,342 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.python.aggregate;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.fnexecution.v1.FlinkFnApi;
+import org.apache.flink.streaming.api.operators.InternalTimer;
+import org.apache.flink.streaming.api.operators.InternalTimerService;
+import org.apache.flink.streaming.api.operators.Triggerable;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.UpdatableRowData;
+import org.apache.flink.table.expressions.ValueLiteralExpression;
+import org.apache.flink.table.functions.python.PythonAggregateFunctionInfo;
+import org.apache.flink.table.planner.plan.logical.LogicalWindow;
+import org.apache.flink.table.planner.plan.logical.SessionGroupWindow;
+import org.apache.flink.table.planner.plan.logical.SlidingGroupWindow;
+import org.apache.flink.table.planner.plan.logical.TumblingGroupWindow;
+import org.apache.flink.table.planner.plan.utils.AggregateUtil;
+import org.apache.flink.table.planner.typeutils.DataViewUtils;
+import org.apache.flink.table.runtime.operators.window.CountWindow;
+import org.apache.flink.table.runtime.operators.window.TimeWindow;
+import org.apache.flink.table.runtime.operators.window.Window;
+import 
org.apache.flink.table.runtime.operators.window.assigners.WindowAssigner;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.TinyIntType;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/** The Python Group Window AggregateFunction operator for the blink planner. 
*/
+@Internal
+public class PythonStreamGroupWindowAggregateOperator<K, W extends Window>
+        extends AbstractPythonStreamAggregateOperator implements 
Triggerable<K, W> {
+
+    private static final long serialVersionUID = 1L;
+
+    @VisibleForTesting
+    static final String STREAM_GROUP_WINDOW_AGGREGATE_URN =
+            "flink:transform:stream_group_window_aggregate:v1";
+
+    @VisibleForTesting static final byte REGISTER_EVENT_TIMER = 0;
+
+    @VisibleForTesting static final byte REGISTER_PROCESSING_TIMER = 1;
+
+    @VisibleForTesting static final byte DELETE_EVENT_TIMER = 2;
+
+    @VisibleForTesting static final byte DELETE_PROCESSING_TIMER = 4;
+
+    /** True if the count(*) agg is inserted by the planner. */
+    private final boolean countStarInserted;
+
+    /** The row time index of the input data. */
+    @VisibleForTesting final int inputTimeFieldIndex;
+
+    /**
+     * The allowed lateness for elements. This is used for:
+     *
+     * <ul>
+     *   <li>Deciding if an element should be dropped from a window due to 
lateness.
+     *   <li>Clearing the state of a window if the system time passes the 
{@code window.maxTimestamp
+     *       + allowedLateness} landmark.
+     * </ul>
+     */
+    @VisibleForTesting final long allowedLateness;
+
+    /**
+     * The Infos of the Window. 0 -> start of the Window. 1 -> end of the 
Window. 2 -> row time of
+     * the Window. 3 -> proc time of the Window.
+     */
+    private final int[] namedProperties;
+
+    /** A {@link WindowAssigner} assigns zero or more {@link Window Windows} 
to an element. */
+    @VisibleForTesting final WindowAssigner<W> windowAssigner;
+
+    /** Window Type includes Tumble window, Sliding window and Session Window. 
*/
+    private transient FlinkFnApi.GroupWindow.WindowType windowType;
+
+    /** Whether it is a row time window. */
+    private transient boolean isRowTime;
+
+    /** Whether it is a Time Window. */
+    private transient boolean isTimeWindow;
+
+    /** Window size. */
+    private transient long size;
+
+    /** Window slide. */
+    private transient long slide;
+
+    /** For serializing the window in checkpoints. */
+    @VisibleForTesting transient TypeSerializer<W> windowSerializer;
+
+    /** Interface for working with time and timers. */
+    private transient InternalTimerService<W> internalTimerService;
+
+    private UpdatableRowData reuseTimerData;
+
+    private int timerDataLength;
+
+    private int keyLength;

Review comment:
       ditto

##########
File path: 
flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/aggregate/PythonStreamGroupWindowAggregateOperator.java
##########
@@ -0,0 +1,342 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.python.aggregate;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.fnexecution.v1.FlinkFnApi;
+import org.apache.flink.streaming.api.operators.InternalTimer;
+import org.apache.flink.streaming.api.operators.InternalTimerService;
+import org.apache.flink.streaming.api.operators.Triggerable;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.UpdatableRowData;
+import org.apache.flink.table.expressions.ValueLiteralExpression;
+import org.apache.flink.table.functions.python.PythonAggregateFunctionInfo;
+import org.apache.flink.table.planner.plan.logical.LogicalWindow;
+import org.apache.flink.table.planner.plan.logical.SessionGroupWindow;
+import org.apache.flink.table.planner.plan.logical.SlidingGroupWindow;
+import org.apache.flink.table.planner.plan.logical.TumblingGroupWindow;
+import org.apache.flink.table.planner.plan.utils.AggregateUtil;
+import org.apache.flink.table.planner.typeutils.DataViewUtils;
+import org.apache.flink.table.runtime.operators.window.CountWindow;
+import org.apache.flink.table.runtime.operators.window.TimeWindow;
+import org.apache.flink.table.runtime.operators.window.Window;
+import 
org.apache.flink.table.runtime.operators.window.assigners.WindowAssigner;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.TinyIntType;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/** The Python Group Window AggregateFunction operator for the blink planner. 
*/
+@Internal
+public class PythonStreamGroupWindowAggregateOperator<K, W extends Window>
+        extends AbstractPythonStreamAggregateOperator implements 
Triggerable<K, W> {
+
+    private static final long serialVersionUID = 1L;
+
+    @VisibleForTesting
+    static final String STREAM_GROUP_WINDOW_AGGREGATE_URN =
+            "flink:transform:stream_group_window_aggregate:v1";
+
+    @VisibleForTesting static final byte REGISTER_EVENT_TIMER = 0;
+
+    @VisibleForTesting static final byte REGISTER_PROCESSING_TIMER = 1;
+
+    @VisibleForTesting static final byte DELETE_EVENT_TIMER = 2;
+
+    @VisibleForTesting static final byte DELETE_PROCESSING_TIMER = 4;
+
+    /** True if the count(*) agg is inserted by the planner. */
+    private final boolean countStarInserted;
+
+    /** The row time index of the input data. */
+    @VisibleForTesting final int inputTimeFieldIndex;
+
+    /**
+     * The allowed lateness for elements. This is used for:
+     *
+     * <ul>
+     *   <li>Deciding if an element should be dropped from a window due to 
lateness.
+     *   <li>Clearing the state of a window if the system time passes the 
{@code window.maxTimestamp
+     *       + allowedLateness} landmark.
+     * </ul>
+     */
+    @VisibleForTesting final long allowedLateness;
+
+    /**
+     * The Infos of the Window. 0 -> start of the Window. 1 -> end of the 
Window. 2 -> row time of
+     * the Window. 3 -> proc time of the Window.
+     */
+    private final int[] namedProperties;
+
+    /** A {@link WindowAssigner} assigns zero or more {@link Window Windows} 
to an element. */
+    @VisibleForTesting final WindowAssigner<W> windowAssigner;
+
+    /** Window Type includes Tumble window, Sliding window and Session Window. 
*/
+    private transient FlinkFnApi.GroupWindow.WindowType windowType;
+
+    /** Whether it is a row time window. */
+    private transient boolean isRowTime;
+
+    /** Whether it is a Time Window. */
+    private transient boolean isTimeWindow;
+
+    /** Window size. */
+    private transient long size;
+
+    /** Window slide. */
+    private transient long slide;
+
+    /** For serializing the window in checkpoints. */
+    @VisibleForTesting transient TypeSerializer<W> windowSerializer;
+
+    /** Interface for working with time and timers. */
+    private transient InternalTimerService<W> internalTimerService;
+
+    private UpdatableRowData reuseTimerData;

Review comment:
       This field can also be `transient`, like the other runtime-only fields declared above it (e.g. `internalTimerService`).

##########
File path: 
flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/aggregate/PythonStreamGroupWindowAggregateOperator.java
##########
@@ -0,0 +1,342 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.python.aggregate;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.fnexecution.v1.FlinkFnApi;
+import org.apache.flink.streaming.api.operators.InternalTimer;
+import org.apache.flink.streaming.api.operators.InternalTimerService;
+import org.apache.flink.streaming.api.operators.Triggerable;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.UpdatableRowData;
+import org.apache.flink.table.expressions.ValueLiteralExpression;
+import org.apache.flink.table.functions.python.PythonAggregateFunctionInfo;
+import org.apache.flink.table.planner.plan.logical.LogicalWindow;
+import org.apache.flink.table.planner.plan.logical.SessionGroupWindow;
+import org.apache.flink.table.planner.plan.logical.SlidingGroupWindow;
+import org.apache.flink.table.planner.plan.logical.TumblingGroupWindow;
+import org.apache.flink.table.planner.plan.utils.AggregateUtil;
+import org.apache.flink.table.planner.typeutils.DataViewUtils;
+import org.apache.flink.table.runtime.operators.window.CountWindow;
+import org.apache.flink.table.runtime.operators.window.TimeWindow;
+import org.apache.flink.table.runtime.operators.window.Window;
+import 
org.apache.flink.table.runtime.operators.window.assigners.WindowAssigner;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.TinyIntType;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/** The Python Group Window AggregateFunction operator for the blink planner. 
*/
+@Internal
+public class PythonStreamGroupWindowAggregateOperator<K, W extends Window>
+        extends AbstractPythonStreamAggregateOperator implements 
Triggerable<K, W> {
+
+    private static final long serialVersionUID = 1L;
+
+    @VisibleForTesting
+    static final String STREAM_GROUP_WINDOW_AGGREGATE_URN =
+            "flink:transform:stream_group_window_aggregate:v1";
+
+    @VisibleForTesting static final byte REGISTER_EVENT_TIMER = 0;
+
+    @VisibleForTesting static final byte REGISTER_PROCESSING_TIMER = 1;
+
+    @VisibleForTesting static final byte DELETE_EVENT_TIMER = 2;
+
+    @VisibleForTesting static final byte DELETE_PROCESSING_TIMER = 4;

Review comment:
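       Why is `DELETE_PROCESSING_TIMER` 4 rather than 3? The other timer-operation constants are sequential (0, 1, 2).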




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to