bvarghese1 commented on code in PR #23681:
URL: https://github.com/apache/flink/pull/23681#discussion_r1418420396


##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupAggregateTestPrograms.java:
##########
@@ -0,0 +1,367 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.stream;
+
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
+import 
org.apache.flink.table.planner.plan.utils.JavaUserDefinedAggFunctions.ConcatDistinctAggFunction;
+import 
org.apache.flink.table.planner.plan.utils.JavaUserDefinedAggFunctions.VarSum1AggFunction;
+import 
org.apache.flink.table.planner.plan.utils.JavaUserDefinedAggFunctions.VarSum2AggFunction;
+import 
org.apache.flink.table.planner.plan.utils.JavaUserDefinedAggFunctions.WeightedAvg;
+import 
org.apache.flink.table.planner.plan.utils.JavaUserDefinedAggFunctions.WeightedAvgWithMerge;
+import org.apache.flink.table.test.program.SinkTestStep;
+import org.apache.flink.table.test.program.SourceTestStep;
+import org.apache.flink.table.test.program.TableTestProgram;
+import org.apache.flink.types.Row;
+
+import java.time.Duration;
+
+/** {@link TableTestProgram} definitions for testing {@link 
StreamExecGroupAggregate}. */
+public class GroupAggregateTestPrograms {
+
+    static final SourceTestStep SOURCE_ONE =
+            SourceTestStep.newBuilder("source_t")
+                    .addSchema("a INT", "b BIGINT", "c VARCHAR")
+                    .producedBeforeRestore(
+                            Row.of(1, 1L, "Hi"),
+                            Row.of(2, 2L, "Hello"),
+                            Row.of(2, 2L, "Hello World"))
+                    .producedAfterRestore(
+                            Row.of(1, 1L, "Hi Again!"),
+                            Row.of(2, 2L, "Hello Again!"),
+                            Row.of(2, 2L, "Hello World Again!"))
+                    .build();
+
+    static final SourceTestStep SOURCE_TWO =
+            SourceTestStep.newBuilder("source_t")
+                    .addSchema("a INT", "b BIGINT", "c INT", "d VARCHAR", "e 
BIGINT")
+                    .producedBeforeRestore(
+                            Row.of(2, 3L, 2, "Hello World Like", 1L),
+                            Row.of(3, 4L, 3, "Hello World Its nice", 2L),
+                            Row.of(2, 2L, 1, "Hello World", 2L),
+                            Row.of(1, 1L, 0, "Hello", 1L),
+                            Row.of(5, 11L, 10, "GHI", 1L),
+                            Row.of(3, 5L, 4, "ABC", 2L),
+                            Row.of(4, 10L, 9, "FGH", 2L),
+                            Row.of(4, 7L, 6, "CDE", 2L),
+                            Row.of(5, 14L, 13, "JKL", 2L),
+                            Row.of(4, 9L, 8, "EFG", 1L),
+                            Row.of(5, 15L, 14, "KLM", 2L),
+                            Row.of(5, 12L, 11, "HIJ", 3L),
+                            Row.of(4, 8L, 7, "DEF", 1L),
+                            Row.of(5, 13L, 12, "IJK", 3L),
+                            Row.of(3, 6L, 5, "BCD", 3L))
+                    .producedAfterRestore(
+                            Row.of(1, 1L, 0, "Hello", 1L),
+                            Row.of(3, 5L, 4, "ABC", 2L),
+                            Row.of(4, 10L, 9, "FGH", 2L),
+                            Row.of(4, 7L, 6, "CDE", 2L),
+                            Row.of(3, 6L, 5, "BCD", 3L))
+                    .build();
+
+    static final TableTestProgram GROUP_BY_SIMPLE =
+            TableTestProgram.of(
+                            "group-aggregate-simple", "validates basic 
aggregation using group by")
+                    .setupTableSource(SOURCE_ONE)
+                    .setupTableSink(
+                            SinkTestStep.newBuilder("sink_t")
+                                    .addSchema(
+                                            "b BIGINT",
+                                            "cnt BIGINT",
+                                            "avg_a DOUBLE",
+                                            "min_c VARCHAR",
+                                            "PRIMARY KEY (b) NOT ENFORCED")
+                                    .consumedBeforeRestore(
+                                            "+I[1, 1, null, Hi]",
+                                            "+I[2, 1, 2.0, Hello]",
+                                            "+U[2, 2, 2.0, Hello]")
+                                    .consumedAfterRestore(
+                                            "+U[1, 2, null, Hi]",
+                                            "+U[2, 3, 2.0, Hello]",
+                                            "+U[2, 4, 2.0, Hello]")
+                                    .build())
+                    .runSql(
+                            "INSERT INTO sink_t SELECT "
+                                    + "b, "
+                                    + "COUNT(*) AS cnt, "
+                                    + "AVG(a) FILTER (WHERE a > 1) AS avg_a, "
+                                    + "MIN(c) AS min_c "
+                                    + "FROM source_t GROUP BY b")
+                    .build();
+
+    static final TableTestProgram GROUP_BY_SIMPLE_MINI_BATCH =
+            TableTestProgram.of(
+                            "group-aggregate-simple-mini-batch",
+                            "validates basic aggregation using group by with 
mini batch")
+                    .setupTableSource(SOURCE_ONE)
+                    
.setupConfig(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED, true)
+                    .setupConfig(
+                            
ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY,
+                            Duration.ofSeconds(10))
+                    
.setupConfig(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE, 5L)
+                    .setupTableSink(
+                            SinkTestStep.newBuilder("sink_t")
+                                    .addSchema(
+                                            GROUP_BY_SIMPLE
+                                                    .getSetupSinkTestSteps()
+                                                    .get(0)
+                                                    .schemaComponents)
+                                    .consumedBeforeRestore(
+                                            "+I[1, 1, null, Hi]", "+I[2, 2, 
2.0, Hello]")
+                                    .consumedAfterRestore(
+                                            "+U[1, 2, null, Hi]", "+U[2, 4, 
2.0, Hello]")
+                                    .build())
+                    
.runSql(GroupAggregateTestPrograms.GROUP_BY_SIMPLE.getRunSqlTestStep().sql)
+                    .build();
+
+    static final TableTestProgram GROUP_BY_DISTINCT =
+            TableTestProgram.of("group-aggregate-distinct", "validates group 
by distinct")
+                    .setupTableSource(SOURCE_TWO)
+                    .setupTableSink(
+                            SinkTestStep.newBuilder("sink_t")
+                                    .addSchema(
+                                            "e BIGINT",
+                                            "cnt_a1 BIGINT",
+                                            "cnt_a2 BIGINT",
+                                            "sum_a BIGINT",
+                                            "sum_b BIGINT",
+                                            "avg_b DOUBLE",
+                                            "cnt_d BIGINT",
+                                            "PRIMARY KEY (e) NOT ENFORCED")
+                                    .consumedBeforeRestore(
+                                            "+I[1, 0, 1, 2, 3, 3.0, 1]",
+                                            "+I[2, 0, 1, 3, 4, 4.0, 1]",
+                                            "+U[2, 0, 2, 5, 6, 3.0, 2]",
+                                            "+U[1, 0, 2, 3, 4, 2.0, 2]",
+                                            "+U[1, 1, 3, 8, 15, 5.0, 3]",
+                                            "+U[2, 0, 2, 5, 11, 3.0, 3]",
+                                            "+U[2, 0, 3, 9, 21, 5.0, 4]",
+                                            "+U[2, 0, 3, 9, 28, 5.0, 5]",
+                                            "+U[2, 1, 4, 14, 42, 7.0, 6]",
+                                            "+U[1, 1, 4, 12, 24, 6.0, 4]",
+                                            "+U[2, 1, 4, 14, 57, 8.0, 7]",
+                                            "+I[3, 1, 1, 5, 12, 12.0, 1]",
+                                            "+U[1, 1, 4, 12, 32, 6.0, 5]",
+                                            "+U[3, 1, 1, 5, 25, 12.0, 2]",
+                                            "+U[3, 1, 2, 8, 31, 10.0, 3]")
+                                    .consumedAfterRestore(
+                                            "+U[1, 1, 4, 12, 32, 5.0, 5]",
+                                            "+U[2, 1, 4, 14, 57, 7.0, 7]",
+                                            "+U[2, 1, 4, 14, 57, 8.0, 7]",
+                                            "+U[2, 1, 4, 14, 57, 7.0, 7]",
+                                            "+U[3, 1, 2, 8, 31, 9.0, 3]")
+                                    .build())
+                    .runSql(
+                            "INSERT INTO sink_t SELECT "
+                                    + "e, "
+                                    + "COUNT(DISTINCT a) FILTER (WHERE b > 10) 
AS cnt_a1, "
+                                    + "COUNT(DISTINCT a) AS cnt_a2, "
+                                    + "SUM(DISTINCT a) AS sum_a, "
+                                    + "SUM(DISTINCT b) AS sum_b, "
+                                    + "AVG(b) AS avg_b, "
+                                    + "COUNT(DISTINCT d) AS concat_d "
+                                    + "FROM source_t GROUP BY e")
+                    .build();
+
+    static final TableTestProgram GROUP_BY_DISTINCT_MINI_BATCH =
+            TableTestProgram.of(
+                            "group-aggregate-distinct-mini-batch",
+                            "validates group by distinct with mini batch")
+                    
.setupConfig(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED, true)
+                    .setupConfig(
+                            
ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY,
+                            Duration.ofSeconds(10))
+                    
.setupConfig(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE, 5L)
+                    .setupTableSource(SOURCE_TWO)
+                    .setupTableSink(
+                            SinkTestStep.newBuilder("sink_t")
+                                    .addSchema(
+                                            GROUP_BY_DISTINCT
+                                                    .getSetupSinkTestSteps()
+                                                    .get(0)
+                                                    .schemaComponents)
+                                    .consumedBeforeRestore(
+                                            "+I[3, 1, 2, 8, 31, 10.0, 3]",
+                                            "+I[2, 1, 4, 14, 42, 7.0, 6]",
+                                            "+I[1, 1, 4, 12, 24, 6.0, 4]",
+                                            "+U[2, 1, 4, 14, 57, 8.0, 7]",
+                                            "+U[1, 1, 4, 12, 32, 6.0, 5]")
+                                    .consumedAfterRestore(
+                                            "+U[3, 1, 2, 8, 31, 9.0, 3]",
+                                            "+U[2, 1, 4, 14, 57, 7.0, 7]",
+                                            "+U[1, 1, 4, 12, 32, 5.0, 5]")
+                                    .build())
+                    .runSql(GROUP_BY_DISTINCT.getRunSqlTestStep().sql)
+                    .build();
+
+    static final TableTestProgram GROUP_BY_UDF_WITH_MERGE =
+            TableTestProgram.of(
+                            "group-aggregate-udf-with-merge",
+                            "validates udfs with merging using group by")
+                    .setupCatalogFunction("my_avg", WeightedAvgWithMerge.class)
+                    .setupTemporarySystemFunction("my_concat", 
ConcatDistinctAggFunction.class)
+                    .setupTableSource(SOURCE_TWO)
+                    .setupTableSink(
+                            SinkTestStep.newBuilder("sink_t")
+                                    .addSchema(
+                                            "d BIGINT",
+                                            "s1 BIGINT",
+                                            "c1 VARCHAR",
+                                            "PRIMARY KEY (d) NOT ENFORCED")
+                                    .consumedBeforeRestore(
+                                            "+I[1, 1, Hello World Like]",
+                                            "+I[2, 2, Hello World Its nice]",
+                                            "+U[2, 2, Hello World Its 
nice|Hello World]",
+                                            "+U[1, 1, Hello World Like|Hello]",
+                                            "+U[1, 1, Hello World 
Like|Hello|GHI]",
+                                            "+U[2, 2, Hello World Its 
nice|Hello World|ABC]",
+                                            "+U[2, 2, Hello World Its 
nice|Hello World|ABC|FGH]",
+                                            "+U[2, 2, Hello World Its 
nice|Hello World|ABC|FGH|CDE]",
+                                            "+U[2, 2, Hello World Its 
nice|Hello World|ABC|FGH|CDE|JKL]",
+                                            "+U[1, 1, Hello World 
Like|Hello|GHI|EFG]",
+                                            "+U[2, 2, Hello World Its 
nice|Hello World|ABC|FGH|CDE|JKL|KLM]",
+                                            "+I[3, 3, HIJ]",
+                                            "+U[1, 1, Hello World 
Like|Hello|GHI|EFG|DEF]",
+                                            "+U[3, 3, HIJ|IJK]",
+                                            "+U[3, 3, HIJ|IJK|BCD]")
+                                    // no change is emitted after restore 
because of DistinctAgg
+                                    .ignoreConsumedAfterRestore()
+                                    .build())
+                    .runSql(
+                            "INSERT INTO sink_t SELECT "
+                                    + "e, "
+                                    + "my_avg(e, a) as s1, "
+                                    + "my_concat(d) as c1 "
+                                    + "FROM source_t GROUP BY e")
+                    .build();
+
+    static final TableTestProgram GROUP_BY_UDF_WITH_MERGE_MINI_BATCH =
+            TableTestProgram.of(
+                            "group-aggregate-udf-with-merge-mini-batch",
+                            "validates udfs with merging using group by with 
mini batch")
+                    
.setupConfig(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED, true)
+                    .setupConfig(
+                            
ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY,
+                            Duration.ofSeconds(10))
+                    
.setupConfig(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE, 5L)
+                    .setupCatalogFunction("my_avg", WeightedAvgWithMerge.class)
+                    .setupTemporarySystemFunction("my_concat", 
ConcatDistinctAggFunction.class)
+                    .setupTableSource(SOURCE_TWO)
+                    .setupTableSink(
+                            SinkTestStep.newBuilder("sink_t")
+                                    .addSchema(
+                                            GROUP_BY_UDF_WITH_MERGE
+                                                    .getSetupSinkTestSteps()
+                                                    .get(0)
+                                                    .schemaComponents)
+                                    .consumedBeforeRestore(
+                                            "+I[2, 2, Hello World Its 
nice|Hello World|ABC|FGH|CDE|JKL]",
+                                            "+I[1, 1, Hello World 
Like|Hello|GHI|EFG]",
+                                            "+U[2, 2, Hello World Its 
nice|Hello World|ABC|FGH|CDE|JKL|KLM]",
+                                            "+U[1, 1, Hello World 
Like|Hello|GHI|EFG|DEF]",
+                                            "+I[3, 3, HIJ|IJK|BCD]")
+                                    // no change is emitted after restore 
because of DistinctAgg
+                                    .ignoreConsumedAfterRestore()

Review Comment:
   Modified the data for `producedAfterRestore`, and now all tests have both 
`consumedBeforeRestore` and `consumedAfterRestore` steps.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org

Reply via email to