This is an automated email from the ASF dual-hosted git repository.

rmetzger pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 37633211cff21050e7bf0dafa9006ad1700ceb7a
Author: Robert Metzger <rmetz...@apache.org>
AuthorDate: Thu Jun 3 20:07:07 2021 +0200

    Revert "[FLINK-22680][table-planner-blink] Fix IndexOutOfBoundsException 
when apply WatermarkAssignerChangelogNormalizeTransposeRule"
    
    This reverts commit a364daa37202dc4d7a60c613f547cdcd6893ecd2.
---
 ...arkAssignerChangelogNormalizeTransposeRule.java | 301 ++-------------------
 ...AssignerChangelogNormalizeTransposeRuleTest.xml | 191 -------------
 ...signerChangelogNormalizeTransposeRuleTest.scala | 175 ------------
 .../runtime/stream/sql/GroupWindowITCase.scala     |  51 ++--
 4 files changed, 48 insertions(+), 670 deletions(-)

diff --git 
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/WatermarkAssignerChangelogNormalizeTransposeRule.java
 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/WatermarkAssignerChangelogNormalizeTransposeRule.java
index fb0cb49..aed80bd 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/WatermarkAssignerChangelogNormalizeTransposeRule.java
+++ 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/WatermarkAssignerChangelogNormalizeTransposeRule.java
@@ -18,40 +18,18 @@
 
 package org.apache.flink.table.planner.plan.rules.physical.stream;
 
-import org.apache.flink.api.java.tuple.Tuple2;
 import 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalCalc;
 import 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalChangelogNormalize;
 import 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalExchange;
 import 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalWatermarkAssigner;
-import org.apache.flink.table.planner.plan.trait.FlinkRelDistribution;
-import org.apache.flink.table.planner.plan.trait.FlinkRelDistributionTraitDef;
-import org.apache.flink.table.planner.typeutils.RowTypeUtils;
 
 import org.apache.calcite.plan.RelOptRule;
 import org.apache.calcite.plan.RelOptRuleCall;
 import org.apache.calcite.plan.RelOptUtil;
 import org.apache.calcite.plan.RelRule;
-import org.apache.calcite.plan.RelTraitSet;
-import org.apache.calcite.rel.RelDistribution;
 import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.type.RelDataType;
-import org.apache.calcite.rel.type.RelDataTypeField;
-import org.apache.calcite.rex.RexBuilder;
-import org.apache.calcite.rex.RexInputRef;
-import org.apache.calcite.rex.RexLocalRef;
-import org.apache.calcite.rex.RexNode;
-import org.apache.calcite.rex.RexProgram;
-import org.apache.calcite.rex.RexProgramBuilder;
-import org.apache.calcite.rex.RexShuttle;
-import org.apache.calcite.rex.RexUtil;
-import org.apache.calcite.util.mapping.Mappings;
 
-import java.util.ArrayList;
 import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.function.Function;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
 
@@ -84,288 +62,43 @@ public class 
WatermarkAssignerChangelogNormalizeTransposeRule
     public void onMatch(RelOptRuleCall call) {
         final StreamPhysicalWatermarkAssigner watermark = call.rel(0);
         final RelNode node = call.rel(1);
-        RelNode newTree;
         if (node instanceof StreamPhysicalCalc) {
             // with calc
             final StreamPhysicalCalc calc = call.rel(1);
             final StreamPhysicalChangelogNormalize changelogNormalize = 
call.rel(2);
             final StreamPhysicalExchange exchange = call.rel(3);
-            final Mappings.TargetMapping calcMapping = 
buildMapping(calc.getProgram());
-            final RelDistribution exchangeDistribution = 
exchange.getDistribution();
-            final RelDistribution newExchangeDistribution = 
exchangeDistribution.apply(calcMapping);
-            final boolean shuffleKeysAreKeptByCalc =
-                    newExchangeDistribution.getType() == 
exchangeDistribution.getType()
-                            && newExchangeDistribution.getKeys().size()
-                                    == exchangeDistribution.getKeys().size();
-            if (shuffleKeysAreKeptByCalc) {
-                // Pushes down WatermarkAssigner/Calc as a whole if shuffle 
keys of
-                // Exchange are all kept by Calc
-                newTree =
-                        pushDownOriginalWatermarkAndCalc(
-                                watermark,
-                                calc,
-                                changelogNormalize,
-                                exchange,
-                                newExchangeDistribution);
-            } else {
-                // 1. Creates a new Calc which contains all shuffle keys
-                // 2. Pushes down new WatermarkAssigner/new Calc
-                // 3. Adds a top Calc to remove new added shuffle keys in step 
1
-                newTree =
-                        pushDownTransformedWatermarkAndCalc(
-                                watermark,
-                                calc,
-                                changelogNormalize,
-                                exchange,
-                                exchangeDistribution.getKeys(),
-                                calcMapping);
-            }
+
+            final RelNode newTree =
+                    buildTreeInOrder(
+                            changelogNormalize, exchange, watermark, calc, 
exchange.getInput());
+            call.transformTo(newTree);
         } else if (node instanceof StreamPhysicalChangelogNormalize) {
             // without calc
             final StreamPhysicalChangelogNormalize changelogNormalize = 
call.rel(1);
             final StreamPhysicalExchange exchange = call.rel(2);
-            newTree =
-                    buildTreeInOrder(
-                            exchange.getInput(),
-                            // Clears distribution on new WatermarkAssigner
-                            Tuple2.of(
-                                    watermark,
-                                    
watermark.getTraitSet().plus(FlinkRelDistribution.DEFAULT())),
-                            Tuple2.of(exchange, exchange.getTraitSet()),
-                            Tuple2.of(changelogNormalize, 
changelogNormalize.getTraitSet()));
+
+            final RelNode newTree =
+                    buildTreeInOrder(changelogNormalize, exchange, watermark, 
exchange.getInput());
+            call.transformTo(newTree);
         } else {
             throw new IllegalStateException(
                     this.getClass().getName()
                             + " matches a wrong relation tree: "
                             + RelOptUtil.toString(watermark));
         }
-        call.transformTo(newTree);
-    }
-
-    private RelNode pushDownOriginalWatermarkAndCalc(
-            StreamPhysicalWatermarkAssigner watermark,
-            StreamPhysicalCalc calc,
-            StreamPhysicalChangelogNormalize changelogNormalize,
-            StreamPhysicalExchange exchange,
-            RelDistribution newExchangeDistribution) {
-        return buildTreeInOrder(
-                exchange.getInput(),
-                // Clears distribution on new Calc/WatermarkAssigner
-                Tuple2.of(calc, 
calc.getTraitSet().plus(FlinkRelDistribution.DEFAULT())),
-                Tuple2.of(watermark, 
watermark.getTraitSet().plus(FlinkRelDistribution.DEFAULT())),
-                // Updates distribution on new Exchange/Normalize based on 
field
-                // mapping of Calc
-                Tuple2.of(exchange, 
exchange.getTraitSet().plus(newExchangeDistribution)),
-                Tuple2.of(
-                        changelogNormalize,
-                        
changelogNormalize.getTraitSet().plus(newExchangeDistribution)));
-    }
-
-    private RelNode pushDownTransformedWatermarkAndCalc(
-            StreamPhysicalWatermarkAssigner watermark,
-            StreamPhysicalCalc calc,
-            StreamPhysicalChangelogNormalize changelogNormalize,
-            StreamPhysicalExchange exchange,
-            List<Integer> completeShuffleKeys,
-            Mappings.TargetMapping calcMapping) {
-        final List<Integer> projectedOutShuffleKeys = new ArrayList<>();
-        for (Integer key : completeShuffleKeys) {
-            int targetIdx = calcMapping.getTargetOpt(key);
-            if (targetIdx < 0) {
-                projectedOutShuffleKeys.add(key);
-            }
-        }
-        // Creates a new Program which contains all shuffle keys
-        final RexBuilder rexBuilder = calc.getCluster().getRexBuilder();
-        final RexProgram newPushDownProgram =
-                createTransformedProgramWithAllShuffleKeys(
-                        calc.getProgram(), projectedOutShuffleKeys, 
rexBuilder);
-        if (newPushDownProgram.isPermutation()) {
-            // Pushes down transformed WatermarkAssigner alone if new pushDown 
program is a
-            // permutation of its inputs
-            return pushDownTransformedWatermark(
-                    watermark, calc, changelogNormalize, exchange, 
calcMapping, rexBuilder);
-        } else {
-            // 1. Pushes down transformed WatermarkAssigner and transformed 
Calc
-            // 2. Adds a top Calc to remove new added shuffle keys
-            return pushDownTransformedWatermarkAndCalc(
-                    newPushDownProgram, watermark, exchange, 
changelogNormalize, calc);
-        }
-    }
-
-    private RexProgram createTransformedProgramWithAllShuffleKeys(
-            RexProgram program, List<Integer> projectsOutShuffleKeys, 
RexBuilder rexBuilder) {
-        RelDataType oldInputRowType = program.getInputRowType();
-        List<String> visitedProjectNames = new ArrayList<>();
-        RexProgramBuilder newProgramBuilder = new 
RexProgramBuilder(oldInputRowType, rexBuilder);
-        program.getNamedProjects()
-                .forEach(
-                        pair -> {
-                            newProgramBuilder.addProject(
-                                    program.expandLocalRef(pair.left), 
pair.right);
-                            visitedProjectNames.add(pair.right);
-                        });
-        List<RelDataTypeField> oldFieldList = oldInputRowType.getFieldList();
-        for (Integer projectsOutShuffleKey : projectsOutShuffleKeys) {
-            RelDataTypeField oldField = 
oldFieldList.get(projectsOutShuffleKey);
-            String oldFieldName = oldField.getName();
-            String newProjectName = RowTypeUtils.getUniqueName(oldFieldName, 
visitedProjectNames);
-            newProgramBuilder.addProject(
-                    new RexInputRef(projectsOutShuffleKey, 
oldField.getType()), newProjectName);
-            visitedProjectNames.add(newProjectName);
-        }
-        if (program.getCondition() != null) {
-            
newProgramBuilder.addCondition(program.expandLocalRef(program.getCondition()));
-        }
-        return newProgramBuilder.getProgram();
-    }
-
-    private RelNode pushDownTransformedWatermarkAndCalc(
-            RexProgram newPushDownProgram,
-            StreamPhysicalWatermarkAssigner watermark,
-            StreamPhysicalExchange exchange,
-            StreamPhysicalChangelogNormalize changelogNormalize,
-            StreamPhysicalCalc calc) {
-        final RelNode pushDownCalc =
-                calc.copy(
-                        // Clears distribution on new Calc
-                        
calc.getTraitSet().plus(FlinkRelDistribution.DEFAULT()),
-                        exchange.getInput(),
-                        newPushDownProgram);
-        final Mappings.TargetMapping mappingOfPushDownCalc = 
buildMapping(newPushDownProgram);
-        final RelDistribution newDistribution =
-                exchange.getDistribution().apply(mappingOfPushDownCalc);
-        final RelNode newChangelogNormalize =
-                buildTreeInOrder(
-                        pushDownCalc,
-                        Tuple2.of(
-                                watermark,
-                                
watermark.getTraitSet().plus(FlinkRelDistribution.DEFAULT())),
-                        // updates distribution on new Exchange/Normalize 
based on field
-                        // mapping of Calc
-                        Tuple2.of(exchange, 
exchange.getTraitSet().plus(newDistribution)),
-                        Tuple2.of(
-                                changelogNormalize,
-                                
changelogNormalize.getTraitSet().plus(newDistribution)));
-        final List<String> newInputFieldNames = 
newChangelogNormalize.getRowType().getFieldNames();
-        final RexProgramBuilder topProgramBuilder =
-                new RexProgramBuilder(
-                        newChangelogNormalize.getRowType(),
-                        changelogNormalize.getCluster().getRexBuilder());
-        for (int fieldIdx = 0; fieldIdx < calc.getRowType().getFieldCount(); 
fieldIdx++) {
-            topProgramBuilder.addProject(
-                    RexInputRef.of(fieldIdx, 
newChangelogNormalize.getRowType()),
-                    newInputFieldNames.get(fieldIdx));
-        }
-        final RexProgram topProgram = topProgramBuilder.getProgram();
-        return calc.copy(calc.getTraitSet(), newChangelogNormalize, 
topProgram);
-    }
-
-    private RelNode pushDownTransformedWatermark(
-            StreamPhysicalWatermarkAssigner watermark,
-            StreamPhysicalCalc calc,
-            StreamPhysicalChangelogNormalize changelogNormalize,
-            StreamPhysicalExchange exchange,
-            Mappings.TargetMapping calcMapping,
-            RexBuilder rexBuilder) {
-        Mappings.TargetMapping inversedMapping = calcMapping.inverse();
-        final int newRowTimeFieldIndex =
-                inversedMapping.getTargetOpt(watermark.rowtimeFieldIndex());
-        // Updates watermark properties after push down before Calc
-        // 1. rewrites watermark expression
-        // 2. clears distribution
-        // 3. updates row time field index
-        RexNode newWatermarkExpr = watermark.watermarkExpr();
-        if (watermark.watermarkExpr() != null) {
-            newWatermarkExpr = RexUtil.apply(inversedMapping, 
watermark.watermarkExpr());
-        }
-        final RelNode newWatermark =
-                watermark.copy(
-                        
watermark.getTraitSet().plus(FlinkRelDistribution.DEFAULT()),
-                        exchange.getInput(),
-                        newRowTimeFieldIndex,
-                        newWatermarkExpr);
-        final RelNode newChangelogNormalize =
-                buildTreeInOrder(
-                        newWatermark,
-                        Tuple2.of(exchange, exchange.getTraitSet()),
-                        Tuple2.of(changelogNormalize, 
changelogNormalize.getTraitSet()));
-        // Rewrites Calc program because the field type of row time
-        // field is changed after watermark pushed down
-        final RexProgram oldProgram = calc.getProgram();
-        final RexProgramBuilder programBuilder =
-                new RexProgramBuilder(newChangelogNormalize.getRowType(), 
rexBuilder);
-        final Function<RexNode, RexNode> rexShuttle =
-                e ->
-                        e.accept(
-                                new RexShuttle() {
-                                    @Override
-                                    public RexNode visitInputRef(RexInputRef 
inputRef) {
-                                        if (inputRef.getIndex() == 
newRowTimeFieldIndex) {
-                                            return RexInputRef.of(
-                                                    newRowTimeFieldIndex,
-                                                    
newChangelogNormalize.getRowType());
-                                        } else {
-                                            return inputRef;
-                                        }
-                                    }
-                                });
-        oldProgram
-                .getNamedProjects()
-                .forEach(
-                        pair ->
-                                programBuilder.addProject(
-                                        
rexShuttle.apply(oldProgram.expandLocalRef(pair.left)),
-                                        pair.right));
-        if (oldProgram.getCondition() != null) {
-            programBuilder.addCondition(
-                    
rexShuttle.apply(oldProgram.expandLocalRef(oldProgram.getCondition())));
-        }
-        final RexProgram newProgram = programBuilder.getProgram();
-        return calc.copy(calc.getTraitSet(), newChangelogNormalize, 
newProgram);
-    }
-
-    private Mappings.TargetMapping buildMapping(RexProgram program) {
-        final Map<Integer, Integer> mapInToOutPos = new HashMap<>();
-        final List<RexLocalRef> projects = program.getProjectList();
-        for (int idx = 0; idx < projects.size(); idx++) {
-            RexNode rexNode = program.expandLocalRef(projects.get(idx));
-            if (rexNode instanceof RexInputRef) {
-                mapInToOutPos.put(((RexInputRef) rexNode).getIndex(), idx);
-            }
-        }
-        return Mappings.target(
-                mapInToOutPos,
-                program.getInputRowType().getFieldCount(),
-                program.getOutputRowType().getFieldCount());
     }
 
     /**
-     * Build a new {@link RelNode} tree in the given nodes order which is in 
bottom-up direction.
+     * Build a new {@link RelNode} tree in the given nodes order which is in 
root-down direction.
      */
-    @SafeVarargs
-    private RelNode buildTreeInOrder(
-            RelNode leafNode, Tuple2<RelNode, RelTraitSet>... nodeAndTraits) {
-        checkArgument(nodeAndTraits.length >= 1);
-        RelNode inputNode = leafNode;
-        RelNode currentNode = null;
-        for (Tuple2<RelNode, RelTraitSet> nodeAndTrait : nodeAndTraits) {
-            currentNode = nodeAndTrait.f0;
-            if (currentNode instanceof StreamPhysicalExchange) {
-                currentNode =
-                        ((StreamPhysicalExchange) currentNode)
-                                .copy(
-                                        nodeAndTrait.f1,
-                                        inputNode,
-                                        nodeAndTrait.f1.getTrait(
-                                                
FlinkRelDistributionTraitDef.INSTANCE()));
-            } else {
-                currentNode =
-                        currentNode.copy(nodeAndTrait.f1, 
Collections.singletonList(inputNode));
-            }
-            inputNode = currentNode;
+    private RelNode buildTreeInOrder(RelNode... nodes) {
+        checkArgument(nodes.length >= 2);
+        RelNode root = nodes[nodes.length - 1];
+        for (int i = nodes.length - 2; i >= 0; i--) {
+            RelNode node = nodes[i];
+            root = node.copy(node.getTraitSet(), 
Collections.singletonList(root));
         }
-        return currentNode;
+        return root;
     }
 
     /** Rule configuration. */
diff --git 
a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/physical/stream/WatermarkAssignerChangelogNormalizeTransposeRuleTest.xml
 
b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/physical/stream/WatermarkAssignerChangelogNormalizeTransposeRuleTest.xml
deleted file mode 100644
index 1772ee9..0000000
--- 
a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/physical/stream/WatermarkAssignerChangelogNormalizeTransposeRuleTest.xml
+++ /dev/null
@@ -1,191 +0,0 @@
-<?xml version="1.0" ?>
-<!--
-Licensed to the Apache Software Foundation (ASF) under one or more
-contributor license agreements.  See the NOTICE file distributed with
-this work for additional information regarding copyright ownership.
-The ASF licenses this file to you under the Apache License, Version 2.0
-(the "License"); you may not use this file except in compliance with
-the License.  You may obtain a copy of the License at
-
-http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
--->
-<Root>
-  <TestCase name="testGroupKeyIsComputedColumn">
-    <Resource name="sql">
-      <![CDATA[
-SELECT
-  currency2,
-  COUNT(1) AS cnt,
-  TUMBLE_START(currency_time, INTERVAL '5' SECOND) as w_start,
-  TUMBLE_END(currency_time, INTERVAL '5' SECOND) as w_end
-FROM src_with_computed_column2
-GROUP BY currency2, TUMBLE(currency_time, INTERVAL '5' SECOND)
-]]>
-    </Resource>
-    <Resource name="ast">
-      <![CDATA[
-LogicalProject(currency2=[$0], cnt=[$2], w_start=[TUMBLE_START($1)], 
w_end=[TUMBLE_END($1)])
-+- LogicalAggregate(group=[{0, 1}], cnt=[COUNT()])
-   +- LogicalProject(currency2=[$1], $f1=[$TUMBLE($5, 5000:INTERVAL SECOND)])
-      +- LogicalWatermarkAssigner(rowtime=[currency_time], watermark=[-($5, 
5000:INTERVAL SECOND)])
-         +- LogicalProject(currency=[$0], currency2=[+($0, 2)], 
currency_no=[$1], rate=[$2], c=[$3], currency_time=[TO_TIMESTAMP($3)])
-            +- LogicalTableScan(table=[[default_catalog, default_database, 
src_with_computed_column2]])
-]]>
-    </Resource>
-    <Resource name="optimized rel plan">
-      <![CDATA[
-Calc(select=[currency2, cnt, w$start AS w_start, w$end AS w_end], 
changelogMode=[I])
-+- GroupWindowAggregate(groupBy=[currency2], window=[TumblingGroupWindow('w$, 
currency_time, 5000)], properties=[w$start, w$end, w$rowtime, w$proctime], 
select=[currency2, COUNT(*) AS cnt, start('w$) AS w$start, end('w$) AS w$end, 
rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime], changelogMode=[I])
-   +- Exchange(distribution=[hash[currency2]], changelogMode=[I,UB,UA,D])
-      +- Calc(select=[currency2, currency_time], changelogMode=[I,UB,UA,D])
-         +- ChangelogNormalize(key=[currency2], changelogMode=[I,UB,UA,D])
-            +- Exchange(distribution=[hash[currency]], changelogMode=[UA,D])
-               +- WatermarkAssigner(rowtime=[currency_time], 
watermark=[-(currency_time, 5000:INTERVAL SECOND)], changelogMode=[UA,D])
-                  +- Calc(select=[+(currency, 2) AS currency2, TO_TIMESTAMP(c) 
AS currency_time, currency], changelogMode=[UA,D])
-                     +- TableSourceScan(table=[[default_catalog, 
default_database, src_with_computed_column2, project=[currency, c]]], 
fields=[currency, c], changelogMode=[UA,D])
-]]>
-    </Resource>
-  </TestCase>
-  <TestCase name="testPushdownCalcAndWatermarkAssignerWithCalc">
-    <Resource name="sql">
-      <![CDATA[
-SELECT
-  currency,
-  COUNT(1) AS cnt,
-  TUMBLE_START(currency_time, INTERVAL '5' SECOND) as w_start,
-  TUMBLE_END(currency_time, INTERVAL '5' SECOND) as w_end
-FROM src_with_computed_column
-GROUP BY currency, TUMBLE(currency_time, INTERVAL '5' SECOND)
-]]>
-    </Resource>
-    <Resource name="ast">
-      <![CDATA[
-LogicalProject(currency=[$0], cnt=[$2], w_start=[TUMBLE_START($1)], 
w_end=[TUMBLE_END($1)])
-+- LogicalAggregate(group=[{0, 1}], cnt=[COUNT()])
-   +- LogicalProject(currency=[$0], $f1=[$TUMBLE($4, 5000:INTERVAL SECOND)])
-      +- LogicalWatermarkAssigner(rowtime=[currency_time], watermark=[-($4, 
5000:INTERVAL SECOND)])
-         +- LogicalProject(currency=[$0], currency_no=[$1], rate=[$2], c=[$3], 
currency_time=[TO_TIMESTAMP($3)])
-            +- LogicalTableScan(table=[[default_catalog, default_database, 
src_with_computed_column]])
-]]>
-    </Resource>
-    <Resource name="optimized rel plan">
-      <![CDATA[
-Calc(select=[currency, cnt, w$start AS w_start, w$end AS w_end], 
changelogMode=[I])
-+- GroupWindowAggregate(groupBy=[currency], window=[TumblingGroupWindow('w$, 
currency_time, 5000)], properties=[w$start, w$end, w$rowtime, w$proctime], 
select=[currency, COUNT(*) AS cnt, start('w$) AS w$start, end('w$) AS w$end, 
rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime], changelogMode=[I])
-   +- Exchange(distribution=[hash[currency]], changelogMode=[I,UB,UA,D])
-      +- ChangelogNormalize(key=[currency], changelogMode=[I,UB,UA,D])
-         +- Exchange(distribution=[hash[currency]], changelogMode=[UA,D])
-            +- WatermarkAssigner(rowtime=[currency_time], 
watermark=[-(currency_time, 5000:INTERVAL SECOND)], changelogMode=[UA,D])
-               +- Calc(select=[currency, TO_TIMESTAMP(c) AS currency_time], 
changelogMode=[UA,D])
-                  +- TableSourceScan(table=[[default_catalog, 
default_database, src_with_computed_column, project=[currency, c]]], 
fields=[currency, c], changelogMode=[UA,D])
-]]>
-    </Resource>
-  </TestCase>
-  <TestCase name="testPushdownNewCalcAndWatermarkAssignerWithCalc">
-    <Resource name="sql">
-      <![CDATA[
-SELECT
-  TUMBLE_START(currency_time, INTERVAL '5' SECOND) as w_start,
-  TUMBLE_END(currency_time, INTERVAL '5' SECOND) as w_end,
-  MAX(rate) AS max_rate
-FROM src_with_computed_column
-GROUP BY TUMBLE(currency_time, INTERVAL '5' SECOND)
-]]>
-    </Resource>
-    <Resource name="ast">
-      <![CDATA[
-LogicalProject(w_start=[TUMBLE_START($0)], w_end=[TUMBLE_END($0)], 
max_rate=[$1])
-+- LogicalAggregate(group=[{0}], max_rate=[MAX($1)])
-   +- LogicalProject($f0=[$TUMBLE($4, 5000:INTERVAL SECOND)], rate=[$2])
-      +- LogicalWatermarkAssigner(rowtime=[currency_time], watermark=[-($4, 
5000:INTERVAL SECOND)])
-         +- LogicalProject(currency=[$0], currency_no=[$1], rate=[$2], c=[$3], 
currency_time=[TO_TIMESTAMP($3)])
-            +- LogicalTableScan(table=[[default_catalog, default_database, 
src_with_computed_column]])
-]]>
-    </Resource>
-    <Resource name="optimized rel plan">
-      <![CDATA[
-Calc(select=[w$start AS w_start, w$end AS w_end, max_rate], changelogMode=[I])
-+- GroupWindowAggregate(window=[TumblingGroupWindow('w$, currency_time, 
5000)], properties=[w$start, w$end, w$rowtime, w$proctime], select=[MAX(rate) 
AS max_rate, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS 
w$rowtime, proctime('w$) AS w$proctime], changelogMode=[I])
-   +- Exchange(distribution=[single], changelogMode=[I,UB,UA,D])
-      +- Calc(select=[currency_time, rate], changelogMode=[I,UB,UA,D])
-         +- ChangelogNormalize(key=[currency], changelogMode=[I,UB,UA,D])
-            +- Exchange(distribution=[hash[currency]], changelogMode=[UA,D])
-               +- WatermarkAssigner(rowtime=[currency_time], 
watermark=[-(currency_time, 5000:INTERVAL SECOND)], changelogMode=[UA,D])
-                  +- Calc(select=[TO_TIMESTAMP(c) AS currency_time, rate, 
currency], changelogMode=[UA,D])
-                     +- TableSourceScan(table=[[default_catalog, 
default_database, src_with_computed_column, project=[c, rate, currency]]], 
fields=[c, rate, currency], changelogMode=[UA,D])
-]]>
-    </Resource>
-  </TestCase>
-  <TestCase name="testPushdownWatermarkAssignerWithCalc">
-    <Resource name="sql">
-      <![CDATA[
-SELECT
-  TUMBLE_START(currency_time, INTERVAL '5' SECOND) as w_start,
-  TUMBLE_END(currency_time, INTERVAL '5' SECOND) as w_end,
-  MAX(rate) AS max_rate
-FROM simple_src
-GROUP BY TUMBLE(currency_time, INTERVAL '5' SECOND)
-]]>
-    </Resource>
-    <Resource name="ast">
-      <![CDATA[
-LogicalProject(w_start=[TUMBLE_START($0)], w_end=[TUMBLE_END($0)], 
max_rate=[$1])
-+- LogicalAggregate(group=[{0}], max_rate=[MAX($1)])
-   +- LogicalProject($f0=[$TUMBLE($3, 5000:INTERVAL SECOND)], rate=[$2])
-      +- LogicalWatermarkAssigner(rowtime=[currency_time], watermark=[-($3, 
5000:INTERVAL SECOND)])
-         +- LogicalTableScan(table=[[default_catalog, default_database, 
simple_src]])
-]]>
-    </Resource>
-    <Resource name="optimized rel plan">
-      <![CDATA[
-Calc(select=[w$start AS w_start, w$end AS w_end, max_rate], changelogMode=[I])
-+- GroupWindowAggregate(window=[TumblingGroupWindow('w$, currency_time, 
5000)], properties=[w$start, w$end, w$rowtime, w$proctime], select=[MAX(rate) 
AS max_rate, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS 
w$rowtime, proctime('w$) AS w$proctime], changelogMode=[I])
-   +- Exchange(distribution=[single], changelogMode=[I,UB,UA,D])
-      +- Calc(select=[currency_time, rate], changelogMode=[I,UB,UA,D])
-         +- ChangelogNormalize(key=[currency], changelogMode=[I,UB,UA,D])
-            +- Exchange(distribution=[hash[currency]], changelogMode=[UA,D])
-               +- WatermarkAssigner(rowtime=[currency_time], 
watermark=[-(currency_time, 5000:INTERVAL SECOND)], changelogMode=[UA,D])
-                  +- TableSourceScan(table=[[default_catalog, 
default_database, simple_src, project=[currency_time, rate, currency]]], 
fields=[currency_time, rate, currency], changelogMode=[UA,D])
-]]>
-    </Resource>
-  </TestCase>
-  <TestCase name="testPushdownWatermarkWithoutCalc">
-    <Resource name="sql">
-      <![CDATA[
-SELECT
-  currency,
-  COUNT(1) AS cnt,
-  TUMBLE_START(currency_time, INTERVAL '5' SECOND) as w_start,
-  TUMBLE_END(currency_time, INTERVAL '5' SECOND) as w_end
-FROM simple_src
-GROUP BY currency, TUMBLE(currency_time, INTERVAL '5' SECOND)
-]]>
-    </Resource>
-    <Resource name="ast">
-      <![CDATA[
-LogicalProject(currency=[$0], cnt=[$2], w_start=[TUMBLE_START($1)], 
w_end=[TUMBLE_END($1)])
-+- LogicalAggregate(group=[{0, 1}], cnt=[COUNT()])
-   +- LogicalProject(currency=[$0], $f1=[$TUMBLE($3, 5000:INTERVAL SECOND)])
-      +- LogicalWatermarkAssigner(rowtime=[currency_time], watermark=[-($3, 
5000:INTERVAL SECOND)])
-         +- LogicalTableScan(table=[[default_catalog, default_database, 
simple_src]])
-]]>
-    </Resource>
-    <Resource name="optimized rel plan">
-      <![CDATA[
-Calc(select=[currency, cnt, w$start AS w_start, w$end AS w_end], 
changelogMode=[I])
-+- GroupWindowAggregate(groupBy=[currency], window=[TumblingGroupWindow('w$, 
currency_time, 5000)], properties=[w$start, w$end, w$rowtime, w$proctime], 
select=[currency, COUNT(*) AS cnt, start('w$) AS w$start, end('w$) AS w$end, 
rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime], changelogMode=[I])
-   +- Exchange(distribution=[hash[currency]], changelogMode=[I,UB,UA,D])
-      +- ChangelogNormalize(key=[currency], changelogMode=[I,UB,UA,D])
-         +- Exchange(distribution=[hash[currency]], changelogMode=[UA,D])
-            +- WatermarkAssigner(rowtime=[currency_time], 
watermark=[-(currency_time, 5000:INTERVAL SECOND)], changelogMode=[UA,D])
-               +- TableSourceScan(table=[[default_catalog, default_database, 
simple_src, project=[currency, currency_time]]], fields=[currency, 
currency_time], changelogMode=[UA,D])
-]]>
-    </Resource>
-  </TestCase>
-</Root>
diff --git 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/rules/physical/stream/WatermarkAssignerChangelogNormalizeTransposeRuleTest.scala
 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/rules/physical/stream/WatermarkAssignerChangelogNormalizeTransposeRuleTest.scala
deleted file mode 100644
index be08e5b..0000000
--- 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/rules/physical/stream/WatermarkAssignerChangelogNormalizeTransposeRuleTest.scala
+++ /dev/null
@@ -1,175 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.planner.plan.rules.physical.stream
-
-import org.apache.flink.table.api.ExplainDetail
-import org.apache.flink.table.planner.utils.{StreamTableTestUtil, 
TableTestBase}
-
-import org.junit.{Before, Test}
-
-/**
- * Tests for [[WatermarkAssignerChangelogNormalizeTransposeRule]]
- */
-class WatermarkAssignerChangelogNormalizeTransposeRuleTest extends 
TableTestBase {
-  private val util: StreamTableTestUtil = streamTestUtil()
-
-  @Before
-  def setup(): Unit = {
-    util.addTable(
-      s"""
-         |CREATE TABLE simple_src (
-         |  currency STRING,
-         |  currency_no STRING,
-         |  rate  BIGINT,
-         |  currency_time TIMESTAMP(3),
-         |  WATERMARK FOR currency_time AS currency_time - interval '5' SECOND,
-         |  PRIMARY KEY(currency) NOT ENFORCED
-         |) WITH (
-         |  'connector' = 'values',
-         |  'changelog-mode' = 'UA,D',
-         |  'enable-watermark-push-down' = 'true'
-         |)
-         |""".stripMargin)
-
-    util.addTable(
-      s"""
-         |CREATE TABLE src_with_computed_column (
-         |  currency STRING,
-         |  currency_no STRING,
-         |  rate  BIGINT,
-         |  c STRING,
-         |  currency_time as to_timestamp(c),
-         |  WATERMARK FOR currency_time AS currency_time - interval '5' SECOND,
-         |  PRIMARY KEY(currency) NOT ENFORCED
-         |) WITH (
-         |  'connector' = 'values',
-         |  'changelog-mode' = 'UA,D',
-         |  'enable-watermark-push-down' = 'true'
-         |)
-         |""".stripMargin)
-
-    util.addTable(
-      s"""
-         |CREATE TABLE src_with_computed_column2 (
-         | currency int,
-         | currency2 as currency + 2,
-         | currency_no STRING,
-         | rate BIGINT,
-         | c STRING,
-         | currency_time as to_timestamp(c),
-         | WATERMARK FOR currency_time AS currency_time - interval '5' SECOND,
-         | PRIMARY KEY(currency) NOT ENFORCED
-         |) WITH (
-         | 'connector' = 'values',
-         | 'changelog-mode' = 'UA,D',
-         | 'enable-watermark-push-down' = 'true'
-         |)
-         |""".stripMargin)
-  }
-
-  // 
----------------------------------------------------------------------------------------
-  // Tests for queries matches WITHOUT_CALC patten
-  // Rewrite always happens in the case
-  // 
----------------------------------------------------------------------------------------
-  @Test
-  def testPushdownWatermarkWithoutCalc(): Unit = {
-    val sql =
-      """
-        |SELECT
-        |  currency,
-        |  COUNT(1) AS cnt,
-        |  TUMBLE_START(currency_time, INTERVAL '5' SECOND) as w_start,
-        |  TUMBLE_END(currency_time, INTERVAL '5' SECOND) as w_end
-        |FROM simple_src
-        |GROUP BY currency, TUMBLE(currency_time, INTERVAL '5' SECOND)
-        |""".stripMargin
-    util.verifyRelPlan(sql, ExplainDetail.CHANGELOG_MODE)
-  }
-
-  // 
----------------------------------------------------------------------------------------
-  // Tests for queries matches WITH_CALC patten
-  // 
----------------------------------------------------------------------------------------
-
-  /** push down calc and watermark assigner as a whole if shuffle keys are 
kept after Calc. */
-  @Test
-  def testPushdownCalcAndWatermarkAssignerWithCalc(): Unit = {
-    val sql =
-      """
-        |SELECT
-        |  currency,
-        |  COUNT(1) AS cnt,
-        |  TUMBLE_START(currency_time, INTERVAL '5' SECOND) as w_start,
-        |  TUMBLE_END(currency_time, INTERVAL '5' SECOND) as w_end
-        |FROM src_with_computed_column
-        |GROUP BY currency, TUMBLE(currency_time, INTERVAL '5' SECOND)
-        |""".stripMargin
-    util.verifyRelPlan(sql, ExplainDetail.CHANGELOG_MODE)
-  }
-
-  /** only push down watermark assigner if satisfy all the following condition:
-   *  1. shuffle keys are not kept after Calc
-   *  2. row time field does not depend on computed column
-   */
-  @Test
-  def testPushdownWatermarkAssignerWithCalc(): Unit = {
-    val sql =
-      """
-        |SELECT
-        |  TUMBLE_START(currency_time, INTERVAL '5' SECOND) as w_start,
-        |  TUMBLE_END(currency_time, INTERVAL '5' SECOND) as w_end,
-        |  MAX(rate) AS max_rate
-        |FROM simple_src
-        |GROUP BY TUMBLE(currency_time, INTERVAL '5' SECOND)
-        |""".stripMargin
-    util.verifyRelPlan(sql, ExplainDetail.CHANGELOG_MODE)
-  }
-
-  /**
-   * push down new calc with all shuffle keys and new watermarkAssigner, then 
add a top calc
-   * to remove add new added shuffle keys
-    */
-  @Test
-  def testPushdownNewCalcAndWatermarkAssignerWithCalc(): Unit = {
-    val sql =
-      """
-        |SELECT
-        |  TUMBLE_START(currency_time, INTERVAL '5' SECOND) as w_start,
-        |  TUMBLE_END(currency_time, INTERVAL '5' SECOND) as w_end,
-        |  MAX(rate) AS max_rate
-        |FROM src_with_computed_column
-        |GROUP BY TUMBLE(currency_time, INTERVAL '5' SECOND)
-        |""".stripMargin
-    util.verifyRelPlan(sql, ExplainDetail.CHANGELOG_MODE)
-  }
-
-  @Test
-  def testGroupKeyIsComputedColumn(): Unit = {
-    val sql =
-      """
-        |SELECT
-        |  currency2,
-        |  COUNT(1) AS cnt,
-        |  TUMBLE_START(currency_time, INTERVAL '5' SECOND) as w_start,
-        |  TUMBLE_END(currency_time, INTERVAL '5' SECOND) as w_end
-        |FROM src_with_computed_column2
-        |GROUP BY currency2, TUMBLE(currency_time, INTERVAL '5' SECOND)
-        |""".stripMargin
-    util.verifyRelPlan(sql, ExplainDetail.CHANGELOG_MODE)
-  }
-}
diff --git 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/GroupWindowITCase.scala
 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/GroupWindowITCase.scala
index 5a84fd0..659c639 100644
--- 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/GroupWindowITCase.scala
+++ 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/GroupWindowITCase.scala
@@ -34,7 +34,7 @@ import org.apache.flink.table.planner.runtime.utils._
 import org.apache.flink.types.Row
 
 import org.junit.Assert.assertEquals
-import org.junit.Test
+import org.junit.{Ignore, Test}
 import org.junit.runner.RunWith
 import org.junit.runners.Parameterized
 
@@ -51,15 +51,6 @@ class GroupWindowITCase(mode: StateBackendMode, 
useTimestampLtz: Boolean)
 
   val SHANGHAI_ZONE = ZoneId.of("Asia/Shanghai")
 
-  val upsertSourceCurrencyData = List(
-    changelogRow("+U", "Euro", "no1", JLong.valueOf(114L), localDateTime(1L)),
-    changelogRow("+U", "US Dollar", "no1", JLong.valueOf(102L), 
localDateTime(2L)),
-    changelogRow("+U", "Yen", "no1", JLong.valueOf(1L), localDateTime(3L)),
-    changelogRow("+U", "RMB", "no1", JLong.valueOf(702L), localDateTime(4L)),
-    changelogRow("+U", "Euro",  "no1", JLong.valueOf(118L), localDateTime(6L)),
-    changelogRow("+U", "US Dollar", "no1", JLong.valueOf(104L), 
localDateTime(4L)),
-    changelogRow("-D", "RMB", "no1", JLong.valueOf(702L), localDateTime(4L)))
-
   override def before(): Unit = {
     super.before()
 
@@ -382,6 +373,20 @@ class GroupWindowITCase(mode: StateBackendMode, 
useTimestampLtz: Boolean)
 
   @Test
   def testWindowAggregateOnUpsertSource(): Unit = {
+
+    def localDateTime(epochSecond: Long): LocalDateTime = {
+      LocalDateTime.ofEpochSecond(epochSecond, 0, ZoneOffset.UTC)
+    }
+
+    val upsertSourceCurrencyData = List(
+      changelogRow("+U", "Euro", "no1", JLong.valueOf(114L), 
localDateTime(1L)),
+      changelogRow("+U", "US Dollar", "no1", JLong.valueOf(102L), 
localDateTime(2L)),
+      changelogRow("+U", "Yen", "no1", JLong.valueOf(1L), localDateTime(3L)),
+      changelogRow("+U", "RMB", "no1", JLong.valueOf(702L), localDateTime(4L)),
+      changelogRow("+U", "Euro",  "no1", JLong.valueOf(118L), 
localDateTime(6L)),
+      changelogRow("+U", "US Dollar", "no1", JLong.valueOf(104L), 
localDateTime(4L)),
+      changelogRow("-D", "RMB", "no1", JLong.valueOf(702L), localDateTime(4L)))
+
     val upsertSourceDataId = registerData(upsertSourceCurrencyData)
     tEnv.executeSql(
       s"""
@@ -420,8 +425,22 @@ class GroupWindowITCase(mode: StateBackendMode, 
useTimestampLtz: Boolean)
     assertEquals(expected.sorted, sink.getAppendResults.sorted)
   }
 
-  @Test
-  def testWindowAggregateOnUpsertSourcePushdownWatermark(): Unit = {
+  @Ignore("FLINK-22680")
+  def testUnResolvedWindowAggregateOnUpsertSource(): Unit = {
+
+    def localDateTime(epochSecond: Long): LocalDateTime = {
+      LocalDateTime.ofEpochSecond(epochSecond, 0, ZoneOffset.UTC)
+    }
+
+    val upsertSourceCurrencyData = List(
+      changelogRow("+U", "Euro", "no1", JLong.valueOf(114L), 
localDateTime(1L)),
+      changelogRow("+U", "US Dollar", "no1", JLong.valueOf(102L), 
localDateTime(2L)),
+      changelogRow("+U", "Yen", "no1", JLong.valueOf(1L), localDateTime(3L)),
+      changelogRow("+U", "RMB", "no1", JLong.valueOf(702L), localDateTime(4L)),
+      changelogRow("+U", "Euro",  "no1", JLong.valueOf(118L), 
localDateTime(6L)),
+      changelogRow("+U", "US Dollar", "no1", JLong.valueOf(104L), 
localDateTime(4L)),
+      changelogRow("-D", "RMB", "no1", JLong.valueOf(702L), localDateTime(4L)))
+
     val upsertSourceDataId = registerData(upsertSourceCurrencyData)
     tEnv.executeSql(
       s"""
@@ -450,10 +469,6 @@ class GroupWindowITCase(mode: StateBackendMode, 
useTimestampLtz: Boolean)
     val sink = new TestingAppendSink
     tEnv.sqlQuery(sql).toAppendStream[Row].addSink(sink)
     env.execute()
-    val expected = Seq(
-      "1970-01-01T00:00,1970-01-01T00:00:05,104",
-      "1970-01-01T00:00:05,1970-01-01T00:00:10,118")
-    assertEquals(expected.sorted, sink.getAppendResults.sorted)
   }
 
   @Test
@@ -546,10 +561,6 @@ class GroupWindowITCase(mode: StateBackendMode, 
useTimestampLtz: Boolean)
     tableConfig.getConfiguration.set(
       TABLE_EXEC_EMIT_LATE_FIRE_DELAY, Duration.ofMillis(intervalInMillis))
   }
-
-  private def localDateTime(epochSecond: Long): LocalDateTime = {
-    LocalDateTime.ofEpochSecond(epochSecond, 0, ZoneOffset.UTC)
-  }
 }
 
 object GroupWindowITCase {

Reply via email to