This is an automated email from the ASF dual-hosted git repository. rmetzger pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 37633211cff21050e7bf0dafa9006ad1700ceb7a Author: Robert Metzger <rmetz...@apache.org> AuthorDate: Thu Jun 3 20:07:07 2021 +0200 Revert "[FLINK-22680][table-planner-blink] Fix IndexOutOfBoundsException when apply WatermarkAssignerChangelogNormalizeTransposeRule" This reverts commit a364daa37202dc4d7a60c613f547cdcd6893ecd2. --- ...arkAssignerChangelogNormalizeTransposeRule.java | 301 ++------------------- ...AssignerChangelogNormalizeTransposeRuleTest.xml | 191 ------------- ...signerChangelogNormalizeTransposeRuleTest.scala | 175 ------------ .../runtime/stream/sql/GroupWindowITCase.scala | 51 ++-- 4 files changed, 48 insertions(+), 670 deletions(-) diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/WatermarkAssignerChangelogNormalizeTransposeRule.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/WatermarkAssignerChangelogNormalizeTransposeRule.java index fb0cb49..aed80bd 100644 --- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/WatermarkAssignerChangelogNormalizeTransposeRule.java +++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/WatermarkAssignerChangelogNormalizeTransposeRule.java @@ -18,40 +18,18 @@ package org.apache.flink.table.planner.plan.rules.physical.stream; -import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalCalc; import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalChangelogNormalize; import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalExchange; import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalWatermarkAssigner; -import org.apache.flink.table.planner.plan.trait.FlinkRelDistribution; -import org.apache.flink.table.planner.plan.trait.FlinkRelDistributionTraitDef; -import org.apache.flink.table.planner.typeutils.RowTypeUtils; import org.apache.calcite.plan.RelOptRule; import org.apache.calcite.plan.RelOptRuleCall; import org.apache.calcite.plan.RelOptUtil; import org.apache.calcite.plan.RelRule; -import org.apache.calcite.plan.RelTraitSet; -import org.apache.calcite.rel.RelDistribution; import org.apache.calcite.rel.RelNode; -import org.apache.calcite.rel.type.RelDataType; -import org.apache.calcite.rel.type.RelDataTypeField; -import org.apache.calcite.rex.RexBuilder; -import org.apache.calcite.rex.RexInputRef; -import org.apache.calcite.rex.RexLocalRef; -import org.apache.calcite.rex.RexNode; -import org.apache.calcite.rex.RexProgram; -import org.apache.calcite.rex.RexProgramBuilder; -import org.apache.calcite.rex.RexShuttle; -import org.apache.calcite.rex.RexUtil; -import org.apache.calcite.util.mapping.Mappings; -import java.util.ArrayList; import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.function.Function; import static org.apache.flink.util.Preconditions.checkArgument; @@ -84,288 +62,43 @@ public class WatermarkAssignerChangelogNormalizeTransposeRule public void onMatch(RelOptRuleCall call) { final StreamPhysicalWatermarkAssigner watermark = call.rel(0); final RelNode node = call.rel(1); - RelNode newTree; if (node instanceof StreamPhysicalCalc) { // with calc final StreamPhysicalCalc calc = call.rel(1); final StreamPhysicalChangelogNormalize changelogNormalize = call.rel(2); final StreamPhysicalExchange exchange = call.rel(3); - final Mappings.TargetMapping calcMapping = buildMapping(calc.getProgram()); - final RelDistribution exchangeDistribution = exchange.getDistribution(); - final RelDistribution newExchangeDistribution = exchangeDistribution.apply(calcMapping); - final boolean shuffleKeysAreKeptByCalc = - newExchangeDistribution.getType() == exchangeDistribution.getType() - && newExchangeDistribution.getKeys().size() - == exchangeDistribution.getKeys().size(); - if (shuffleKeysAreKeptByCalc) { - // Pushes down WatermarkAssigner/Calc as a whole if shuffle keys of - // Exchange are all kept by Calc - newTree = - pushDownOriginalWatermarkAndCalc( - watermark, - calc, - changelogNormalize, - exchange, - newExchangeDistribution); - } else { - // 1. Creates a new Calc which contains all shuffle keys - // 2. Pushes down new WatermarkAssigner/new Calc - // 3. Adds a top Calc to remove new added shuffle keys in step 1 - newTree = - pushDownTransformedWatermarkAndCalc( - watermark, - calc, - changelogNormalize, - exchange, - exchangeDistribution.getKeys(), - calcMapping); - } + + final RelNode newTree = + buildTreeInOrder( + changelogNormalize, exchange, watermark, calc, exchange.getInput()); + call.transformTo(newTree); } else if (node instanceof StreamPhysicalChangelogNormalize) { // without calc final StreamPhysicalChangelogNormalize changelogNormalize = call.rel(1); final StreamPhysicalExchange exchange = call.rel(2); - newTree = - buildTreeInOrder( - exchange.getInput(), - // Clears distribution on new WatermarkAssigner - Tuple2.of( - watermark, - watermark.getTraitSet().plus(FlinkRelDistribution.DEFAULT())), - Tuple2.of(exchange, exchange.getTraitSet()), - Tuple2.of(changelogNormalize, changelogNormalize.getTraitSet())); + + final RelNode newTree = + buildTreeInOrder(changelogNormalize, exchange, watermark, exchange.getInput()); + call.transformTo(newTree); } else { throw new IllegalStateException( this.getClass().getName() + " matches a wrong relation tree: " + RelOptUtil.toString(watermark)); } - call.transformTo(newTree); - } - - private RelNode pushDownOriginalWatermarkAndCalc( - StreamPhysicalWatermarkAssigner watermark, - StreamPhysicalCalc calc, - StreamPhysicalChangelogNormalize changelogNormalize, - StreamPhysicalExchange exchange, - RelDistribution newExchangeDistribution) { - return buildTreeInOrder( - exchange.getInput(), - // Clears distribution on new Calc/WatermarkAssigner - Tuple2.of(calc, calc.getTraitSet().plus(FlinkRelDistribution.DEFAULT())), - Tuple2.of(watermark, watermark.getTraitSet().plus(FlinkRelDistribution.DEFAULT())), - // Updates distribution on new Exchange/Normalize based on field - // mapping of Calc - Tuple2.of(exchange, exchange.getTraitSet().plus(newExchangeDistribution)), - Tuple2.of( - changelogNormalize, - changelogNormalize.getTraitSet().plus(newExchangeDistribution))); - } - - private RelNode pushDownTransformedWatermarkAndCalc( - StreamPhysicalWatermarkAssigner watermark, - StreamPhysicalCalc calc, - StreamPhysicalChangelogNormalize changelogNormalize, - StreamPhysicalExchange exchange, - List<Integer> completeShuffleKeys, - Mappings.TargetMapping calcMapping) { - final List<Integer> projectedOutShuffleKeys = new ArrayList<>(); - for (Integer key : completeShuffleKeys) { - int targetIdx = calcMapping.getTargetOpt(key); - if (targetIdx < 0) { - projectedOutShuffleKeys.add(key); - } - } - // Creates a new Program which contains all shuffle keys - final RexBuilder rexBuilder = calc.getCluster().getRexBuilder(); - final RexProgram newPushDownProgram = - createTransformedProgramWithAllShuffleKeys( - calc.getProgram(), projectedOutShuffleKeys, rexBuilder); - if (newPushDownProgram.isPermutation()) { - // Pushes down transformed WatermarkAssigner alone if new pushDown program is a - // permutation of its inputs - return pushDownTransformedWatermark( - watermark, calc, changelogNormalize, exchange, calcMapping, rexBuilder); - } else { - // 1. Pushes down transformed WatermarkAssigner and transformed Calc - // 2. Adds a top Calc to remove new added shuffle keys - return pushDownTransformedWatermarkAndCalc( - newPushDownProgram, watermark, exchange, changelogNormalize, calc); - } - } - - private RexProgram createTransformedProgramWithAllShuffleKeys( - RexProgram program, List<Integer> projectsOutShuffleKeys, RexBuilder rexBuilder) { - RelDataType oldInputRowType = program.getInputRowType(); - List<String> visitedProjectNames = new ArrayList<>(); - RexProgramBuilder newProgramBuilder = new RexProgramBuilder(oldInputRowType, rexBuilder); - program.getNamedProjects() - .forEach( - pair -> { - newProgramBuilder.addProject( - program.expandLocalRef(pair.left), pair.right); - visitedProjectNames.add(pair.right); - }); - List<RelDataTypeField> oldFieldList = oldInputRowType.getFieldList(); - for (Integer projectsOutShuffleKey : projectsOutShuffleKeys) { - RelDataTypeField oldField = oldFieldList.get(projectsOutShuffleKey); - String oldFieldName = oldField.getName(); - String newProjectName = RowTypeUtils.getUniqueName(oldFieldName, visitedProjectNames); - newProgramBuilder.addProject( - new RexInputRef(projectsOutShuffleKey, oldField.getType()), newProjectName); - visitedProjectNames.add(newProjectName); - } - if (program.getCondition() != null) { - newProgramBuilder.addCondition(program.expandLocalRef(program.getCondition())); - } - return newProgramBuilder.getProgram(); - } - - private RelNode pushDownTransformedWatermarkAndCalc( - RexProgram newPushDownProgram, - StreamPhysicalWatermarkAssigner watermark, - StreamPhysicalExchange exchange, - StreamPhysicalChangelogNormalize changelogNormalize, - StreamPhysicalCalc calc) { - final RelNode pushDownCalc = - calc.copy( - // Clears distribution on new Calc - calc.getTraitSet().plus(FlinkRelDistribution.DEFAULT()), - exchange.getInput(), - newPushDownProgram); - final Mappings.TargetMapping mappingOfPushDownCalc = buildMapping(newPushDownProgram); - final RelDistribution newDistribution = - exchange.getDistribution().apply(mappingOfPushDownCalc); - final RelNode newChangelogNormalize = - buildTreeInOrder( - pushDownCalc, - Tuple2.of( - watermark, - watermark.getTraitSet().plus(FlinkRelDistribution.DEFAULT())), - // updates distribution on new Exchange/Normalize based on field - // mapping of Calc - Tuple2.of(exchange, exchange.getTraitSet().plus(newDistribution)), - Tuple2.of( - changelogNormalize, - changelogNormalize.getTraitSet().plus(newDistribution))); - final List<String> newInputFieldNames = newChangelogNormalize.getRowType().getFieldNames(); - final RexProgramBuilder topProgramBuilder = - new RexProgramBuilder( - newChangelogNormalize.getRowType(), - changelogNormalize.getCluster().getRexBuilder()); - for (int fieldIdx = 0; fieldIdx < calc.getRowType().getFieldCount(); fieldIdx++) { - topProgramBuilder.addProject( - RexInputRef.of(fieldIdx, newChangelogNormalize.getRowType()), - newInputFieldNames.get(fieldIdx)); - } - final RexProgram topProgram = topProgramBuilder.getProgram(); - return calc.copy(calc.getTraitSet(), newChangelogNormalize, topProgram); - } - - private RelNode pushDownTransformedWatermark( - StreamPhysicalWatermarkAssigner watermark, - StreamPhysicalCalc calc, - StreamPhysicalChangelogNormalize changelogNormalize, - StreamPhysicalExchange exchange, - Mappings.TargetMapping calcMapping, - RexBuilder rexBuilder) { - Mappings.TargetMapping inversedMapping = calcMapping.inverse(); - final int newRowTimeFieldIndex = - inversedMapping.getTargetOpt(watermark.rowtimeFieldIndex()); - // Updates watermark properties after push down before Calc - // 1. rewrites watermark expression - // 2. clears distribution - // 3. updates row time field index - RexNode newWatermarkExpr = watermark.watermarkExpr(); - if (watermark.watermarkExpr() != null) { - newWatermarkExpr = RexUtil.apply(inversedMapping, watermark.watermarkExpr()); - } - final RelNode newWatermark = - watermark.copy( - watermark.getTraitSet().plus(FlinkRelDistribution.DEFAULT()), - exchange.getInput(), - newRowTimeFieldIndex, - newWatermarkExpr); - final RelNode newChangelogNormalize = - buildTreeInOrder( - newWatermark, - Tuple2.of(exchange, exchange.getTraitSet()), - Tuple2.of(changelogNormalize, changelogNormalize.getTraitSet())); - // Rewrites Calc program because the field type of row time - // field is changed after watermark pushed down - final RexProgram oldProgram = calc.getProgram(); - final RexProgramBuilder programBuilder = - new RexProgramBuilder(newChangelogNormalize.getRowType(), rexBuilder); - final Function<RexNode, RexNode> rexShuttle = - e -> - e.accept( - new RexShuttle() { - @Override - public RexNode visitInputRef(RexInputRef inputRef) { - if (inputRef.getIndex() == newRowTimeFieldIndex) { - return RexInputRef.of( - newRowTimeFieldIndex, - newChangelogNormalize.getRowType()); - } else { - return inputRef; - } - } - }); - oldProgram - .getNamedProjects() - .forEach( - pair -> - programBuilder.addProject( - rexShuttle.apply(oldProgram.expandLocalRef(pair.left)), - pair.right)); - if (oldProgram.getCondition() != null) { - programBuilder.addCondition( - rexShuttle.apply(oldProgram.expandLocalRef(oldProgram.getCondition()))); - } - final RexProgram newProgram = programBuilder.getProgram(); - return calc.copy(calc.getTraitSet(), newChangelogNormalize, newProgram); - } - - private Mappings.TargetMapping buildMapping(RexProgram program) { - final Map<Integer, Integer> mapInToOutPos = new HashMap<>(); - final List<RexLocalRef> projects = program.getProjectList(); - for (int idx = 0; idx < projects.size(); idx++) { - RexNode rexNode = program.expandLocalRef(projects.get(idx)); - if (rexNode instanceof RexInputRef) { - mapInToOutPos.put(((RexInputRef) rexNode).getIndex(), idx); - } - } - return Mappings.target( - mapInToOutPos, - program.getInputRowType().getFieldCount(), - program.getOutputRowType().getFieldCount()); } /** - * Build a new {@link RelNode} tree in the given nodes order which is in bottom-up direction. + * Build a new {@link RelNode} tree in the given nodes order which is in root-down direction. */ - @SafeVarargs - private RelNode buildTreeInOrder( - RelNode leafNode, Tuple2<RelNode, RelTraitSet>... nodeAndTraits) { - checkArgument(nodeAndTraits.length >= 1); - RelNode inputNode = leafNode; - RelNode currentNode = null; - for (Tuple2<RelNode, RelTraitSet> nodeAndTrait : nodeAndTraits) { - currentNode = nodeAndTrait.f0; - if (currentNode instanceof StreamPhysicalExchange) { - currentNode = - ((StreamPhysicalExchange) currentNode) - .copy( - nodeAndTrait.f1, - inputNode, - nodeAndTrait.f1.getTrait( - FlinkRelDistributionTraitDef.INSTANCE())); - } else { - currentNode = - currentNode.copy(nodeAndTrait.f1, Collections.singletonList(inputNode)); - } - inputNode = currentNode; + private RelNode buildTreeInOrder(RelNode... nodes) { + checkArgument(nodes.length >= 2); + RelNode root = nodes[nodes.length - 1]; + for (int i = nodes.length - 2; i >= 0; i--) { + RelNode node = nodes[i]; + root = node.copy(node.getTraitSet(), Collections.singletonList(root)); } - return currentNode; + return root; } /** Rule configuration. */ diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/physical/stream/WatermarkAssignerChangelogNormalizeTransposeRuleTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/physical/stream/WatermarkAssignerChangelogNormalizeTransposeRuleTest.xml deleted file mode 100644 index 1772ee9..0000000 --- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/physical/stream/WatermarkAssignerChangelogNormalizeTransposeRuleTest.xml +++ /dev/null @@ -1,191 +0,0 @@ -<?xml version="1.0" ?> -<!-- -Licensed to the Apache Software Foundation (ASF) under one or more -contributor license agreements. See the NOTICE file distributed with -this work for additional information regarding copyright ownership. -The ASF licenses this file to you under the Apache License, Version 2.0 -(the "License"); you may not use this file except in compliance with -the License. You may obtain a copy of the License at - -http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. ---> -<Root> - <TestCase name="testGroupKeyIsComputedColumn"> - <Resource name="sql"> - <![CDATA[ -SELECT - currency2, - COUNT(1) AS cnt, - TUMBLE_START(currency_time, INTERVAL '5' SECOND) as w_start, - TUMBLE_END(currency_time, INTERVAL '5' SECOND) as w_end -FROM src_with_computed_column2 -GROUP BY currency2, TUMBLE(currency_time, INTERVAL '5' SECOND) -]]> - </Resource> - <Resource name="ast"> - <![CDATA[ -LogicalProject(currency2=[$0], cnt=[$2], w_start=[TUMBLE_START($1)], w_end=[TUMBLE_END($1)]) -+- LogicalAggregate(group=[{0, 1}], cnt=[COUNT()]) - +- LogicalProject(currency2=[$1], $f1=[$TUMBLE($5, 5000:INTERVAL SECOND)]) - +- LogicalWatermarkAssigner(rowtime=[currency_time], watermark=[-($5, 5000:INTERVAL SECOND)]) - +- LogicalProject(currency=[$0], currency2=[+($0, 2)], currency_no=[$1], rate=[$2], c=[$3], currency_time=[TO_TIMESTAMP($3)]) - +- LogicalTableScan(table=[[default_catalog, default_database, src_with_computed_column2]]) -]]> - </Resource> - <Resource name="optimized rel plan"> - <![CDATA[ -Calc(select=[currency2, cnt, w$start AS w_start, w$end AS w_end], changelogMode=[I]) -+- GroupWindowAggregate(groupBy=[currency2], window=[TumblingGroupWindow('w$, currency_time, 5000)], properties=[w$start, w$end, w$rowtime, w$proctime], select=[currency2, COUNT(*) AS cnt, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime], changelogMode=[I]) - +- Exchange(distribution=[hash[currency2]], changelogMode=[I,UB,UA,D]) - +- Calc(select=[currency2, currency_time], changelogMode=[I,UB,UA,D]) - +- ChangelogNormalize(key=[currency2], changelogMode=[I,UB,UA,D]) - +- Exchange(distribution=[hash[currency]], changelogMode=[UA,D]) - +- WatermarkAssigner(rowtime=[currency_time], watermark=[-(currency_time, 5000:INTERVAL SECOND)], changelogMode=[UA,D]) - +- Calc(select=[+(currency, 2) AS currency2, TO_TIMESTAMP(c) AS currency_time, currency], changelogMode=[UA,D]) - +- TableSourceScan(table=[[default_catalog, default_database, src_with_computed_column2, project=[currency, c]]], fields=[currency, c], changelogMode=[UA,D]) -]]> - </Resource> - </TestCase> - <TestCase name="testPushdownCalcAndWatermarkAssignerWithCalc"> - <Resource name="sql"> - <![CDATA[ -SELECT - currency, - COUNT(1) AS cnt, - TUMBLE_START(currency_time, INTERVAL '5' SECOND) as w_start, - TUMBLE_END(currency_time, INTERVAL '5' SECOND) as w_end -FROM src_with_computed_column -GROUP BY currency, TUMBLE(currency_time, INTERVAL '5' SECOND) -]]> - </Resource> - <Resource name="ast"> - <![CDATA[ -LogicalProject(currency=[$0], cnt=[$2], w_start=[TUMBLE_START($1)], w_end=[TUMBLE_END($1)]) -+- LogicalAggregate(group=[{0, 1}], cnt=[COUNT()]) - +- LogicalProject(currency=[$0], $f1=[$TUMBLE($4, 5000:INTERVAL SECOND)]) - +- LogicalWatermarkAssigner(rowtime=[currency_time], watermark=[-($4, 5000:INTERVAL SECOND)]) - +- LogicalProject(currency=[$0], currency_no=[$1], rate=[$2], c=[$3], currency_time=[TO_TIMESTAMP($3)]) - +- LogicalTableScan(table=[[default_catalog, default_database, src_with_computed_column]]) -]]> - </Resource> - <Resource name="optimized rel plan"> - <![CDATA[ -Calc(select=[currency, cnt, w$start AS w_start, w$end AS w_end], changelogMode=[I]) -+- GroupWindowAggregate(groupBy=[currency], window=[TumblingGroupWindow('w$, currency_time, 5000)], properties=[w$start, w$end, w$rowtime, w$proctime], select=[currency, COUNT(*) AS cnt, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime], changelogMode=[I]) - +- Exchange(distribution=[hash[currency]], changelogMode=[I,UB,UA,D]) - +- ChangelogNormalize(key=[currency], changelogMode=[I,UB,UA,D]) - +- Exchange(distribution=[hash[currency]], changelogMode=[UA,D]) - +- WatermarkAssigner(rowtime=[currency_time], watermark=[-(currency_time, 5000:INTERVAL SECOND)], changelogMode=[UA,D]) - +- Calc(select=[currency, TO_TIMESTAMP(c) AS currency_time], changelogMode=[UA,D]) - +- TableSourceScan(table=[[default_catalog, default_database, src_with_computed_column, project=[currency, c]]], fields=[currency, c], changelogMode=[UA,D]) -]]> - </Resource> - </TestCase> - <TestCase name="testPushdownNewCalcAndWatermarkAssignerWithCalc"> - <Resource name="sql"> - <![CDATA[ -SELECT - TUMBLE_START(currency_time, INTERVAL '5' SECOND) as w_start, - TUMBLE_END(currency_time, INTERVAL '5' SECOND) as w_end, - MAX(rate) AS max_rate -FROM src_with_computed_column -GROUP BY TUMBLE(currency_time, INTERVAL '5' SECOND) -]]> - </Resource> - <Resource name="ast"> - <![CDATA[ -LogicalProject(w_start=[TUMBLE_START($0)], w_end=[TUMBLE_END($0)], max_rate=[$1]) -+- LogicalAggregate(group=[{0}], max_rate=[MAX($1)]) - +- LogicalProject($f0=[$TUMBLE($4, 5000:INTERVAL SECOND)], rate=[$2]) - +- LogicalWatermarkAssigner(rowtime=[currency_time], watermark=[-($4, 5000:INTERVAL SECOND)]) - +- LogicalProject(currency=[$0], currency_no=[$1], rate=[$2], c=[$3], currency_time=[TO_TIMESTAMP($3)]) - +- LogicalTableScan(table=[[default_catalog, default_database, src_with_computed_column]]) -]]> - </Resource> - <Resource name="optimized rel plan"> - <![CDATA[ -Calc(select=[w$start AS w_start, w$end AS w_end, max_rate], changelogMode=[I]) -+- GroupWindowAggregate(window=[TumblingGroupWindow('w$, currency_time, 5000)], properties=[w$start, w$end, w$rowtime, w$proctime], select=[MAX(rate) AS max_rate, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime], changelogMode=[I]) - +- Exchange(distribution=[single], changelogMode=[I,UB,UA,D]) - +- Calc(select=[currency_time, rate], changelogMode=[I,UB,UA,D]) - +- ChangelogNormalize(key=[currency], changelogMode=[I,UB,UA,D]) - +- Exchange(distribution=[hash[currency]], changelogMode=[UA,D]) - +- WatermarkAssigner(rowtime=[currency_time], watermark=[-(currency_time, 5000:INTERVAL SECOND)], changelogMode=[UA,D]) - +- Calc(select=[TO_TIMESTAMP(c) AS currency_time, rate, currency], changelogMode=[UA,D]) - +- TableSourceScan(table=[[default_catalog, default_database, src_with_computed_column, project=[c, rate, currency]]], fields=[c, rate, currency], changelogMode=[UA,D]) -]]> - </Resource> - </TestCase> - <TestCase name="testPushdownWatermarkAssignerWithCalc"> - <Resource name="sql"> - <![CDATA[ -SELECT - TUMBLE_START(currency_time, INTERVAL '5' SECOND) as w_start, - TUMBLE_END(currency_time, INTERVAL '5' SECOND) as w_end, - MAX(rate) AS max_rate -FROM simple_src -GROUP BY TUMBLE(currency_time, INTERVAL '5' SECOND) -]]> - </Resource> - <Resource name="ast"> - <![CDATA[ -LogicalProject(w_start=[TUMBLE_START($0)], w_end=[TUMBLE_END($0)], max_rate=[$1]) -+- LogicalAggregate(group=[{0}], max_rate=[MAX($1)]) - +- LogicalProject($f0=[$TUMBLE($3, 5000:INTERVAL SECOND)], rate=[$2]) - +- LogicalWatermarkAssigner(rowtime=[currency_time], watermark=[-($3, 5000:INTERVAL SECOND)]) - +- LogicalTableScan(table=[[default_catalog, default_database, simple_src]]) -]]> - </Resource> - <Resource name="optimized rel plan"> - <![CDATA[ -Calc(select=[w$start AS w_start, w$end AS w_end, max_rate], changelogMode=[I]) -+- GroupWindowAggregate(window=[TumblingGroupWindow('w$, currency_time, 5000)], properties=[w$start, w$end, w$rowtime, w$proctime], select=[MAX(rate) AS max_rate, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime], changelogMode=[I]) - +- Exchange(distribution=[single], changelogMode=[I,UB,UA,D]) - +- Calc(select=[currency_time, rate], changelogMode=[I,UB,UA,D]) - +- ChangelogNormalize(key=[currency], changelogMode=[I,UB,UA,D]) - +- Exchange(distribution=[hash[currency]], changelogMode=[UA,D]) - +- WatermarkAssigner(rowtime=[currency_time], watermark=[-(currency_time, 5000:INTERVAL SECOND)], changelogMode=[UA,D]) - +- TableSourceScan(table=[[default_catalog, default_database, simple_src, project=[currency_time, rate, currency]]], fields=[currency_time, rate, currency], changelogMode=[UA,D]) -]]> - </Resource> - </TestCase> - <TestCase name="testPushdownWatermarkWithoutCalc"> - <Resource name="sql"> - <![CDATA[ -SELECT - currency, - COUNT(1) AS cnt, - TUMBLE_START(currency_time, INTERVAL '5' SECOND) as w_start, - TUMBLE_END(currency_time, INTERVAL '5' SECOND) as w_end -FROM simple_src -GROUP BY currency, TUMBLE(currency_time, INTERVAL '5' SECOND) -]]> - </Resource> - <Resource name="ast"> - <![CDATA[ -LogicalProject(currency=[$0], cnt=[$2], w_start=[TUMBLE_START($1)], w_end=[TUMBLE_END($1)]) -+- LogicalAggregate(group=[{0, 1}], cnt=[COUNT()]) - +- LogicalProject(currency=[$0], $f1=[$TUMBLE($3, 5000:INTERVAL SECOND)]) - +- LogicalWatermarkAssigner(rowtime=[currency_time], watermark=[-($3, 5000:INTERVAL SECOND)]) - +- LogicalTableScan(table=[[default_catalog, default_database, simple_src]]) -]]> - </Resource> - <Resource name="optimized rel plan"> - <![CDATA[ -Calc(select=[currency, cnt, w$start AS w_start, w$end AS w_end], changelogMode=[I]) -+- GroupWindowAggregate(groupBy=[currency], window=[TumblingGroupWindow('w$, currency_time, 5000)], properties=[w$start, w$end, w$rowtime, w$proctime], select=[currency, COUNT(*) AS cnt, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime], changelogMode=[I]) - +- Exchange(distribution=[hash[currency]], changelogMode=[I,UB,UA,D]) - +- ChangelogNormalize(key=[currency], changelogMode=[I,UB,UA,D]) - +- Exchange(distribution=[hash[currency]], changelogMode=[UA,D]) - +- WatermarkAssigner(rowtime=[currency_time], watermark=[-(currency_time, 5000:INTERVAL SECOND)], changelogMode=[UA,D]) - +- TableSourceScan(table=[[default_catalog, default_database, simple_src, project=[currency, currency_time]]], fields=[currency, currency_time], changelogMode=[UA,D]) -]]> - </Resource> - </TestCase> -</Root> diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/rules/physical/stream/WatermarkAssignerChangelogNormalizeTransposeRuleTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/rules/physical/stream/WatermarkAssignerChangelogNormalizeTransposeRuleTest.scala deleted file mode 100644 index be08e5b..0000000 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/rules/physical/stream/WatermarkAssignerChangelogNormalizeTransposeRuleTest.scala +++ /dev/null @@ -1,175 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.table.planner.plan.rules.physical.stream - -import org.apache.flink.table.api.ExplainDetail -import org.apache.flink.table.planner.utils.{StreamTableTestUtil, TableTestBase} - -import org.junit.{Before, Test} - -/** - * Tests for [[WatermarkAssignerChangelogNormalizeTransposeRule]] - */ -class WatermarkAssignerChangelogNormalizeTransposeRuleTest extends TableTestBase { - private val util: StreamTableTestUtil = streamTestUtil() - - @Before - def setup(): Unit = { - util.addTable( - s""" - |CREATE TABLE simple_src ( - | currency STRING, - | currency_no STRING, - | rate BIGINT, - | currency_time TIMESTAMP(3), - | WATERMARK FOR currency_time AS currency_time - interval '5' SECOND, - | PRIMARY KEY(currency) NOT ENFORCED - |) WITH ( - | 'connector' = 'values', - | 'changelog-mode' = 'UA,D', - | 'enable-watermark-push-down' = 'true' - |) - |""".stripMargin) - - util.addTable( - s""" - |CREATE TABLE src_with_computed_column ( - | currency STRING, - | currency_no STRING, - | rate BIGINT, - | c STRING, - | currency_time as to_timestamp(c), - | WATERMARK FOR currency_time AS currency_time - interval '5' SECOND, - | PRIMARY KEY(currency) NOT ENFORCED - |) WITH ( - | 'connector' = 'values', - | 'changelog-mode' = 'UA,D', - | 'enable-watermark-push-down' = 'true' - |) - |""".stripMargin) - - util.addTable( - s""" - |CREATE TABLE src_with_computed_column2 ( - | currency int, - | currency2 as currency + 2, - | currency_no STRING, - | rate BIGINT, - | c STRING, - | currency_time as to_timestamp(c), - | WATERMARK FOR currency_time AS currency_time - interval '5' SECOND, - | PRIMARY KEY(currency) NOT ENFORCED - |) WITH ( - | 'connector' = 'values', - | 'changelog-mode' = 'UA,D', - | 'enable-watermark-push-down' = 'true' - |) - |""".stripMargin) - } - - // ---------------------------------------------------------------------------------------- - // Tests for queries matches WITHOUT_CALC patten - // Rewrite always happens in the case - // ---------------------------------------------------------------------------------------- - @Test - def testPushdownWatermarkWithoutCalc(): Unit = { - val sql = - """ - |SELECT - | currency, - | COUNT(1) AS cnt, - | TUMBLE_START(currency_time, INTERVAL '5' SECOND) as w_start, - | TUMBLE_END(currency_time, INTERVAL '5' SECOND) as w_end - |FROM simple_src - |GROUP BY currency, TUMBLE(currency_time, INTERVAL '5' SECOND) - |""".stripMargin - util.verifyRelPlan(sql, ExplainDetail.CHANGELOG_MODE) - } - - // ---------------------------------------------------------------------------------------- - // Tests for queries matches WITH_CALC patten - // ---------------------------------------------------------------------------------------- - - /** push down calc and watermark assigner as a whole if shuffle keys are kept after Calc. */ - @Test - def testPushdownCalcAndWatermarkAssignerWithCalc(): Unit = { - val sql = - """ - |SELECT - | currency, - | COUNT(1) AS cnt, - | TUMBLE_START(currency_time, INTERVAL '5' SECOND) as w_start, - | TUMBLE_END(currency_time, INTERVAL '5' SECOND) as w_end - |FROM src_with_computed_column - |GROUP BY currency, TUMBLE(currency_time, INTERVAL '5' SECOND) - |""".stripMargin - util.verifyRelPlan(sql, ExplainDetail.CHANGELOG_MODE) - } - - /** only push down watermark assigner if satisfy all the following condition: - * 1. shuffle keys are not kept after Calc - * 2. row time field does not depend on computed column - */ - @Test - def testPushdownWatermarkAssignerWithCalc(): Unit = { - val sql = - """ - |SELECT - | TUMBLE_START(currency_time, INTERVAL '5' SECOND) as w_start, - | TUMBLE_END(currency_time, INTERVAL '5' SECOND) as w_end, - | MAX(rate) AS max_rate - |FROM simple_src - |GROUP BY TUMBLE(currency_time, INTERVAL '5' SECOND) - |""".stripMargin - util.verifyRelPlan(sql, ExplainDetail.CHANGELOG_MODE) - } - - /** - * push down new calc with all shuffle keys and new watermarkAssigner, then add a top calc - * to remove add new added shuffle keys - */ - @Test - def testPushdownNewCalcAndWatermarkAssignerWithCalc(): Unit = { - val sql = - """ - |SELECT - | TUMBLE_START(currency_time, INTERVAL '5' SECOND) as w_start, - | TUMBLE_END(currency_time, INTERVAL '5' SECOND) as w_end, - | MAX(rate) AS max_rate - |FROM src_with_computed_column - |GROUP BY TUMBLE(currency_time, INTERVAL '5' SECOND) - |""".stripMargin - util.verifyRelPlan(sql, ExplainDetail.CHANGELOG_MODE) - } - - @Test - def testGroupKeyIsComputedColumn(): Unit = { - val sql = - """ - |SELECT - | currency2, - | COUNT(1) AS cnt, - | TUMBLE_START(currency_time, INTERVAL '5' SECOND) as w_start, - | TUMBLE_END(currency_time, INTERVAL '5' SECOND) as w_end - |FROM src_with_computed_column2 - |GROUP BY currency2, TUMBLE(currency_time, INTERVAL '5' SECOND) - |""".stripMargin - util.verifyRelPlan(sql, ExplainDetail.CHANGELOG_MODE) - } -} diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/GroupWindowITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/GroupWindowITCase.scala index 5a84fd0..659c639 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/GroupWindowITCase.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/GroupWindowITCase.scala @@ -34,7 +34,7 @@ import org.apache.flink.table.planner.runtime.utils._ import org.apache.flink.types.Row import org.junit.Assert.assertEquals -import org.junit.Test +import org.junit.{Ignore, Test} import org.junit.runner.RunWith import org.junit.runners.Parameterized @@ -51,15 +51,6 @@ class GroupWindowITCase(mode: StateBackendMode, useTimestampLtz: Boolean) val SHANGHAI_ZONE = ZoneId.of("Asia/Shanghai") - val upsertSourceCurrencyData = List( - changelogRow("+U", "Euro", "no1", JLong.valueOf(114L), localDateTime(1L)), - changelogRow("+U", "US Dollar", "no1", JLong.valueOf(102L), localDateTime(2L)), - changelogRow("+U", "Yen", "no1", JLong.valueOf(1L), localDateTime(3L)), - changelogRow("+U", "RMB", "no1", JLong.valueOf(702L), localDateTime(4L)), - changelogRow("+U", "Euro", "no1", JLong.valueOf(118L), localDateTime(6L)), - changelogRow("+U", "US Dollar", "no1", JLong.valueOf(104L), localDateTime(4L)), - changelogRow("-D", "RMB", "no1", JLong.valueOf(702L), localDateTime(4L))) - override def before(): Unit = { super.before() @@ -382,6 +373,20 @@ class GroupWindowITCase(mode: StateBackendMode, useTimestampLtz: Boolean) @Test def testWindowAggregateOnUpsertSource(): Unit = { + + def localDateTime(epochSecond: Long): LocalDateTime = { + LocalDateTime.ofEpochSecond(epochSecond, 0, ZoneOffset.UTC) + } + + val upsertSourceCurrencyData = List( + changelogRow("+U", "Euro", "no1", JLong.valueOf(114L), localDateTime(1L)), + changelogRow("+U", "US Dollar", "no1", JLong.valueOf(102L), localDateTime(2L)), + changelogRow("+U", "Yen", "no1", JLong.valueOf(1L), localDateTime(3L)), + changelogRow("+U", "RMB", "no1", JLong.valueOf(702L), localDateTime(4L)), + changelogRow("+U", "Euro", "no1", JLong.valueOf(118L), localDateTime(6L)), + changelogRow("+U", "US Dollar", "no1", JLong.valueOf(104L), localDateTime(4L)), + changelogRow("-D", "RMB", "no1", JLong.valueOf(702L), localDateTime(4L))) + val upsertSourceDataId = registerData(upsertSourceCurrencyData) tEnv.executeSql( s""" @@ -420,8 +425,22 @@ class GroupWindowITCase(mode: StateBackendMode, useTimestampLtz: Boolean) assertEquals(expected.sorted, sink.getAppendResults.sorted) } - @Test - def testWindowAggregateOnUpsertSourcePushdownWatermark(): Unit = { + @Ignore("FLINK-22680") + def testUnResolvedWindowAggregateOnUpsertSource(): Unit = { + + def localDateTime(epochSecond: Long): LocalDateTime = { + LocalDateTime.ofEpochSecond(epochSecond, 0, ZoneOffset.UTC) + } + + val upsertSourceCurrencyData = List( + changelogRow("+U", "Euro", "no1", JLong.valueOf(114L), localDateTime(1L)), + changelogRow("+U", "US Dollar", "no1", JLong.valueOf(102L), localDateTime(2L)), + changelogRow("+U", "Yen", "no1", JLong.valueOf(1L), localDateTime(3L)), + changelogRow("+U", "RMB", "no1", JLong.valueOf(702L), localDateTime(4L)), + changelogRow("+U", "Euro", "no1", JLong.valueOf(118L), localDateTime(6L)), + changelogRow("+U", "US Dollar", "no1", JLong.valueOf(104L), localDateTime(4L)), + changelogRow("-D", "RMB", "no1", JLong.valueOf(702L), localDateTime(4L))) + val upsertSourceDataId = registerData(upsertSourceCurrencyData) tEnv.executeSql( s""" @@ -450,10 +469,6 @@ class GroupWindowITCase(mode: StateBackendMode, useTimestampLtz: Boolean) val sink = new TestingAppendSink tEnv.sqlQuery(sql).toAppendStream[Row].addSink(sink) env.execute() - val expected = Seq( - "1970-01-01T00:00,1970-01-01T00:00:05,104", - "1970-01-01T00:00:05,1970-01-01T00:00:10,118") - assertEquals(expected.sorted, sink.getAppendResults.sorted) } @Test @@ -546,10 +561,6 @@ class GroupWindowITCase(mode: StateBackendMode, useTimestampLtz: Boolean) tableConfig.getConfiguration.set( TABLE_EXEC_EMIT_LATE_FIRE_DELAY, Duration.ofMillis(intervalInMillis)) } - - private def localDateTime(epochSecond: Long): LocalDateTime = { - LocalDateTime.ofEpochSecond(epochSecond, 0, ZoneOffset.UTC) - } } object GroupWindowITCase {