This is an automated email from the ASF dual-hosted git repository.
dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 50d46e11bd3 [FLINK-37475] Drop ChangelogNormalize for piping from upsert source to sink
50d46e11bd3 is described below
commit 50d46e11bd37ea3f2c19dd8f9a77c2b2dad4b7aa
Author: Dawid Wysakowicz <[email protected]>
AuthorDate: Fri Feb 28 15:08:06 2025 +0100
[FLINK-37475] Drop ChangelogNormalize for piping from upsert source to sink
* Add information about DELETE to ChangelogMode
* Adapt FlinkChangelogModeInferenceProgram to remove ChangelogNormalize
if possible
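
For illustration, a minimal sketch of the new ChangelogMode API surface (the
wrapper class and the printed output are illustrative only; upsert(boolean) and
Builder#keyOnlyDeletes(boolean) are the additions from this commit):

    import org.apache.flink.table.connector.ChangelogMode;
    import org.apache.flink.types.RowKind;

    public class KeyOnlyDeleteModes {
        public static void main(String[] args) {
            // Same as the no-arg upsert(): DELETE rows may carry only the key fields.
            ChangelogMode keyOnlyDeletes = ChangelogMode.upsert(true);
            // Upsert mode whose DELETE rows always carry the full row.
            ChangelogMode fullDeletes = ChangelogMode.upsert(false);
            // The same flag is available on the builder for custom modes.
            ChangelogMode custom =
                    ChangelogMode.newBuilder()
                            .addContainedKind(RowKind.INSERT)
                            .addContainedKind(RowKind.UPDATE_AFTER)
                            .addContainedKind(RowKind.DELETE)
                            .keyOnlyDeletes(true)
                            .build();
            // Key-only deletes are rendered as "~D" by ChangelogMode#toString.
            System.out.println(keyOnlyDeletes + " " + fullDeletes + " " + custom);
        }
    }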
---
.../flink/table/connector/ChangelogMode.java | 50 ++-
.../ChangelogNormalizeRequirementResolver.java | 179 +++++++++
.../StreamNonDeterministicUpdatePlanVisitor.java | 21 +-
.../flink/table/planner/plan/trait/DeleteKind.java | 35 ++
.../table/planner/plan/utils/FlinkRelUtil.java | 19 +
.../FlinkChangelogModeInferenceProgram.scala | 419 +++++++++++++++++++--
.../table/planner/plan/trait/DeleteKindTrait.scala | 100 +++++
.../planner/plan/trait/DeleteKindTraitDef.scala | 50 +++
.../planner/plan/utils/ChangelogPlanUtils.scala | 18 +-
.../planner/factories/TestValuesTableFactory.java | 50 ++-
.../nodes/exec/stream/DeletesByKeyPrograms.java | 307 +++++++++++++++
.../exec/stream/DeletesByKeySemanticTests.java | 39 ++
.../sql/ChangelogNormalizeOptimizationTest.java | 301 +++++++++++++++
.../runtime/stream/sql/DataStreamJavaITCase.java | 4 +-
.../sql/ChangelogNormalizeOptimizationTest.xml | 418 ++++++++++++++++++++
.../planner/plan/stream/sql/TableScanTest.xml | 14 +-
.../plan/stream/sql/NonDeterministicDagTest.scala | 4 +-
.../planner/plan/stream/sql/TableScanTest.scala | 9 +-
.../flink/table/planner/utils/TableTestBase.scala | 11 +-
19 files changed, 1969 insertions(+), 79 deletions(-)
diff --git
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/ChangelogMode.java
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/ChangelogMode.java
index 9d4a5fd500f..467d374d183 100644
---
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/ChangelogMode.java
+++
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/ChangelogMode.java
@@ -26,6 +26,7 @@ import java.util.Collections;
import java.util.EnumSet;
import java.util.Objects;
import java.util.Set;
+import java.util.stream.Collectors;
/**
* The set of changes contained in a changelog.
@@ -43,6 +44,15 @@ public final class ChangelogMode {
.addContainedKind(RowKind.INSERT)
.addContainedKind(RowKind.UPDATE_AFTER)
.addContainedKind(RowKind.DELETE)
+ .keyOnlyDeletes(true)
+ .build();
+
+ private static final ChangelogMode UPSERT_WITH_FULL_DELETES =
+ ChangelogMode.newBuilder()
+ .addContainedKind(RowKind.INSERT)
+ .addContainedKind(RowKind.UPDATE_AFTER)
+ .addContainedKind(RowKind.DELETE)
+ .keyOnlyDeletes(false)
.build();
private static final ChangelogMode ALL =
@@ -54,11 +64,13 @@ public final class ChangelogMode {
.build();
private final Set<RowKind> kinds;
+ private final boolean keyOnlyDeletes;
- private ChangelogMode(Set<RowKind> kinds) {
+ private ChangelogMode(Set<RowKind> kinds, boolean keyOnlyDeletes) {
Preconditions.checkArgument(
kinds.size() > 0, "At least one kind of row should be
contained in a changelog.");
this.kinds = Collections.unmodifiableSet(kinds);
+ this.keyOnlyDeletes = keyOnlyDeletes;
}
/** Shortcut for a simple {@link RowKind#INSERT}-only changelog. */
@@ -71,7 +83,21 @@ public final class ChangelogMode {
* contain {@link RowKind#UPDATE_BEFORE} rows.
*/
public static ChangelogMode upsert() {
- return UPSERT;
+ return upsert(true);
+ }
+
+ /**
+     * Shortcut for an upsert changelog that describes idempotent updates on a key and thus does
+     * not contain {@link RowKind#UPDATE_BEFORE} rows.
+ *
+     * @param keyOnlyDeletes Tells the system whether DELETEs contain just the key or the full row.
+ */
+ public static ChangelogMode upsert(boolean keyOnlyDeletes) {
+ if (keyOnlyDeletes) {
+ return UPSERT;
+ } else {
+ return UPSERT_WITH_FULL_DELETES;
+ }
}
/** Shortcut for a changelog that can contain all {@link RowKind}s. */
@@ -96,6 +122,10 @@ public final class ChangelogMode {
return kinds.size() == 1 && kinds.contains(kind);
}
+ public boolean keyOnlyDeletes() {
+ return keyOnlyDeletes;
+ }
+
@Override
public boolean equals(Object o) {
if (this == o) {
@@ -115,7 +145,13 @@ public final class ChangelogMode {
@Override
public String toString() {
- return kinds.toString();
+ if (!keyOnlyDeletes) {
+ return kinds.toString();
+ } else {
+ return kinds.stream()
+                    .map(kind -> kind == RowKind.DELETE ? "~D" : kind.toString())
+ .collect(Collectors.joining(", ", "[", "]"));
+ }
}
//
--------------------------------------------------------------------------------------------
@@ -125,6 +161,7 @@ public final class ChangelogMode {
public static class Builder {
private final Set<RowKind> kinds = EnumSet.noneOf(RowKind.class);
+ private boolean keyOnlyDeletes = false;
private Builder() {
// default constructor to allow a fluent definition
@@ -135,8 +172,13 @@ public final class ChangelogMode {
return this;
}
+ public Builder keyOnlyDeletes(boolean flag) {
+ this.keyOnlyDeletes = flag;
+ return this;
+ }
+
public ChangelogMode build() {
- return new ChangelogMode(kinds);
+ return new ChangelogMode(kinds, keyOnlyDeletes);
}
}
}
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/optimize/ChangelogNormalizeRequirementResolver.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/optimize/ChangelogNormalizeRequirementResolver.java
new file mode 100644
index 00000000000..2191fd51940
--- /dev/null
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/optimize/ChangelogNormalizeRequirementResolver.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.optimize;
+
+import org.apache.flink.table.catalog.Column;
+import
org.apache.flink.table.connector.source.abilities.SupportsReadingMetadata;
+import org.apache.flink.table.planner.connectors.DynamicSourceUtils;
+import
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalCalcBase;
+import
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalChangelogNormalize;
+import
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalExchange;
+import
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalTableSourceScan;
+import
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalWatermarkAssigner;
+import
org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram;
+import org.apache.flink.table.planner.plan.schema.TableSourceTable;
+import org.apache.flink.table.planner.plan.trait.UpdateKindTrait$;
+import org.apache.flink.table.planner.plan.trait.UpdateKindTraitDef$;
+import org.apache.flink.table.planner.plan.utils.FlinkRelUtil;
+
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.SingleRel;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.util.ImmutableBitSet;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * Checks if it is safe to remove ChangelogNormalize as part of {@link
+ * FlinkChangelogModeInferenceProgram}. It checks:
+ *
+ * <ul>
+ * <li>if there is no filter pushed into the changelog normalize
+ * <li>if we don't need to produce UPDATE_BEFORE
+ * <li>we don't access any metadata columns
+ * </ul>
+ */
+public class ChangelogNormalizeRequirementResolver {
+
+ /** Checks if it is safe to remove ChangelogNormalize. */
+    public static boolean isRequired(StreamPhysicalChangelogNormalize normalize) {
+ if (normalize.filterCondition() != null) {
+ return true;
+ }
+ if (!Objects.equals(
+                normalize.getTraitSet().getTrait(UpdateKindTraitDef$.MODULE$.INSTANCE()),
+ UpdateKindTrait$.MODULE$.ONLY_UPDATE_AFTER())) {
+ return true;
+ }
+
+ // check if metadata columns are accessed
+ final RelNode input = normalize.getInput();
+
+ return visit(input, bitSetForAllOutputColumns(input));
+ }
+
+ private static ImmutableBitSet bitSetForAllOutputColumns(RelNode input) {
+        return ImmutableBitSet.builder().set(0, input.getRowType().getFieldCount()).build();
+ }
+
+    private static boolean visit(final RelNode rel, final ImmutableBitSet requiredColumns) {
+ if (rel instanceof StreamPhysicalCalcBase) {
+ return visitCalc((StreamPhysicalCalcBase) rel, requiredColumns);
+ } else if (rel instanceof StreamPhysicalTableSourceScan) {
+            return visitTableSourceScan((StreamPhysicalTableSourceScan) rel, requiredColumns);
+ } else if (rel instanceof StreamPhysicalWatermarkAssigner
+ || rel instanceof StreamPhysicalExchange) {
+ // require all input columns
+ final RelNode input = ((SingleRel) rel).getInput();
+ return visit(input, bitSetForAllOutputColumns(input));
+ } else {
+ // these nodes should not be in an input of a changelog normalize
+ // StreamPhysicalChangelogNormalize
+ // StreamPhysicalDropUpdateBefore
+ // StreamPhysicalUnion
+ // StreamPhysicalSort
+ // StreamPhysicalLimit
+ // StreamPhysicalSortLimit
+ // StreamPhysicalTemporalSort
+ // StreamPhysicalWindowTableFunction
+ // StreamPhysicalWindowRank
+ // StreamPhysicalWindowDeduplicate
+ // StreamPhysicalRank
+ // StreamPhysicalOverAggregateBase
+ // CommonPhysicalJoin
+ // StreamPhysicalMatch
+ // StreamPhysicalMiniBatchAssigner
+ // StreamPhysicalExpand
+ // StreamPhysicalWindowAggregateBase
+ // StreamPhysicalGroupAggregateBase
+ // StreamPhysicalSink
+ // StreamPhysicalLegacySink
+ // StreamPhysicalCorrelateBase
+ // StreamPhysicalLookupJoin
+ // StreamPhysicalValues
+ // StreamPhysicalDataStreamScan
+ // StreamPhysicalLegacyTableSourceScan
+ throw new UnsupportedOperationException(
+ String.format(
+ "Unsupported to visit node %s. The node either
should not be pushed"
+ + " through the changelog normalize or is
not supported yet.",
+ rel.getClass().getSimpleName()));
+ }
+ }
+
+    private static boolean visitTableSourceScan(
+            StreamPhysicalTableSourceScan tableScan, ImmutableBitSet requiredColumns) {
+        if (!(tableScan.tableSource() instanceof SupportsReadingMetadata)) {
+            // source does not have metadata, no need to check
+            return false;
+        }
+        final TableSourceTable sourceTable = tableScan.getTable().unwrap(TableSourceTable.class);
+        assert sourceTable != null;
+        // check if requiredColumns contain metadata column
+        final List<Column.MetadataColumn> metadataColumns =
+                DynamicSourceUtils.extractMetadataColumns(
+                        sourceTable.contextResolvedTable().getResolvedSchema());
+        final Set<String> metaColumnSet =
+                metadataColumns.stream().map(Column::getName).collect(Collectors.toSet());
+        final List<String> columns = tableScan.getRowType().getFieldNames();
+        for (int index = 0; index < columns.size(); index++) {
+            String column = columns.get(index);
+            if (metaColumnSet.contains(column) && requiredColumns.get(index)) {
+                // we require metadata column, therefore, we cannot remove the changelog normalize
+ return true;
+ }
+ }
+
+ return false;
+ }
+
+    private static boolean visitCalc(StreamPhysicalCalcBase calc, ImmutableBitSet requiredColumns) {
+        // evaluate required columns from input
+        final List<RexNode> projects =
+                calc.getProgram().getProjectList().stream()
+                        .map(expr -> calc.getProgram().expandLocalRef(expr))
+                        .collect(Collectors.toList());
+        final Map<Integer, List<Integer>> outFromSourcePos =
+                FlinkRelUtil.extractSourceMapping(projects);
+        final List<Integer> conv2Inputs =
+                requiredColumns.toList().stream()
+                        .map(out -> Optional.ofNullable(outFromSourcePos.get(out))
+                                .orElseThrow(() -> new IllegalStateException(
+                                        String.format(
+                                                "Invalid pos:%d over projection:%s",
+                                                out,
+                                                calc.getProgram()))))
+                        .flatMap(Collection::stream)
+                        .filter(index -> index != -1)
+                        .distinct()
+                        .collect(Collectors.toList());
+ return visit(calc.getInput(), ImmutableBitSet.of(conv2Inputs));
+ }
+}
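
As a rough usage sketch (not part of this commit): the shape of pipeline this resolver
targets is an upsert source piped, at most through a simple projection, into an upsert
sink keyed by the same primary key, with no filter, no UPDATE_BEFORE consumer, and no
metadata columns. The snippet below uses the planner's testing 'values' connector;
table names, schema, and the 'sink-insert-only' option are assumptions, and whether
ChangelogNormalize actually disappears depends on the remaining conditions above.

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class DropNormalizeSketch {
        public static void main(String[] args) {
            TableEnvironment env = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
            // Upsert source with a primary key; no metadata columns declared or read.
            env.executeSql(
                    "CREATE TABLE src (id INT, v STRING, PRIMARY KEY (id) NOT ENFORCED) "
                            + "WITH ('connector' = 'values', 'changelog-mode' = 'I,UA,D')");
            // Upsert sink keyed by the same primary key.
            env.executeSql(
                    "CREATE TABLE snk (id INT, v STRING, PRIMARY KEY (id) NOT ENFORCED) "
                            + "WITH ('connector' = 'values', 'sink-insert-only' = 'false')");
            // No filter, no UPDATE_BEFORE requirement, no metadata access: the plan may now
            // pipe the upsert changelog into the sink without a ChangelogNormalize node.
            System.out.println(env.explainSql("INSERT INTO snk SELECT id, v FROM src"));
        }
    }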
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/optimize/StreamNonDeterministicUpdatePlanVisitor.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/optimize/StreamNonDeterministicUpdatePlanVisitor.java
index 98a1a6782cf..aaa4c4ff845 100644
---
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/optimize/StreamNonDeterministicUpdatePlanVisitor.java
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/optimize/StreamNonDeterministicUpdatePlanVisitor.java
@@ -62,6 +62,7 @@ import
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalW
import org.apache.flink.table.planner.plan.schema.TableSourceTable;
import org.apache.flink.table.planner.plan.utils.ChangelogPlanUtils;
import org.apache.flink.table.planner.plan.utils.FlinkRelOptUtil;
+import org.apache.flink.table.planner.plan.utils.FlinkRelUtil;
import org.apache.flink.table.planner.plan.utils.FlinkRexUtil;
import org.apache.flink.table.planner.plan.utils.JoinUtil;
import org.apache.flink.table.planner.plan.utils.OverAggregateUtil;
@@ -78,7 +79,6 @@ import org.apache.calcite.rel.core.AggregateCall;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.rex.RexProgram;
-import org.apache.calcite.rex.RexSlot;
import org.apache.calcite.sql.SqlExplainLevel;
import org.apache.calcite.util.ImmutableBitSet;
@@ -297,7 +297,8 @@ public class StreamNonDeterministicUpdatePlanVisitor {
calc.getProgram().getProjectList().stream()
.map(expr ->
calc.getProgram().expandLocalRef(expr))
.collect(Collectors.toList());
- Map<Integer, List<Integer>> outFromSourcePos =
extractSourceMapping(projects);
+ Map<Integer, List<Integer>> outFromSourcePos =
+ FlinkRelUtil.extractSourceMapping(projects);
List<Integer> conv2Inputs =
requireDeterminism.toList().stream()
.map(
@@ -812,22 +813,6 @@ public class StreamNonDeterministicUpdatePlanVisitor {
return transmitDeterminismRequirement(rel, inputRequireDeterminism);
}
- /** Extracts the out from source field index mapping of the given
projects. */
- private Map<Integer, List<Integer>> extractSourceMapping(final
List<RexNode> projects) {
- Map<Integer, List<Integer>> mapOutFromInPos = new HashMap<>();
-
- for (int index = 0; index < projects.size(); index++) {
- RexNode expr = projects.get(index);
- mapOutFromInPos.put(
- index,
- FlinkRexUtil.findAllInputRefs(expr).stream()
- .mapToInt(RexSlot::getIndex)
- .boxed()
- .collect(Collectors.toList()));
- }
- return mapOutFromInPos;
- }
-
private void checkNonDeterministicRexProgram(
final ImmutableBitSet requireDeterminism,
final RexProgram program,
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/trait/DeleteKind.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/trait/DeleteKind.java
new file mode 100644
index 00000000000..8f864f7ac48
--- /dev/null
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/trait/DeleteKind.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.trait;
+
+/** Lists all kinds of {@link ModifyKind#DELETE} operation. */
+public enum DeleteKind {
+
+ /** This kind indicates that operators do not emit {@link
ModifyKind#DELETE} operation. */
+ NONE,
+
+ /**
+     * This kind indicates that operators can emit deletes with the key only. The rest of the row
+     * may not be present.
+ */
+ DELETE_BY_KEY,
+
+ /** This kind indicates that operators should emit deletes with the full
row. */
+ FULL_DELETE
+}
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/FlinkRelUtil.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/FlinkRelUtil.java
index a920a6133ff..ee39cdb807b 100644
---
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/FlinkRelUtil.java
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/FlinkRelUtil.java
@@ -28,12 +28,15 @@ import org.apache.calcite.rex.RexInputRef;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.rex.RexProgram;
import org.apache.calcite.rex.RexProgramBuilder;
+import org.apache.calcite.rex.RexSlot;
import org.apache.calcite.rex.RexUtil;
import org.apache.calcite.rex.RexVisitorImpl;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.stream.Collectors;
/** Utilities for {@link RelNode}. */
@@ -170,6 +173,22 @@ public class FlinkRelUtil {
return array;
}
+    /** Extracts the output-to-source field index mapping of the given projects. */
+ public static Map<Integer, List<Integer>> extractSourceMapping(final
List<RexNode> projects) {
+ Map<Integer, List<Integer>> mapOutFromInPos = new HashMap<>();
+
+ for (int index = 0; index < projects.size(); index++) {
+ RexNode expr = projects.get(index);
+ mapOutFromInPos.put(
+ index,
+ FlinkRexUtil.findAllInputRefs(expr).stream()
+ .mapToInt(RexSlot::getIndex)
+ .boxed()
+ .collect(Collectors.toList()));
+ }
+ return mapOutFromInPos;
+ }
+
/**
* An InputRefCounter that count every inputRef's reference count number,
every reference will
* be counted, e.g., '$0 + 1' & '$0 + 2' will count 2 instead of 1.
diff --git
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala
index 620cd5802ab..988bcbc3554 100644
---
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala
+++
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala
@@ -23,9 +23,11 @@ import
org.apache.flink.table.api.config.ExecutionConfigOptions
import
org.apache.flink.table.api.config.ExecutionConfigOptions.UpsertMaterialize
import org.apache.flink.table.connector.ChangelogMode
import org.apache.flink.table.planner.plan.`trait`._
+import
org.apache.flink.table.planner.plan.`trait`.DeleteKindTrait.{deleteOnKeyOrNone,
fullDeleteOrNone, DELETE_BY_KEY, FULL_DELETE}
import
org.apache.flink.table.planner.plan.`trait`.UpdateKindTrait.{beforeAfterOrNone,
onlyAfterOrNone, BEFORE_AND_AFTER, ONLY_UPDATE_AFTER}
import org.apache.flink.table.planner.plan.metadata.FlinkRelMetadataQuery
import org.apache.flink.table.planner.plan.nodes.physical.stream._
+import
org.apache.flink.table.planner.plan.optimize.ChangelogNormalizeRequirementResolver
import org.apache.flink.table.planner.plan.utils._
import
org.apache.flink.table.planner.plan.utils.RankProcessStrategy.{AppendFastStrategy,
RetractStrategy, UpdateFastStrategy}
import org.apache.flink.table.planner.sinks.DataStreamTableSink
@@ -70,12 +72,33 @@ class FlinkChangelogModeInferenceProgram extends
FlinkOptimizeProgram[StreamOpti
}
val updateKindTraitVisitor = new SatisfyUpdateKindTraitVisitor(context)
- val finalRoot = requiredUpdateKindTraits.flatMap {
+ val updateRoot = requiredUpdateKindTraits.flatMap {
requiredUpdateKindTrait =>
updateKindTraitVisitor.visit(rootWithModifyKindSet,
requiredUpdateKindTrait)
}
- // step3: sanity check and return non-empty root
+ // step3: satisfy DeleteKind trait
+ val requiredDeleteKindTraits = if
(rootModifyKindSet.contains(ModifyKind.DELETE)) {
+ root match {
+ case _: StreamPhysicalSink =>
+ // try DELETE_BY_KEY first, and then FULL_DELETE
+ Seq(DeleteKindTrait.DELETE_BY_KEY, DeleteKindTrait.FULL_DELETE)
+ case _ =>
+ // for non-sink nodes prefer full deletes
+ Seq(DeleteKindTrait.FULL_DELETE)
+ }
+ } else {
+      // there are no deletes
+ Seq(DeleteKindTrait.NONE)
+ }
+
+ val deleteKindTraitVisitor = new SatisfyDeleteKindTraitVisitor(context)
+ val finalRoot = requiredDeleteKindTraits.flatMap {
+ requiredDeleteKindTrait =>
+ deleteKindTraitVisitor.visit(updateRoot.head, requiredDeleteKindTrait)
+ }
+
+ // step4: sanity check and return non-empty root
if (finalRoot.isEmpty) {
val plan = FlinkRelOptUtil.toString(root, withChangelogTraits = true)
throw new TableException(
@@ -489,13 +512,15 @@ class FlinkChangelogModeInferenceProgram extends
FlinkOptimizeProgram[StreamOpti
*
* @param rel
* the node who should satisfy the requiredTrait
- * @param requiredTrait
+ * @param requiredUpdateTrait
* the required UpdateKindTrait
* @return
* A converted node which satisfies required traits by input nodes of
current node. Or None if
* required traits cannot be satisfied.
*/
- def visit(rel: StreamPhysicalRel, requiredTrait: UpdateKindTrait):
Option[StreamPhysicalRel] =
+ def visit(
+ rel: StreamPhysicalRel,
+ requiredUpdateTrait: UpdateKindTrait): Option[StreamPhysicalRel] =
rel match {
case sink: StreamPhysicalSink =>
val sinkRequiredTraits = inferSinkRequiredTraits(sink)
@@ -534,10 +559,10 @@ class FlinkChangelogModeInferenceProgram extends
FlinkOptimizeProgram[StreamOpti
_: StreamPhysicalWindowAggregate =>
// Aggregate, TableAggregate, Limit, GroupWindowAggregate,
WindowAggregate,
// and WindowTableAggregate requires update_before if there are
updates
- val requiredChildTrait =
beforeAfterOrNone(getModifyKindSet(rel.getInput(0)))
- val children = visitChildren(rel, requiredChildTrait)
+ val requiredChildUpdateTrait =
beforeAfterOrNone(getModifyKindSet(rel.getInput(0)))
+ val children = visitChildren(rel, requiredChildUpdateTrait)
// use requiredTrait as providedTrait, because they should support
all kinds of UpdateKind
- createNewNode(rel, children, requiredTrait)
+ createNewNode(rel, children, requiredUpdateTrait)
case _: StreamPhysicalWindowRank | _: StreamPhysicalWindowDeduplicate |
_: StreamPhysicalTemporalSort | _: StreamPhysicalMatch |
@@ -546,14 +571,14 @@ class FlinkChangelogModeInferenceProgram extends
FlinkOptimizeProgram[StreamOpti
// WindowRank, WindowDeduplicate, Deduplicate, TemporalSort, CEP,
OverAggregate,
// and IntervalJoin, WindowJoin require nothing about UpdateKind.
val children = visitChildren(rel, UpdateKindTrait.NONE)
- createNewNode(rel, children, requiredTrait)
+ createNewNode(rel, children, requiredUpdateTrait)
case rank: StreamPhysicalRank =>
val rankStrategies =
RankProcessStrategy.analyzeRankProcessStrategies(rank,
rank.partitionKey, rank.orderKey)
visitRankStrategies(
rankStrategies,
- requiredTrait,
+ requiredUpdateTrait,
rankStrategy => rank.copy(rankStrategy))
case sortLimit: StreamPhysicalSortLimit =>
@@ -563,16 +588,16 @@ class FlinkChangelogModeInferenceProgram extends
FlinkOptimizeProgram[StreamOpti
sortLimit.getCollation)
visitRankStrategies(
rankStrategies,
- requiredTrait,
+ requiredUpdateTrait,
rankStrategy => sortLimit.copy(rankStrategy))
case sort: StreamPhysicalSort =>
val requiredChildTrait =
beforeAfterOrNone(getModifyKindSet(sort.getInput))
val children = visitChildren(sort, requiredChildTrait)
- createNewNode(sort, children, requiredTrait)
+ createNewNode(sort, children, requiredUpdateTrait)
case join: StreamPhysicalJoin =>
- val onlyAfterByParent = requiredTrait.updateKind ==
UpdateKind.ONLY_UPDATE_AFTER
+ val onlyAfterByParent = requiredUpdateTrait.updateKind ==
UpdateKind.ONLY_UPDATE_AFTER
val children = join.getInputs.zipWithIndex.map {
case (child, childOrdinal) =>
val physicalChild = child.asInstanceOf[StreamPhysicalRel]
@@ -592,7 +617,7 @@ class FlinkChangelogModeInferenceProgram extends
FlinkOptimizeProgram[StreamOpti
if (children.exists(_.isEmpty)) {
None
} else {
- createNewNode(join, Some(children.flatten.toList), requiredTrait)
+ createNewNode(join, Some(children.flatten.toList),
requiredUpdateTrait)
}
case temporalJoin: StreamPhysicalTemporalJoin =>
@@ -601,7 +626,8 @@ class FlinkChangelogModeInferenceProgram extends
FlinkOptimizeProgram[StreamOpti
// the left input required trait depends on it's parent in temporal
join
// the left input will send message to parent
- val requiredUpdateBeforeByParent = requiredTrait.updateKind ==
UpdateKind.BEFORE_AND_AFTER
+ val requiredUpdateBeforeByParent =
+ requiredUpdateTrait.updateKind == UpdateKind.BEFORE_AND_AFTER
val leftInputModifyKindSet = getModifyKindSet(left)
val leftRequiredTrait = if (requiredUpdateBeforeByParent) {
beforeAfterOrNone(leftInputModifyKindSet)
@@ -630,7 +656,7 @@ class FlinkChangelogModeInferenceProgram extends
FlinkOptimizeProgram[StreamOpti
case calc: StreamPhysicalCalcBase =>
if (
- requiredTrait == UpdateKindTrait.ONLY_UPDATE_AFTER &&
+ requiredUpdateTrait == UpdateKindTrait.ONLY_UPDATE_AFTER &&
calc.getProgram.getCondition != null
) {
// we don't expect filter to satisfy ONLY_UPDATE_AFTER update kind,
@@ -639,7 +665,7 @@ class FlinkChangelogModeInferenceProgram extends
FlinkOptimizeProgram[StreamOpti
None
} else {
// otherwise, forward UpdateKind requirement
- visitChildren(rel, requiredTrait) match {
+ visitChildren(rel, requiredUpdateTrait) match {
case None => None
case Some(children) =>
val childTrait =
children.head.getTraitSet.getTrait(UpdateKindTraitDef.INSTANCE)
@@ -652,7 +678,7 @@ class FlinkChangelogModeInferenceProgram extends
FlinkOptimizeProgram[StreamOpti
_: StreamPhysicalMiniBatchAssigner | _:
StreamPhysicalWatermarkAssigner |
_: StreamPhysicalWindowTableFunction =>
// transparent forward requiredTrait to children
- visitChildren(rel, requiredTrait) match {
+ visitChildren(rel, requiredUpdateTrait) match {
case None => None
case Some(children) =>
val childTrait =
children.head.getTraitSet.getTrait(UpdateKindTraitDef.INSTANCE)
@@ -666,7 +692,7 @@ class FlinkChangelogModeInferenceProgram extends
FlinkOptimizeProgram[StreamOpti
val requiredChildTrait = if (childModifyKindSet.isInsertOnly) {
UpdateKindTrait.NONE
} else {
- requiredTrait
+ requiredUpdateTrait
}
this.visit(child, requiredChildTrait)
}.toList
@@ -704,7 +730,7 @@ class FlinkChangelogModeInferenceProgram extends
FlinkOptimizeProgram[StreamOpti
val children = visitChildren(normalize,
UpdateKindTrait.ONLY_UPDATE_AFTER)
// use requiredTrait as providedTrait,
// because changelog normalize supports all kinds of UpdateKind
- createNewNode(rel, children, requiredTrait)
+ createNewNode(rel, children, requiredUpdateTrait)
case ts: StreamPhysicalTableSourceScan =>
// currently only support BEFORE_AND_AFTER if source produces updates
@@ -712,11 +738,11 @@ class FlinkChangelogModeInferenceProgram extends
FlinkOptimizeProgram[StreamOpti
val newSource = createNewNode(rel, Some(List()), providedTrait)
if (
providedTrait.equals(UpdateKindTrait.BEFORE_AND_AFTER) &&
- requiredTrait.equals(UpdateKindTrait.ONLY_UPDATE_AFTER)
+ requiredUpdateTrait.equals(UpdateKindTrait.ONLY_UPDATE_AFTER)
) {
// requiring only-after, but the source is CDC source, then drop
update_before manually
val dropUB = new StreamPhysicalDropUpdateBefore(rel.getCluster,
rel.getTraitSet, rel)
- createNewNode(dropUB, newSource.map(s => List(s)), requiredTrait)
+ createNewNode(dropUB, newSource.map(s => List(s)),
requiredUpdateTrait)
} else {
newSource
}
@@ -730,9 +756,9 @@ class FlinkChangelogModeInferenceProgram extends
FlinkOptimizeProgram[StreamOpti
// we can't drop UPDATE_BEFORE if it is required by other parent
blocks
UpdateKindTrait.BEFORE_AND_AFTER
} else {
- requiredTrait
+ requiredUpdateTrait
}
- if (!providedTrait.satisfies(requiredTrait)) {
+ if (!providedTrait.satisfies(requiredUpdateTrait)) {
// require ONLY_AFTER but can only provide BEFORE_AND_AFTER
None
} else {
@@ -744,12 +770,13 @@ class FlinkChangelogModeInferenceProgram extends
FlinkOptimizeProgram[StreamOpti
val children = process.getInputs.map {
case child: StreamPhysicalRel =>
val childModifyKindSet = getModifyKindSet(child)
- val requiredChildTrait = if (childModifyKindSet.isInsertOnly) {
- UpdateKindTrait.NONE
- } else {
- UpdateKindTrait.BEFORE_AND_AFTER
- }
- this.visit(child, requiredChildTrait)
+ val requiredUpdateChildTrait =
+ if (childModifyKindSet.isInsertOnly) {
+ UpdateKindTrait.NONE
+ } else {
+ UpdateKindTrait.BEFORE_AND_AFTER
+ }
+ this.visit(child, requiredUpdateChildTrait)
}.toList
createNewNode(rel, Some(children.flatten), UpdateKindTrait.NONE)
@@ -761,15 +788,15 @@ class FlinkChangelogModeInferenceProgram extends
FlinkOptimizeProgram[StreamOpti
private def visitChildren(
parent: StreamPhysicalRel,
- requiredChildrenTrait: UpdateKindTrait):
Option[List[StreamPhysicalRel]] = {
+ requiredChildrenUpdateTrait: UpdateKindTrait):
Option[List[StreamPhysicalRel]] = {
val newChildren = for (child <- parent.getInputs) yield {
- this.visit(child.asInstanceOf[StreamPhysicalRel],
requiredChildrenTrait) match {
+ this.visit(child.asInstanceOf[StreamPhysicalRel],
requiredChildrenUpdateTrait) match {
case None =>
// return None if one of the children can't satisfy
return None
case Some(newChild) =>
- val providedTrait =
newChild.getTraitSet.getTrait(UpdateKindTraitDef.INSTANCE)
- if (!providedTrait.satisfies(requiredChildrenTrait)) {
+ val providedUpdateTrait =
newChild.getTraitSet.getTrait(UpdateKindTraitDef.INSTANCE)
+ if (!providedUpdateTrait.satisfies(requiredChildrenUpdateTrait)) {
// the provided trait can't satisfy required trait, thus we
should return None.
return None
}
@@ -782,13 +809,13 @@ class FlinkChangelogModeInferenceProgram extends
FlinkOptimizeProgram[StreamOpti
private def createNewNode(
node: StreamPhysicalRel,
childrenOption: Option[List[StreamPhysicalRel]],
- providedTrait: UpdateKindTrait): Option[StreamPhysicalRel] =
childrenOption match {
+ providedUpdateTrait: UpdateKindTrait): Option[StreamPhysicalRel] =
childrenOption match {
case None =>
None
case Some(children) =>
val modifyKindSetTrait =
node.getTraitSet.getTrait(ModifyKindSetTraitDef.INSTANCE)
val nodeDescription = node.getRelDetailedDescription
- val isUpdateKindValid = providedTrait.updateKind match {
+ val isUpdateKindValid = providedUpdateTrait.updateKind match {
case UpdateKind.NONE =>
!modifyKindSetTrait.modifyKindSet.contains(ModifyKind.UPDATE)
case UpdateKind.BEFORE_AND_AFTER | UpdateKind.ONLY_UPDATE_AFTER =>
@@ -796,17 +823,19 @@ class FlinkChangelogModeInferenceProgram extends
FlinkOptimizeProgram[StreamOpti
}
if (!isUpdateKindValid) {
throw new TableException(
- s"UpdateKindTrait $providedTrait conflicts with " +
+ s"UpdateKindTrait $providedUpdateTrait conflicts with " +
s"ModifyKindSetTrait $modifyKindSetTrait. " +
s"This is a bug in planner, please file an issue. \n" +
s"Current node is $nodeDescription.")
}
- val newTraitSet = node.getTraitSet.plus(providedTrait)
+
+ val newTraitSet = node.getTraitSet.plus(providedUpdateTrait)
Some(node.copy(newTraitSet, children).asInstanceOf[StreamPhysicalRel])
}
/**
* Try all possible rank strategies and return the first viable new node.
+ *
* @param rankStrategies
* all possible supported rank strategy by current node
* @param requiredUpdateKindTrait
@@ -932,10 +961,324 @@ class FlinkChangelogModeInferenceProgram extends
FlinkOptimizeProgram[StreamOpti
}
}
- //
-------------------------------------------------------------------------------------------
+ /**
+ * A visitor which will try to satisfy the required [[DeleteKindTrait]] from
root.
+ *
+   * <p>After being traversed by this visitor, every node should have a correct
+   * [[DeleteKindTrait]], or None is returned if the planner cannot satisfy the required
+   * [[DeleteKindTrait]].
+ */
+ private class SatisfyDeleteKindTraitVisitor(private val context:
StreamOptimizeContext) {
+
+ /**
+ * Try to satisfy the required [[DeleteKindTrait]] from root.
+ *
+ * <p>Each node will first require a DeleteKindTrait to its children. The
required
+ * DeleteKindTrait may come from the node's parent, or come from the node
itself, depending on
+ * whether the node will destroy the trait provided by children or pass
the trait from children.
+ *
+ * <p>If the node will pass the children's DeleteKindTrait without
destroying it, then return a
+ * new node with new inputs and forwarded DeleteKindTrait.
+ *
+     * <p>If the node will destroy the children's DeleteKindTrait, then the node itself needs to
+     * be converted, or a new node should be generated to satisfy the required trait, for example
+     * by requesting a different delete kind from its input.
+ *
+ * @param rel
+ * the node who should satisfy the requiredTrait
+ * @param requiredTrait
+ * the required DeleteKindTrait
+ * @return
+ * A converted node which satisfies required traits by input nodes of
current node. Or None if
+ * required traits cannot be satisfied.
+ */
+ def visit(rel: StreamPhysicalRel, requiredTrait: DeleteKindTrait):
Option[StreamPhysicalRel] =
+ rel match {
+ case sink: StreamPhysicalSink =>
+ val sinkRequiredTraits = inferSinkRequiredTraits(sink)
+ visitSink(sink, sinkRequiredTraits)
+
+ case sink: StreamPhysicalLegacySink[_] =>
+ val childModifyKindSet = getModifyKindSet(sink.getInput)
+ val fullDelete = fullDeleteOrNone(childModifyKindSet)
+ visitSink(sink, Seq(fullDelete))
+
+ case _: StreamPhysicalGroupAggregate | _:
StreamPhysicalGroupTableAggregate |
+ _: StreamPhysicalLimit | _: StreamPhysicalPythonGroupAggregate |
+ _: StreamPhysicalPythonGroupTableAggregate | _:
StreamPhysicalGroupWindowAggregateBase |
+ _: StreamPhysicalWindowAggregate | _: StreamPhysicalSort | _:
StreamPhysicalRank |
+ _: StreamPhysicalSortLimit | _: StreamPhysicalTemporalJoin |
+ _: StreamPhysicalCorrelateBase | _: StreamPhysicalLookupJoin |
+ _: StreamPhysicalWatermarkAssigner | _:
StreamPhysicalWindowTableFunction |
+ _: StreamPhysicalWindowRank | _: StreamPhysicalWindowDeduplicate |
+ _: StreamPhysicalTemporalSort | _: StreamPhysicalMatch |
+ _: StreamPhysicalOverAggregate | _: StreamPhysicalIntervalJoin |
+ _: StreamPhysicalPythonOverAggregate | _: StreamPhysicalWindowJoin
|
+ _: StreamPhysicalProcessTableFunction =>
+          // if not explicitly supported, all operators require full deletes if there are deletes
+          // ProcessTableFunction currently only consumes full deletes or insert-only
+ val children = rel.getInputs.map {
+ case child: StreamPhysicalRel =>
+ val childModifyKindSet = getModifyKindSet(child)
+ this.visit(child, fullDeleteOrNone(childModifyKindSet))
+ }.toList
+ createNewNode(rel, Some(children.flatten),
fullDeleteOrNone(getModifyKindSet(rel)))
+
+ case join: StreamPhysicalJoin =>
+ val children = join.getInputs.zipWithIndex.map {
+ case (child, childOrdinal) =>
+ val physicalChild = child.asInstanceOf[StreamPhysicalRel]
+ val supportsDeleteByKey =
join.inputUniqueKeyContainsJoinKey(childOrdinal)
+ val inputModifyKindSet = getModifyKindSet(physicalChild)
+ if (supportsDeleteByKey && requiredTrait == DELETE_BY_KEY) {
+ this
+ .visit(physicalChild, deleteOnKeyOrNone(inputModifyKindSet))
+ .orElse(this.visit(physicalChild,
fullDeleteOrNone(inputModifyKindSet)))
+ } else {
+ this.visit(physicalChild, fullDeleteOrNone(inputModifyKindSet))
+ }
+ }
+ if (children.exists(_.isEmpty)) {
+ None
+ } else {
+ val childRels = children.flatten.toList
+ if (childRels.exists(r => getDeleteKind(r) ==
DeleteKind.DELETE_BY_KEY)) {
+ createNewNode(join, Some(childRels),
deleteOnKeyOrNone(getModifyKindSet(rel)))
+ } else {
+ createNewNode(join, Some(childRels),
fullDeleteOrNone(getModifyKindSet(rel)))
+ }
+ }
+
+ case calc: StreamPhysicalCalcBase =>
+ if (
+ requiredTrait == DeleteKindTrait.DELETE_BY_KEY &&
+ calc.getProgram.getCondition != null
+ ) {
+ // this can be further improved by checking if the filter
condition is on the key
+ None
+ } else {
+ // otherwise, forward DeleteKind requirement
+ visitChildren(rel, requiredTrait) match {
+ case None => None
+ case Some(children) =>
+ val childTrait =
children.head.getTraitSet.getTrait(DeleteKindTraitDef.INSTANCE)
+ createNewNode(rel, Some(children), childTrait)
+ }
+ }
+
+ case _: StreamPhysicalExchange | _: StreamPhysicalExpand |
+ _: StreamPhysicalMiniBatchAssigner | _:
StreamPhysicalDropUpdateBefore =>
+ // transparent forward requiredTrait to children
+ visitChildren(rel, requiredTrait) match {
+ case None => None
+ case Some(children) =>
+ val childTrait =
children.head.getTraitSet.getTrait(DeleteKindTraitDef.INSTANCE)
+ createNewNode(rel, Some(children), childTrait)
+ }
+
+ case union: StreamPhysicalUnion =>
+ val children = union.getInputs.map {
+ case child: StreamPhysicalRel =>
+ val childModifyKindSet = getModifyKindSet(child)
+ val requiredChildTrait = if
(!childModifyKindSet.contains(ModifyKind.DELETE)) {
+ DeleteKindTrait.NONE
+ } else {
+ requiredTrait
+ }
+ this.visit(child, requiredChildTrait)
+ }.toList
+
+ if (children.exists(_.isEmpty)) {
+ None
+ } else {
+ val deleteKinds = children.flatten
+ .map(_.getTraitSet.getTrait(DeleteKindTraitDef.INSTANCE))
+          // union can just forward changes, it can't actively satisfy another changelog mode
+          val providedTrait = if (deleteKinds.forall(k => DeleteKindTrait.NONE == k)) {
+            // if all the children are NONE, union is NONE
+            DeleteKindTrait.NONE
+          } else {
+            // otherwise, merge delete kinds
+ val merged = deleteKinds
+ .map(_.deleteKind)
+ .reduce {
+ (l, r) =>
+ (l, r) match {
+ case (DeleteKind.NONE, r: DeleteKind) => r
+ case (l: DeleteKind, DeleteKind.NONE) => l
+ case (l: DeleteKind, r: DeleteKind) =>
+ if (l == r) {
+ l
+ } else {
+                      // if any of the union inputs produces DELETE_BY_KEY,
+                      // the union produces deletes by key
+ DeleteKind.DELETE_BY_KEY
+ }
+ }
+ }
+ new DeleteKindTrait(merged)
+ }
+ createNewNode(union, Some(children.flatten), providedTrait)
+ }
+
+ case normalize: StreamPhysicalChangelogNormalize =>
+ // if
+ // 1. we don't need to produce UPDATE_BEFORE,
+ // 2. children can satisfy the required delete trait,
+          // 3. the normalize doesn't have a filter condition that we would lose,
+ // 4. we don't use metadata columns
+ // we can skip ChangelogNormalize
+ if (!ChangelogNormalizeRequirementResolver.isRequired(normalize)) {
+ visitChildren(normalize, requiredTrait) match {
+ case Some(children) =>
+ val input = children.head match {
+ case exchange: StreamPhysicalExchange =>
+ exchange.getInput
+ case _ =>
+ normalize.getInput
+ }
+ return Some(input.asInstanceOf[StreamPhysicalRel])
+ case _ =>
+ }
+ }
+ val childModifyKindTrait = getModifyKindSet(rel.getInput(0))
+
+ // prefer delete by key, but accept both
+ val children = visitChildren(normalize,
deleteOnKeyOrNone(childModifyKindTrait))
+ .orElse(visitChildren(normalize,
fullDeleteOrNone(childModifyKindTrait)))
+
+ // changelog normalize produces full deletes
+ createNewNode(rel, children, fullDeleteOrNone(getModifyKindSet(rel)))
+
+ case ts: StreamPhysicalTableSourceScan =>
+          // the provided delete kind is derived from the source's declared changelog mode
+          val providedTrait = DeleteKindTrait.fromChangelogMode(ts.tableSource.getChangelogMode)
+ createNewNode(rel, Some(List()), providedTrait)
+
+ case _: StreamPhysicalDataStreamScan | _:
StreamPhysicalLegacyTableSourceScan |
+ _: StreamPhysicalValues =>
+ createNewNode(rel, Some(List()), DeleteKindTrait.NONE)
+
+ case _: StreamPhysicalIntermediateTableScan =>
+ createNewNode(rel, Some(List()),
fullDeleteOrNone(getModifyKindSet(rel)))
+
+ case _ =>
+ throw new UnsupportedOperationException(
+ s"Unsupported visit for ${rel.getClass.getSimpleName}")
+
+ }
+
+ private def visitChildren(
+ parent: StreamPhysicalRel,
+ requiredChildrenTrait: DeleteKindTrait):
Option[List[StreamPhysicalRel]] = {
+ val newChildren = for (child <- parent.getInputs) yield {
+ this.visit(child.asInstanceOf[StreamPhysicalRel],
requiredChildrenTrait) match {
+ case None =>
+ // return None if one of the children can't satisfy
+ return None
+ case Some(newChild) =>
+ val providedTrait =
newChild.getTraitSet.getTrait(DeleteKindTraitDef.INSTANCE)
+ if (!providedTrait.satisfies(requiredChildrenTrait)) {
+ // the provided trait can't satisfy required trait, thus we
should return None.
+ return None
+ }
+ newChild
+ }
+ }
+ Some(newChildren.toList)
+ }
+
+ private def createNewNode(
+ node: StreamPhysicalRel,
+ childrenOption: Option[List[StreamPhysicalRel]],
+ providedDeleteTrait: DeleteKindTrait): Option[StreamPhysicalRel] =
childrenOption match {
+ case None =>
+ None
+ case Some(children) =>
+ val modifyKindSetTrait =
node.getTraitSet.getTrait(ModifyKindSetTraitDef.INSTANCE)
+ val nodeDescription = node.getRelDetailedDescription
+ val isDeleteKindValid = providedDeleteTrait.deleteKind match {
+ case DeleteKind.NONE =>
+ !modifyKindSetTrait.modifyKindSet.contains(ModifyKind.DELETE)
+ case DeleteKind.DELETE_BY_KEY | DeleteKind.FULL_DELETE =>
+ modifyKindSetTrait.modifyKindSet.contains(ModifyKind.DELETE)
+ }
+ if (!isDeleteKindValid) {
+ throw new TableException(
+ s"DeleteKindTrait $providedDeleteTrait conflicts with " +
+ s"ModifyKindSetTrait $modifyKindSetTrait. " +
+ s"This is a bug in planner, please file an issue. \n" +
+ s"Current node is $nodeDescription.")
+ }
+ val newTraitSet = node.getTraitSet.plus(providedDeleteTrait)
+ Some(node.copy(newTraitSet, children).asInstanceOf[StreamPhysicalRel])
+ }
+
+ private def visitSink(
+ sink: StreamPhysicalRel,
+ sinkRequiredTraits: Seq[DeleteKindTrait]): Option[StreamPhysicalRel] =
{
+ val children = sinkRequiredTraits.flatMap(t => visitChildren(sink, t))
+ if (children.isEmpty) {
+ None
+ } else {
+ val sinkTrait = sink.getTraitSet.plus(DeleteKindTrait.NONE)
+ Some(sink.copy(sinkTrait,
children.head).asInstanceOf[StreamPhysicalRel])
+ }
+ }
+
+ /**
+   * Infers the sink's required traits from the sink node and its input. The required traits are
+   * based on the sink node's changelog mode; the only exception is when the sink's pk(s) are not
+   * exactly the same as the changeLogUpsertKeys and the sink's changelog mode is DELETE_BY_KEY.
+ */
+  private def inferSinkRequiredTraits(sink: StreamPhysicalSink): Seq[DeleteKindTrait] = {
+    val childModifyKindSet = getModifyKindSet(sink.getInput)
+    val sinkChangelogMode = sink.tableSink.getChangelogMode(childModifyKindSet.toChangelogMode)
+
+    val sinkDeleteTrait = DeleteKindTrait.fromChangelogMode(sinkChangelogMode)
+
+ val fullDelete = fullDeleteOrNone(childModifyKindSet)
+ if (sinkDeleteTrait.equals(DeleteKindTrait.DELETE_BY_KEY)) {
+ if (areUpsertKeysDifferentFromPk(sink)) {
+ Seq(fullDelete)
+ } else {
+ Seq(sinkDeleteTrait, fullDelete)
+ }
+ } else {
+ Seq(fullDelete)
+ }
+ }
+
+ //
-------------------------------------------------------------------------------------------
+
+ private def areUpsertKeysDifferentFromPk(sink: StreamPhysicalSink) = {
+    // if the sink's pk(s) do not exactly match the input changeLogUpsertKeys, then it will
+    // fall back to full deletes for correctness
+ var upsertKeyDifferentFromPk: Boolean = false
+ val sinkDefinedPks =
sink.contextResolvedTable.getResolvedSchema.getPrimaryKeyIndexes
+
+ if (sinkDefinedPks.nonEmpty) {
+ val sinkPks = ImmutableBitSet.of(sinkDefinedPks: _*)
+ val fmq =
FlinkRelMetadataQuery.reuseOrCreate(sink.getCluster.getMetadataQuery)
+ val changeLogUpsertKeys = fmq.getUpsertKeys(sink.getInput)
+      // if the primary key != upsert key (the upsert key can be null), we should fall back
+      // to full deletes.
+      // Notice: even if the sink pk(s) contain the input upsert key we cannot optimize to
+      // key-only deletes; this differs from a batch job's unique key inference
+ if (changeLogUpsertKeys == null ||
!changeLogUpsertKeys.exists(_.equals(sinkPks))) {
+ upsertKeyDifferentFromPk = true
+ }
+ }
+ upsertKeyDifferentFromPk
+ }
+ }
private def getModifyKindSet(node: RelNode): ModifyKindSet = {
val modifyKindSetTrait =
node.getTraitSet.getTrait(ModifyKindSetTraitDef.INSTANCE)
modifyKindSetTrait.modifyKindSet
}
+
+ private def getDeleteKind(node: RelNode): DeleteKind = {
+ val deleteKindTrait =
node.getTraitSet.getTrait(DeleteKindTraitDef.INSTANCE)
+ deleteKindTrait.deleteKind
+ }
}
diff --git
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/trait/DeleteKindTrait.scala
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/trait/DeleteKindTrait.scala
new file mode 100644
index 00000000000..b23063bd359
--- /dev/null
+++
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/trait/DeleteKindTrait.scala
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.planner.plan.`trait`
+
+import org.apache.flink.table.connector.ChangelogMode
+import org.apache.flink.types.RowKind
+
+import org.apache.calcite.plan.{RelOptPlanner, RelTrait, RelTraitDef}
+
+/** DeleteKindTrait is used to describe the kind of delete operation. */
+class DeleteKindTrait(val deleteKind: DeleteKind) extends RelTrait {
+
+ override def satisfies(relTrait: RelTrait): Boolean = relTrait match {
+ case other: DeleteKindTrait =>
+ // should totally match
+ other.deleteKind == this.deleteKind
+ case _ => false
+ }
+
+ override def getTraitDef: RelTraitDef[_ <: RelTrait] =
DeleteKindTraitDef.INSTANCE
+
+ override def register(planner: RelOptPlanner): Unit = {}
+
+ override def hashCode(): Int = deleteKind.hashCode()
+
+ override def equals(obj: Any): Boolean = obj match {
+ case t: DeleteKindTrait => this.deleteKind.equals(t.deleteKind)
+ case _ => false
+ }
+
+ override def toString: String = s"[${deleteKind.toString}]"
+}
+
+object DeleteKindTrait {
+
+  /** A [[DeleteKindTrait]] that indicates the node does not support the delete operation. */
+  val NONE = new DeleteKindTrait(DeleteKind.NONE)
+
+  /** A [[DeleteKindTrait]] that indicates the node supports deletes by key only. */
+  val DELETE_BY_KEY = new DeleteKindTrait(DeleteKind.DELETE_BY_KEY)
+
+  /** A [[DeleteKindTrait]] that indicates the node requires deletes containing the full record. */
+  val FULL_DELETE = new DeleteKindTrait(DeleteKind.FULL_DELETE)
+
+ /**
+ * Returns DELETE_BY_KEY [[DeleteKindTrait]] if there are delete changes.
Otherwise, returns NONE
+ * [[DeleteKindTrait]].
+ */
+ def deleteOnKeyOrNone(modifyKindSet: ModifyKindSet): DeleteKindTrait = {
+ val deleteKind = if (modifyKindSet.contains(ModifyKind.DELETE)) {
+ DeleteKind.DELETE_BY_KEY
+ } else {
+ DeleteKind.NONE
+ }
+ new DeleteKindTrait(deleteKind)
+ }
+
+ /**
+ * Returns FULL_DELETE [[DeleteKindTrait]] if there are delete changes.
Otherwise, returns NONE
+ * [[DeleteKindTrait]].
+ */
+ def fullDeleteOrNone(modifyKindSet: ModifyKindSet): DeleteKindTrait = {
+ val deleteKind = if (modifyKindSet.contains(ModifyKind.DELETE)) {
+ DeleteKind.FULL_DELETE
+ } else {
+ DeleteKind.NONE
+ }
+ new DeleteKindTrait(deleteKind)
+ }
+
+ /** Creates an instance of [[DeleteKindTrait]] from the given
[[ChangelogMode]]. */
+ def fromChangelogMode(changelogMode: ChangelogMode): DeleteKindTrait = {
+ val hasDelete = changelogMode.contains(RowKind.DELETE)
+ if (!hasDelete) {
+ DeleteKindTrait.NONE
+ } else {
+ val hasDeleteOnKey = changelogMode.keyOnlyDeletes()
+ if (hasDeleteOnKey) {
+ DeleteKindTrait.DELETE_BY_KEY
+ } else {
+ DeleteKindTrait.FULL_DELETE
+ }
+ }
+ }
+}
diff --git
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/trait/DeleteKindTraitDef.scala
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/trait/DeleteKindTraitDef.scala
new file mode 100644
index 00000000000..5d374c48bf1
--- /dev/null
+++
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/trait/DeleteKindTraitDef.scala
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.planner.plan.`trait`
+
+import org.apache.calcite.plan.{RelOptPlanner, RelTraitDef}
+import org.apache.calcite.rel.RelNode
+
+/** Definition of a trait containing [[DeleteKindTrait]]. */
+class DeleteKindTraitDef extends RelTraitDef[DeleteKindTrait] {
+
+ override def getTraitClass: Class[DeleteKindTrait] = classOf[DeleteKindTrait]
+
+ override def getSimpleName: String = this.getClass.getSimpleName
+
+ override def convert(
+ planner: RelOptPlanner,
+ rel: RelNode,
+ toTrait: DeleteKindTrait,
+ allowInfiniteCostConverters: Boolean): RelNode = {
+ rel.copy(rel.getTraitSet.plus(toTrait), rel.getInputs)
+ }
+
+ override def canConvert(
+ planner: RelOptPlanner,
+ fromTrait: DeleteKindTrait,
+ toTrait: DeleteKindTrait): Boolean = {
+ throw new UnsupportedOperationException("DeleteKindTrait conversion is not
supported for now.")
+ }
+
+ override def getDefault: DeleteKindTrait = DeleteKindTrait.FULL_DELETE
+}
+
+object DeleteKindTraitDef {
+ val INSTANCE = new DeleteKindTraitDef()
+}
diff --git
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/ChangelogPlanUtils.scala
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/ChangelogPlanUtils.scala
index 72eb3ecfc4d..ae90a5d6d7f 100644
---
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/ChangelogPlanUtils.scala
+++
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/ChangelogPlanUtils.scala
@@ -18,7 +18,7 @@
package org.apache.flink.table.planner.plan.utils
import org.apache.flink.table.connector.ChangelogMode
-import org.apache.flink.table.planner.plan.`trait`.{ModifyKind,
ModifyKindSetTraitDef, UpdateKind, UpdateKindTraitDef}
+import org.apache.flink.table.planner.plan.`trait`.{DeleteKind,
DeleteKindTraitDef, ModifyKind, ModifyKindSetTraitDef, UpdateKind,
UpdateKindTraitDef}
import
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalRel
import
org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram
import org.apache.flink.types.RowKind
@@ -85,6 +85,11 @@ object ChangelogPlanUtils {
val updateKind = node.getTraitSet
.getTrait(UpdateKindTraitDef.INSTANCE)
.updateKind
+ val deleteKind = Option(
+ node.getTraitSet
+ .getTrait(DeleteKindTraitDef.INSTANCE))
+ .map(_.deleteKind)
+ .getOrElse(DeleteKind.NONE)
if (modifyKindSet.isEmpty) {
None
@@ -102,6 +107,11 @@ object ChangelogPlanUtils {
modeBuilder.addContainedKind(RowKind.UPDATE_BEFORE)
}
}
+
+ if (deleteKind == DeleteKind.DELETE_BY_KEY) {
+ modeBuilder.keyOnlyDeletes(true)
+ }
+
Some(modeBuilder.build())
}
}
@@ -121,7 +131,11 @@ object ChangelogPlanUtils {
kinds += "UA"
}
if (mode.contains(RowKind.DELETE)) {
- kinds += "D"
+ if (mode.keyOnlyDeletes()) {
+ kinds += "PD"
+ } else {
+ kinds += "D"
+ }
}
kinds.mkString(",")
}
diff --git
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java
index 659e6b11209..f75cb778597 100644
---
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java
+++
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java
@@ -457,6 +457,22 @@ public final class TestValuesTableFactory
.withDescription(
"Option to determine whether or not to require the
distribution bucket count");
+ private static final ConfigOption<Boolean> SINK_SUPPORTS_DELETE_BY_KEY =
+ ConfigOptions.key("sink.supports-delete-by-key")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription(
+ "Option to determine whether or not to require
deletes to have the"
+ + " entire row or is a delete by key
sufficient.");
+
+ private static final ConfigOption<Boolean> SOURCE_PRODUCES_DELETE_BY_KEY =
+ ConfigOptions.key("source.produces-delete-by-key")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription(
+ "Option to determine whether or not to require
deletes to have the"
+ + " entire row or is a delete by key
sufficient.");
+
private static final ConfigOption<Integer> SOURCE_NUM_ELEMENT_TO_SKIP =
ConfigOptions.key("source.num-element-to-skip")
.intType()
@@ -504,7 +520,10 @@ public final class TestValuesTableFactory
helper.validate();
- ChangelogMode changelogMode =
parseChangelogMode(helper.getOptions().get(CHANGELOG_MODE));
+ ChangelogMode changelogMode =
+ parseChangelogMode(
+ helper.getOptions().get(CHANGELOG_MODE),
+
helper.getOptions().get(SOURCE_PRODUCES_DELETE_BY_KEY));
String runtimeSource = helper.getOptions().get(RUNTIME_SOURCE);
boolean isBounded = helper.getOptions().get(BOUNDED);
boolean isFinite = helper.getOptions().get(TERMINATING);
@@ -749,6 +768,7 @@ public final class TestValuesTableFactory
TableSchemaUtils.getPhysicalSchema(context.getCatalogTable().getSchema());
boolean requireBucketCount =
helper.getOptions().get(SINK_BUCKET_COUNT_REQUIRED);
+ boolean supportsDeleteByKey =
helper.getOptions().get(SINK_SUPPORTS_DELETE_BY_KEY);
if (sinkClass.equals("DEFAULT")) {
int rowTimeIndex =
validateAndExtractRowtimeIndex(
@@ -765,7 +785,8 @@ public final class TestValuesTableFactory
changelogMode,
rowTimeIndex,
tableSchema,
- requireBucketCount);
+ requireBucketCount,
+ supportsDeleteByKey);
} else {
try {
return InstantiationUtil.instantiate(
@@ -816,6 +837,8 @@ public final class TestValuesTableFactory
ENABLE_WATERMARK_PUSH_DOWN,
SINK_DROP_LATE_EVENT,
SINK_BUCKET_COUNT_REQUIRED,
+ SINK_SUPPORTS_DELETE_BY_KEY,
+ SOURCE_PRODUCES_DELETE_BY_KEY,
SOURCE_NUM_ELEMENT_TO_SKIP,
SOURCE_SLEEP_AFTER_ELEMENTS,
SOURCE_SLEEP_TIME,
@@ -916,6 +939,10 @@ public final class TestValuesTableFactory
}
private ChangelogMode parseChangelogMode(String string) {
+ return parseChangelogMode(string, false);
+ }
+
+ private ChangelogMode parseChangelogMode(String string, boolean
producesDeleteByKey) {
ChangelogMode.Builder builder = ChangelogMode.newBuilder();
for (String split : string.split(",")) {
switch (split.trim()) {
@@ -935,6 +962,7 @@ public final class TestValuesTableFactory
throw new IllegalArgumentException("Invalid ChangelogMode
string: " + string);
}
}
+ builder.keyOnlyDeletes(producesDeleteByKey);
return builder.build();
}
@@ -1621,7 +1649,7 @@ public final class TestValuesTableFactory
implements SupportsWatermarkPushDown, SupportsSourceWatermark {
private final String tableName;
- private WatermarkStrategy<RowData> watermarkStrategy;
+ private WatermarkStrategy<RowData> watermarkStrategy =
WatermarkStrategy.noWatermarks();
private TestValuesScanTableSourceWithWatermarkPushDown(
DataType producedDataType,
@@ -2207,6 +2235,7 @@ public final class TestValuesTableFactory
private final int rowtimeIndex;
private final TableSchema tableSchema;
private final boolean requireBucketCount;
+ private final boolean supportsDeleteByKey;
private TestValuesTableSink(
DataType consumedDataType,
@@ -2220,7 +2249,8 @@ public final class TestValuesTableFactory
@Nullable ChangelogMode changelogModeEnforced,
int rowtimeIndex,
TableSchema tableSchema,
- boolean requireBucketCount) {
+ boolean requireBucketCount,
+ boolean supportsDeleteByKey) {
this.consumedDataType = consumedDataType;
this.primaryKeyIndices = primaryKeyIndices;
this.tableName = tableName;
@@ -2233,10 +2263,19 @@ public final class TestValuesTableFactory
this.rowtimeIndex = rowtimeIndex;
this.tableSchema = tableSchema;
this.requireBucketCount = requireBucketCount;
+ this.supportsDeleteByKey = supportsDeleteByKey;
}
@Override
public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
+ final ChangelogMode mode = getMode(requestedMode);
+ final ChangelogMode.Builder builder = ChangelogMode.newBuilder();
+ mode.getContainedKinds().forEach(builder::addContainedKind);
+ builder.keyOnlyDeletes(supportsDeleteByKey);
+ return builder.build();
+ }
+
+ private ChangelogMode getMode(ChangelogMode requestedMode) {
// if param [changelogModeEnforced] is passed in, return it
directly
if (changelogModeEnforced != null) {
return changelogModeEnforced;
@@ -2376,7 +2415,8 @@ public final class TestValuesTableFactory
changelogModeEnforced,
rowtimeIndex,
tableSchema,
- requireBucketCount);
+ requireBucketCount,
+ supportsDeleteByKey);
}
@Override
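As a usage note, a hedged sketch of how the two new TestValues options are meant to
be exercised from a test (the table names and columns are illustrative; tableEnv is
a TableEnvironment as used elsewhere in the planner tests):

    // Source whose DELETEs carry only the primary key:
    tableEnv.executeSql(
            "CREATE TABLE upsert_src (id INT PRIMARY KEY NOT ENFORCED, v INT) WITH ("
                    + " 'connector' = 'values',"
                    + " 'changelog-mode' = 'I,UA,D',"
                    + " 'source.produces-delete-by-key' = 'true')");

    // Sink that is satisfied with key-only DELETEs:
    tableEnv.executeSql(
            "CREATE TABLE upsert_sink (id INT PRIMARY KEY NOT ENFORCED, v INT) WITH ("
                    + " 'connector' = 'values',"
                    + " 'changelog-mode' = 'I,UA,D',"
                    + " 'sink.supports-delete-by-key' = 'true')");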
diff --git
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/DeletesByKeyPrograms.java
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/DeletesByKeyPrograms.java
new file mode 100644
index 00000000000..6599f649950
--- /dev/null
+++
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/DeletesByKeyPrograms.java
@@ -0,0 +1,307 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.stream;
+
+import org.apache.flink.table.test.program.SinkTestStep;
+import org.apache.flink.table.test.program.SourceTestStep;
+import org.apache.flink.table.test.program.TableTestProgram;
+import org.apache.flink.types.Row;
+import org.apache.flink.types.RowKind;
+
+/**
+ * Tests for verifying semantics of operations when sources produce deletes by
key only and the sink
+ * can accept deletes by key only as well.
+ */
+public final class DeletesByKeyPrograms {
+
+ /**
+ * Tests a simple INSERT INTO SELECT scenario where ChangelogNormalize can
be eliminated since
+ * we don't need UPDATE_BEFORE, and we have key information for all
changes.
+ */
+ public static final TableTestProgram
INSERT_SELECT_DELETE_BY_KEY_DELETE_BY_KEY =
+ TableTestProgram.of(
+ "select-delete-on-key-to-delete-on-key",
+ "No ChangelogNormalize: validates results when
querying source with deletes by key"
+ + " only, writing to sink supporting
deletes by key only, which"
+ + " is a case where ChangelogNormalize can
be eliminated")
+ .setupTableSource(
+ SourceTestStep.newBuilder("source_t")
+ .addSchema(
+ "id INT PRIMARY KEY NOT ENFORCED",
+ "name STRING",
+ "`value` INT")
+ .addOption("changelog-mode", "I,UA,D")
+
.addOption("source.produces-delete-by-key", "true")
+ .producedValues(
+ Row.ofKind(RowKind.INSERT, 1,
"Alice", 10),
+ Row.ofKind(RowKind.INSERT, 2,
"Bob", 20),
+ // Delete by key
+ Row.ofKind(RowKind.DELETE, 1,
null, null),
+ // Update after only
+ Row.ofKind(RowKind.UPDATE_AFTER,
2, "Bob", 30))
+ .build())
+ .setupTableSink(
+ SinkTestStep.newBuilder("sink_t")
+ .addSchema(
+ "id INT PRIMARY KEY NOT ENFORCED",
+ "name STRING",
+ "`value` INT")
+ .addOption(
+ "changelog-mode",
+ "I,UA,D") // Insert, UpdateAfter,
Delete
+ .addOption("sink.supports-delete-by-key",
"true")
+ .consumedValues(
+ "+I[1, Alice, 10]",
+ "+I[2, Bob, 20]",
+ "-D[1, null, null]",
+ "+U[2, Bob, 30]")
+ .build())
+ .runSql("INSERT INTO sink_t SELECT id, name, `value` FROM
source_t")
+ .build();
+
+ public static final TableTestProgram
INSERT_SELECT_DELETE_BY_KEY_DELETE_BY_KEY_WITH_PROJECTION =
+ TableTestProgram.of(
+
"select-delete-on-key-to-delete-on-key-with-projection",
+ "No ChangelogNormalize: validates results when
querying source with deletes by key"
+ + " only, writing to sink supporting
deletes by key only with a"
+ + "projection, which is a case where
ChangelogNormalize can be"
+ + " eliminated")
+ .setupTableSource(
+ SourceTestStep.newBuilder("source_t")
+ .addSchema(
+ "id INT PRIMARY KEY NOT ENFORCED",
+ "name STRING NOT NULL",
+ "`value` INT NOT NULL")
+ .addOption("changelog-mode", "I,UA,D")
+
.addOption("source.produces-delete-by-key", "true")
+ .producedValues(
+ Row.ofKind(RowKind.INSERT, 1,
"Alice", 10),
+ Row.ofKind(RowKind.INSERT, 2,
"Bob", 20),
+ // Delete by key
+ Row.ofKind(RowKind.DELETE, 1,
null, null),
+ // Update after only
+ Row.ofKind(RowKind.UPDATE_AFTER,
2, "Bob", 30))
+ .build())
+ .setupTableSink(
+ SinkTestStep.newBuilder("sink_t")
+ .addSchema(
+ "id INT PRIMARY KEY NOT ENFORCED",
+ "name STRING",
+ "`value` INT")
+ .addOption(
+ "changelog-mode",
+ "I,UA,D") // Insert, UpdateAfter,
Delete
+ .addOption("sink.supports-delete-by-key",
"true")
+ .consumedValues(
+ "+I[1, Alice, 12]",
+ "+I[2, Bob, 22]",
+ "-D[1, , -1]",
+ "+U[2, Bob, 32]")
+ .build())
+ .runSql("INSERT INTO sink_t SELECT id, name, `value` + 2
FROM source_t")
+ .build();
+
+ public static final TableTestProgram
INSERT_SELECT_DELETE_BY_KEY_FULL_DELETE =
+ TableTestProgram.of(
+ "select-delete-on-key-to-full-delete",
+ "ChangelogNormalize: validates results when
querying source with deletes by key"
+ + " only, writing to sink supporting
requiring full deletes, "
+ + "which is a case where
ChangelogNormalize stays")
+ .setupTableSource(
+ SourceTestStep.newBuilder("source_t")
+ .addSchema(
+ "id INT PRIMARY KEY NOT ENFORCED",
+ "name STRING",
+ "`value` INT")
+ .addOption("changelog-mode", "I,UA,D")
+
.addOption("source.produces-delete-by-key", "true")
+ .producedValues(
+ Row.ofKind(RowKind.INSERT, 1,
"Alice", 10),
+ Row.ofKind(RowKind.INSERT, 2,
"Bob", 20),
+ // Delete by key
+ Row.ofKind(RowKind.DELETE, 1,
null, null),
+ // Update after only
+ Row.ofKind(RowKind.UPDATE_AFTER,
2, "Bob", 30))
+ .build())
+ .setupTableSink(
+ SinkTestStep.newBuilder("sink_t")
+ .addSchema(
+ "id INT PRIMARY KEY NOT ENFORCED",
+ "name STRING",
+ "`value` INT")
+ .addOption("changelog-mode", "I,UA,D")
+ .addOption("sink.supports-delete-by-key",
"false")
+ .consumedValues(
+ "+I[1, Alice, 10]",
+ "+I[2, Bob, 20]",
+ "-D[1, Alice, 10]",
+ "+U[2, Bob, 30]")
+ .build())
+ .runSql("INSERT INTO sink_t SELECT id, name, `value` FROM
source_t")
+ .build();
+
+ public static final TableTestProgram INSERT_SELECT_FULL_DELETE_FULL_DELETE
=
+ TableTestProgram.of(
+ "select-full-delete-to-full-delete",
+ "No ChangelogNormalize: validates results when
querying source with full deletes, "
+ + "writing to sink requiring full deletes,
which is a case"
+ + " where ChangelogNormalize can be
eliminated")
+ .setupTableSource(
+ SourceTestStep.newBuilder("source_t")
+ .addSchema(
+ "id INT PRIMARY KEY NOT ENFORCED",
+ "name STRING",
+ "`value` INT")
+ .addOption("changelog-mode", "I,UA,D")
+
.addOption("source.produces-delete-by-key", "false")
+ .producedValues(
+ Row.ofKind(RowKind.INSERT, 1,
"Alice", 10),
+ Row.ofKind(RowKind.INSERT, 2,
"Bob", 20),
+ // Delete with the full row
+ Row.ofKind(RowKind.DELETE, 1,
"Alice", 10),
+ // Update after only
+ Row.ofKind(RowKind.UPDATE_AFTER,
2, "Bob", 30))
+ .build())
+ .setupTableSink(
+ SinkTestStep.newBuilder("sink_t")
+ .addSchema(
+ "id INT PRIMARY KEY NOT ENFORCED",
+ "name STRING",
+ "`value` INT")
+ .addOption("changelog-mode", "I,UA,D")
+ .addOption("sink.supports-delete-by-key",
"false")
+ .consumedValues(
+ "+I[1, Alice, 10]",
+ "+I[2, Bob, 20]",
+ "-D[1, Alice, 10]",
+ "+U[2, Bob, 30]")
+ .build())
+ .runSql("INSERT INTO sink_t SELECT id, name, `value` FROM
source_t")
+ .build();
+
+ public static final TableTestProgram JOIN_INTO_FULL_DELETES =
+ TableTestProgram.of(
+ "join-to-full-delete",
+ "ChangelogNormalize: validates results when
joining sources with deletes by key"
+ + " only, writing to sink requiring full
deletes, which"
+ + " is a case where ChangelogNormalize
stays")
+ .setupTableSource(
+ SourceTestStep.newBuilder("left_t")
+ .addSchema("id INT PRIMARY KEY NOT
ENFORCED", "`value` INT")
+ .addOption("changelog-mode", "I,UA,D")
+
.addOption("source.produces-delete-by-key", "true")
+ .producedValues(
+ Row.ofKind(RowKind.INSERT, 1, 10),
+ Row.ofKind(RowKind.INSERT, 2, 20),
+ Row.ofKind(RowKind.INSERT, 3, 30),
+ // Delete by key
+ Row.ofKind(RowKind.DELETE, 1,
null),
+ // Update after only
+ Row.ofKind(RowKind.UPDATE_AFTER,
3, 40))
+ .build())
+ .setupTableSource(
+ SourceTestStep.newBuilder("right_t")
+ .addSchema("id INT PRIMARY KEY NOT
ENFORCED", "name STRING")
+ .addOption("changelog-mode", "I,UA,D")
+
.addOption("source.produces-delete-by-key", "true")
+ .producedValues(
+ Row.ofKind(RowKind.INSERT, 1,
"Alice"),
+ Row.ofKind(RowKind.INSERT, 2,
"Bob"),
+ Row.ofKind(RowKind.INSERT, 3,
"Emily"),
+ // Delete by key
+ Row.ofKind(RowKind.DELETE, 1,
null),
+ // Update after only
+ Row.ofKind(RowKind.UPDATE_AFTER,
2, "BOB"))
+ .build())
+ .setupTableSink(
+ SinkTestStep.newBuilder("sink_t")
+ .addSchema(
+ "id INT PRIMARY KEY NOT ENFORCED",
+ "name STRING",
+ "`value` INT")
+ .addOption("changelog-mode", "I,UA,D")
+ .addOption("sink.supports-delete-by-key",
"false")
+ .consumedValues(
+ "+I[1, Alice, 10]",
+ "+I[2, Bob, 20]",
+ "+I[3, Emily, 30]",
+ "-D[1, Alice, 10]",
+ "+U[3, Emily, 40]",
+ "+U[2, BOB, 20]")
+ .build())
+ .runSql(
+ "INSERT INTO sink_t SELECT l.id, r.name, l.`value`
FROM left_t l JOIN right_t r ON l.id = r.id")
+ .build();
+
+ public static final TableTestProgram JOIN_INTO_DELETES_BY_KEY =
+ TableTestProgram.of(
+ "join-to-delete-on-key",
+ "No ChangelogNormalize: validates results when
joining sources with deletes by key"
+ + " only, writing to sink supporting
deletes by key, which"
+ + " is a case where ChangelogNormalize can
be removed")
+ .setupTableSource(
+ SourceTestStep.newBuilder("left_t")
+ .addSchema("id INT PRIMARY KEY NOT
ENFORCED", "`value` INT")
+ .addOption("changelog-mode", "I,UA,D")
+
.addOption("source.produces-delete-by-key", "true")
+ .producedValues(
+ Row.ofKind(RowKind.INSERT, 1, 10),
+ Row.ofKind(RowKind.INSERT, 2, 20),
+ Row.ofKind(RowKind.INSERT, 3, 30),
+ // Delete by key
+ Row.ofKind(RowKind.DELETE, 1,
null),
+ // Update after only
+ Row.ofKind(RowKind.UPDATE_AFTER,
3, 40))
+ .build())
+ .setupTableSource(
+ SourceTestStep.newBuilder("right_t")
+ .addSchema("id INT PRIMARY KEY NOT
ENFORCED", "name STRING")
+ .addOption("changelog-mode", "I,UA,D")
+
.addOption("source.produces-delete-by-key", "true")
+ .producedValues(
+ Row.ofKind(RowKind.INSERT, 1,
"Alice"),
+ Row.ofKind(RowKind.INSERT, 2,
"Bob"),
+ Row.ofKind(RowKind.INSERT, 3,
"Emily"),
+ // Delete by key
+ Row.ofKind(RowKind.DELETE, 1,
null),
+ // Update after only
+ Row.ofKind(RowKind.UPDATE_AFTER,
2, "BOB"))
+ .build())
+ .setupTableSink(
+ SinkTestStep.newBuilder("sink_t")
+ .addSchema(
+ "id INT PRIMARY KEY NOT ENFORCED",
+ "name STRING",
+ "`value` INT")
+ .addOption("changelog-mode", "I,UA,D")
+ .addOption("sink.supports-delete-by-key",
"true")
+ .consumedValues(
+ "+I[1, Alice, 10]",
+ "+I[2, Bob, 20]",
+ "+I[3, Emily, 30]",
+ "-D[1, Alice, null]",
+ "+U[3, Emily, 40]",
+ "+U[2, BOB, 20]")
+ .build())
+ .runSql(
+ "INSERT INTO sink_t SELECT l.id, r.name, l.`value`
FROM left_t l JOIN right_t r ON l.id = r.id")
+ .build();
+
+ private DeletesByKeyPrograms() {}
+}
diff --git
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/DeletesByKeySemanticTests.java
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/DeletesByKeySemanticTests.java
new file mode 100644
index 00000000000..eba93c8bba9
--- /dev/null
+++
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/DeletesByKeySemanticTests.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.stream;
+
+import
org.apache.flink.table.planner.plan.nodes.exec.testutils.SemanticTestBase;
+import org.apache.flink.table.test.program.TableTestProgram;
+
+import java.util.List;
+
+/** Semantic tests for various {@link StreamExecNode}s and sources producing
deletes by key only. */
+public class DeletesByKeySemanticTests extends SemanticTestBase {
+
+ @Override
+ public List<TableTestProgram> programs() {
+ return List.of(
+ DeletesByKeyPrograms.INSERT_SELECT_DELETE_BY_KEY_DELETE_BY_KEY,
+ DeletesByKeyPrograms.INSERT_SELECT_DELETE_BY_KEY_FULL_DELETE,
+ DeletesByKeyPrograms.INSERT_SELECT_FULL_DELETE_FULL_DELETE,
+
DeletesByKeyPrograms.INSERT_SELECT_DELETE_BY_KEY_DELETE_BY_KEY_WITH_PROJECTION,
+ DeletesByKeyPrograms.JOIN_INTO_FULL_DELETES,
+ DeletesByKeyPrograms.JOIN_INTO_DELETES_BY_KEY);
+ }
+}
diff --git
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/ChangelogNormalizeOptimizationTest.java
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/ChangelogNormalizeOptimizationTest.java
new file mode 100644
index 00000000000..79f6da967e1
--- /dev/null
+++
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/ChangelogNormalizeOptimizationTest.java
@@ -0,0 +1,301 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.stream.sql;
+
+import org.apache.flink.table.api.ExplainDetail;
+import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
+import org.apache.flink.table.planner.utils.JavaStreamTableTestUtil;
+import org.apache.flink.table.planner.utils.TableTestBase;
+import org.apache.flink.util.StringUtils;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+/** Plan tests for removal of redundant ChangelogNormalize nodes. */
+public class ChangelogNormalizeOptimizationTest extends TableTestBase {
+
+ private final JavaStreamTableTestUtil util = javaStreamTestUtil();
+
+ static List<TestSpec> getTests() {
+ return Arrays.asList(
+ TestSpec.select(SourceTable.UPSERT_SOURCE_PARTIAL_DELETES,
SinkTable.UPSERT_SINK),
+ TestSpec.select(SourceTable.UPSERT_SOURCE_FULL_DELETES,
SinkTable.UPSERT_SINK),
+ TestSpec.select(
+ SourceTable.UPSERT_SOURCE_PARTIAL_DELETES,
+ SinkTable.UPSERT_SINK_FULL_DELETES),
+ TestSpec.select(
+ SourceTable.UPSERT_SOURCE_FULL_DELETES,
SinkTable.UPSERT_SINK_FULL_DELETES),
+ TestSpec.select(SourceTable.UPSERT_SOURCE_PARTIAL_DELETES,
SinkTable.RETRACT_SINK),
+ TestSpec.select(SourceTable.UPSERT_SOURCE_FULL_DELETES,
SinkTable.RETRACT_SINK),
+ TestSpec.selectWithFilter(
+ SourceTable.UPSERT_SOURCE_PARTIAL_DELETES,
SinkTable.UPSERT_SINK),
+ TestSpec.selectWithFilter(
+ SourceTable.UPSERT_SOURCE_FULL_DELETES,
SinkTable.UPSERT_SINK),
+ TestSpec.selectWithFilter(
+ SourceTable.UPSERT_SOURCE_PARTIAL_DELETES,
SinkTable.RETRACT_SINK),
+ TestSpec.selectWithFilter(
+ SourceTable.UPSERT_SOURCE_FULL_DELETES,
SinkTable.RETRACT_SINK),
+ TestSpec.join(
+ SourceTable.UPSERT_SOURCE_FULL_DELETES,
+ SourceTable.UPSERT_SOURCE_FULL_DELETES,
+ SinkTable.UPSERT_SINK),
+ TestSpec.join(
+ SourceTable.UPSERT_SOURCE_PARTIAL_DELETES,
+ SourceTable.UPSERT_SOURCE_PARTIAL_DELETES,
+ SinkTable.UPSERT_SINK),
+ TestSpec.join(
+ SourceTable.UPSERT_SOURCE_FULL_DELETES,
+ SourceTable.UPSERT_SOURCE_PARTIAL_DELETES,
+ SinkTable.UPSERT_SINK),
+ TestSpec.join(
+ SourceTable.UPSERT_SOURCE_PARTIAL_DELETES,
+ SourceTable.UPSERT_SOURCE_FULL_DELETES,
+ SinkTable.UPSERT_SINK),
+ TestSpec.join(
+ SourceTable.UPSERT_SOURCE_PARTIAL_DELETES,
+ SourceTable.UPSERT_SOURCE_PARTIAL_DELETES,
+ SinkTable.UPSERT_SINK_FULL_DELETES),
+ TestSpec.join(
+ SourceTable.UPSERT_SOURCE_PARTIAL_DELETES,
+ SourceTable.UPSERT_SOURCE_PARTIAL_DELETES,
+ SinkTable.RETRACT_SINK),
+ TestSpec.select(
+ SourceTable.UPSERT_SOURCE_PARTIAL_DELETES_METADATA,
+ SinkTable.UPSERT_SINK_METADATA),
+ TestSpec.selectWithoutMetadata(
+ SourceTable.UPSERT_SOURCE_PARTIAL_DELETES_METADATA,
SinkTable.UPSERT_SINK));
+ }
+
+ @AfterEach
+ void tearDown() {
+ Arrays.stream(util.tableEnv().listTables())
+ .forEach(t -> util.tableEnv().executeSql("DROP TABLE " + t));
+ }
+
+ @ParameterizedTest()
+ @MethodSource("getTests")
+ void testChangelogNormalizePlan(TestSpec spec) {
+ for (TableProperties tableProperties : spec.tablesToCreate) {
+ final String additionalColumns =
+ String.join(",\n", tableProperties.getAdditionalColumns());
+ util.tableEnv()
+ .executeSql(
+ String.format(
+ "CREATE TABLE %s ( id INT,\n"
+ + " col1 INT,\n"
+ + " col2 STRING,\n"
+ + "%s"
+ + " PRIMARY KEY(id) NOT ENFORCED)
WITH (%s)",
+ tableProperties.getTableName(),
+
StringUtils.isNullOrWhitespaceOnly(additionalColumns)
+ ? ""
+ : additionalColumns + ",\n",
+ String.join(",\n",
tableProperties.getOptions())));
+ }
+ util.verifyRelPlanInsert(
+ spec.query,
+ JavaScalaConversionUtil.toScala(
+
Collections.singletonList(ExplainDetail.CHANGELOG_MODE)));
+ }
+
+ interface TableProperties {
+
+ String getTableName();
+
+ List<String> getOptions();
+
+ List<String> getAdditionalColumns();
+ }
+
+ public enum SourceTable implements TableProperties {
+ UPSERT_SOURCE_PARTIAL_DELETES(
+ "upsert_table_partial_deletes",
+ "'connector' = 'values'",
+ "'changelog-mode' = 'UA,D'",
+ "'source.produces-delete-by-key'='true'"),
+ UPSERT_SOURCE_PARTIAL_DELETES_METADATA(
+ "upsert_table_partial_deletes_metadata",
+ List.of("`offset` BIGINT METADATA"),
+ "'connector' = 'values'",
+ "'changelog-mode' = 'UA,D'",
+ "'source.produces-delete-by-key'='true'",
+ "'readable-metadata' = 'offset:BIGINT'"),
+ UPSERT_SOURCE_FULL_DELETES(
+ "upsert_table_full_deletes",
+ "'connector' = 'values'",
+ "'changelog-mode' = 'UA,D'",
+ "'source.produces-delete-by-key'='false'");
+
+ private final String tableName;
+ private final List<String> options;
+ private final List<String> additionalColumns;
+
+ SourceTable(String tableName, String... options) {
+ this(tableName, Collections.emptyList(), options);
+ }
+
+ SourceTable(String tableName, List<String> additionalColumns,
String... options) {
+ this.tableName = tableName;
+ this.additionalColumns = additionalColumns;
+ this.options = Arrays.asList(options);
+ }
+
+ @Override
+ public String getTableName() {
+ return tableName;
+ }
+
+ @Override
+ public List<String> getOptions() {
+ return options;
+ }
+
+ @Override
+ public List<String> getAdditionalColumns() {
+ return additionalColumns;
+ }
+ }
+
+ public enum SinkTable implements TableProperties {
+ UPSERT_SINK(
+ "upsert_sink_table",
+ " 'connector' = 'values'",
+ "'sink.supports-delete-by-key' = 'true'",
+ "'sink-changelog-mode-enforced' = 'I,UA,D'"),
+ UPSERT_SINK_METADATA(
+ "upsert_sink_table",
+ List.of("`offset` BIGINT METADATA"),
+ " 'connector' = 'values'",
+ "'sink.supports-delete-by-key' = 'true'",
+ "'writable-metadata' = 'offset:BIGINT'",
+ "'sink-changelog-mode-enforced' = 'I,UA,D'"),
+ UPSERT_SINK_FULL_DELETES(
+ "upsert_sink_table_full_deletes",
+ " 'connector' = 'values'",
+ "'sink.supports-delete-by-key' = 'false'",
+ "'sink-changelog-mode-enforced' = 'I,UA,D'"),
+ RETRACT_SINK(
+ "all_change_sink_table",
+ "'connector' = 'values'",
+ "'sink-changelog-mode-enforced' = 'I,UA,UB,D'");
+
+ private final String tableName;
+ private final List<String> options;
+ private final List<String> additionalColumns;
+
+ SinkTable(String tableName, String... options) {
+ this(tableName, Collections.emptyList(), options);
+ }
+
+ SinkTable(String tableName, List<String> additionalColumns, String...
options) {
+ this.tableName = tableName;
+ this.options = Arrays.asList(options);
+ this.additionalColumns = additionalColumns;
+ }
+
+ @Override
+ public String getTableName() {
+ return tableName;
+ }
+
+ @Override
+ public List<String> getOptions() {
+ return options;
+ }
+
+ @Override
+ public List<String> getAdditionalColumns() {
+ return additionalColumns;
+ }
+ }
+
+ private static class TestSpec {
+
+ private final Set<TableProperties> tablesToCreate;
+ private final String query;
+ private final String description;
+
+ private TestSpec(String description, Set<TableProperties>
tablesToCreate, String query) {
+ this.tablesToCreate = tablesToCreate;
+ this.query = query;
+ this.description = description;
+ }
+
+ public static TestSpec selectWithoutMetadata(SourceTable sourceTable,
SinkTable sinkTable) {
+ return new TestSpec(
+ String.format(
+ "select_no_metadata_%s_into_%s",
+ sourceTable.getTableName(),
sinkTable.getTableName()),
+ new HashSet<>(Arrays.asList(sourceTable, sinkTable)),
+ String.format(
+ "INSERT INTO %s SELECT id, col1, col2 FROM %s",
+ sinkTable.getTableName(),
sourceTable.getTableName()));
+ }
+
+ public static TestSpec select(SourceTable sourceTable, SinkTable
sinkTable) {
+ return new TestSpec(
+ String.format(
+ "select_%s_into_%s",
+ sourceTable.getTableName(),
sinkTable.getTableName()),
+ new HashSet<>(Arrays.asList(sourceTable, sinkTable)),
+ String.format(
+ "INSERT INTO %s SELECT * FROM %s",
+ sinkTable.getTableName(),
sourceTable.getTableName()));
+ }
+
+ public static TestSpec selectWithFilter(SourceTable sourceTable,
SinkTable sinkTable) {
+ return new TestSpec(
+ String.format(
+ "select_with_filter_%s_into_%s",
+ sourceTable.getTableName(),
sinkTable.getTableName()),
+ new HashSet<>(Arrays.asList(sourceTable, sinkTable)),
+ String.format(
+ "INSERT INTO %s SELECT * FROM %s WHERE col1 > 2",
+ sinkTable.getTableName(),
sourceTable.getTableName()));
+ }
+
+ public static TestSpec join(
+ SourceTable leftTable, SourceTable rightTable, SinkTable
sinkTable) {
+ return new TestSpec(
+ String.format(
+ "join_%s_%s_into_%s",
+ leftTable.getTableName(),
+ rightTable.getTableName(),
+ sinkTable.getTableName()),
+ new HashSet<>(Arrays.asList(leftTable, rightTable,
sinkTable)),
+ String.format(
+ "INSERT INTO %s SELECT l.* FROM %s l JOIN %s r ON
l.id = r.id",
+ sinkTable.getTableName(),
+ leftTable.getTableName(),
+ rightTable.getTableName()));
+ }
+
+ @Override
+ public String toString() {
+ return description;
+ }
+ }
+}
diff --git
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/DataStreamJavaITCase.java
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/DataStreamJavaITCase.java
index c0c769a698a..f876e815b51 100644
---
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/DataStreamJavaITCase.java
+++
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/DataStreamJavaITCase.java
@@ -577,6 +577,7 @@ class DataStreamJavaITCase {
tableEnv.fromChangelogStream(
changelogStream,
Schema.newBuilder().primaryKey("f0").build(),
+ // produce partial deletes
ChangelogMode.upsert()));
final Table result = tableEnv.sqlQuery("SELECT f0, SUM(f1) FROM t
GROUP BY f0");
@@ -585,7 +586,8 @@ class DataStreamJavaITCase {
tableEnv.toChangelogStream(
result,
Schema.newBuilder().primaryKey("f0").build(),
- ChangelogMode.upsert()),
+ // expect full deletes, therefore ChangelogNormalize is required
+ ChangelogMode.upsert(false)),
getOutput(inputOrOutput));
}
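For completeness, a small hedged sketch of the DataStream API angle touched above
(variables as in the surrounding test; ChangelogMode.upsert(boolean) is the overload
added by this commit): the source side declares key-only deletes, while the sink
side asks for full deletes and therefore keeps ChangelogNormalize in the plan.

    // Source side: ChangelogMode.upsert() defaults to key-only deletes.
    Table t =
            tableEnv.fromChangelogStream(
                    changelogStream,
                    Schema.newBuilder().primaryKey("f0").build(),
                    ChangelogMode.upsert());

    // Sink side: requesting full deletes keeps ChangelogNormalize in the plan.
    DataStream<Row> out =
            tableEnv.toChangelogStream(
                    result,
                    Schema.newBuilder().primaryKey("f0").build(),
                    ChangelogMode.upsert(false));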
diff --git
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/ChangelogNormalizeOptimizationTest.xml
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/ChangelogNormalizeOptimizationTest.xml
new file mode 100644
index 00000000000..c15ac5f8d62
--- /dev/null
+++
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/ChangelogNormalizeOptimizationTest.xml
@@ -0,0 +1,418 @@
+<?xml version="1.0" ?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to you under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+<Root>
+ <TestCase name="testChangelogNormalizePlan[[10]
select_with_filter_upsert_table_full_deletes_into_all_change_sink_table]">
+ <Resource name="sql">
+ <![CDATA[INSERT INTO all_change_sink_table SELECT * FROM
upsert_table_full_deletes WHERE col1 > 2]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalSink(table=[default_catalog.default_database.all_change_sink_table],
fields=[id, col1, col2])
++- LogicalProject(id=[$0], col1=[$1], col2=[$2])
+ +- LogicalFilter(condition=[>($1, 2)])
+ +- LogicalTableScan(table=[[default_catalog, default_database,
upsert_table_full_deletes]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+Sink(table=[default_catalog.default_database.all_change_sink_table],
fields=[id, col1, col2], changelogMode=[NONE])
++- ChangelogNormalize(key=[id], condition=[>(col1, 2)],
changelogMode=[I,UB,UA,D])
+ +- Exchange(distribution=[hash[id]], changelogMode=[UA,D])
+ +- TableSourceScan(table=[[default_catalog, default_database,
upsert_table_full_deletes, filter=[]]], fields=[id, col1, col2],
changelogMode=[UA,D])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testChangelogNormalizePlan[[11]
join_upsert_table_full_deletes_upsert_table_full_deletes_into_upsert_sink_table]">
+ <Resource name="sql">
+ <![CDATA[INSERT INTO upsert_sink_table SELECT l.* FROM
upsert_table_full_deletes l JOIN upsert_table_full_deletes r ON l.id = r.id]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalSink(table=[default_catalog.default_database.upsert_sink_table],
fields=[id, col1, col2])
++- LogicalProject(id=[$0], col1=[$1], col2=[$2])
+ +- LogicalJoin(condition=[=($0, $3)], joinType=[inner])
+ :- LogicalTableScan(table=[[default_catalog, default_database,
upsert_table_full_deletes]])
+ +- LogicalTableScan(table=[[default_catalog, default_database,
upsert_table_full_deletes]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+Sink(table=[default_catalog.default_database.upsert_sink_table], fields=[id,
col1, col2], changelogMode=[NONE])
++- Calc(select=[id, col1, col2], changelogMode=[I,UA,D])
+ +- Join(joinType=[InnerJoin], where=[=(id, id0)], select=[id, col1, col2,
id0], leftInputSpec=[JoinKeyContainsUniqueKey],
rightInputSpec=[JoinKeyContainsUniqueKey], changelogMode=[I,UA,D])
+ :- Exchange(distribution=[hash[id]], changelogMode=[I,UA,D])
+ : +- TableSourceScan(table=[[default_catalog, default_database,
upsert_table_full_deletes, project=[id, col1, col2], metadata=[]]], fields=[id,
col1, col2], changelogMode=[UA,D])
+ +- Exchange(distribution=[hash[id]], changelogMode=[I,UA,D])
+ +- Calc(select=[id], changelogMode=[UA,D])
+ +- TableSourceScan(table=[[default_catalog, default_database,
upsert_table_full_deletes, project=[id, col1, col2], metadata=[]]], fields=[id,
col1, col2], changelogMode=[UA,D])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testChangelogNormalizePlan[[12]
join_upsert_table_partial_deletes_upsert_table_partial_deletes_into_upsert_sink_table]">
+ <Resource name="sql">
+ <![CDATA[INSERT INTO upsert_sink_table SELECT l.* FROM
upsert_table_partial_deletes l JOIN upsert_table_partial_deletes r ON l.id =
r.id]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalSink(table=[default_catalog.default_database.upsert_sink_table],
fields=[id, col1, col2])
++- LogicalProject(id=[$0], col1=[$1], col2=[$2])
+ +- LogicalJoin(condition=[=($0, $3)], joinType=[inner])
+ :- LogicalTableScan(table=[[default_catalog, default_database,
upsert_table_partial_deletes]])
+ +- LogicalTableScan(table=[[default_catalog, default_database,
upsert_table_partial_deletes]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+Sink(table=[default_catalog.default_database.upsert_sink_table], fields=[id,
col1, col2], changelogMode=[NONE])
++- Calc(select=[id, col1, col2], changelogMode=[I,UA,PD])
+ +- Join(joinType=[InnerJoin], where=[=(id, id0)], select=[id, col1, col2,
id0], leftInputSpec=[JoinKeyContainsUniqueKey],
rightInputSpec=[JoinKeyContainsUniqueKey], changelogMode=[I,UA,PD])
+ :- Exchange(distribution=[hash[id]], changelogMode=[I,UA,PD])
+ : +- TableSourceScan(table=[[default_catalog, default_database,
upsert_table_partial_deletes, project=[id, col1, col2], metadata=[]]],
fields=[id, col1, col2], changelogMode=[UA,PD])
+ +- Exchange(distribution=[hash[id]], changelogMode=[I,UA,PD])
+ +- Calc(select=[id], changelogMode=[UA,PD])
+ +- TableSourceScan(table=[[default_catalog, default_database,
upsert_table_partial_deletes, project=[id, col1, col2], metadata=[]]],
fields=[id, col1, col2], changelogMode=[UA,PD])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testChangelogNormalizePlan[[13]
join_upsert_table_full_deletes_upsert_table_partial_deletes_into_upsert_sink_table]">
+ <Resource name="sql">
+ <![CDATA[INSERT INTO upsert_sink_table SELECT l.* FROM
upsert_table_full_deletes l JOIN upsert_table_partial_deletes r ON l.id =
r.id]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalSink(table=[default_catalog.default_database.upsert_sink_table],
fields=[id, col1, col2])
++- LogicalProject(id=[$0], col1=[$1], col2=[$2])
+ +- LogicalJoin(condition=[=($0, $3)], joinType=[inner])
+ :- LogicalTableScan(table=[[default_catalog, default_database,
upsert_table_full_deletes]])
+ +- LogicalTableScan(table=[[default_catalog, default_database,
upsert_table_partial_deletes]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+Sink(table=[default_catalog.default_database.upsert_sink_table], fields=[id,
col1, col2], changelogMode=[NONE])
++- Calc(select=[id, col1, col2], changelogMode=[I,UA,PD])
+ +- Join(joinType=[InnerJoin], where=[=(id, id0)], select=[id, col1, col2,
id0], leftInputSpec=[JoinKeyContainsUniqueKey],
rightInputSpec=[JoinKeyContainsUniqueKey], changelogMode=[I,UA,PD])
+ :- Exchange(distribution=[hash[id]], changelogMode=[I,UA,D])
+ : +- TableSourceScan(table=[[default_catalog, default_database,
upsert_table_full_deletes]], fields=[id, col1, col2], changelogMode=[UA,D])
+ +- Exchange(distribution=[hash[id]], changelogMode=[I,UA,PD])
+ +- TableSourceScan(table=[[default_catalog, default_database,
upsert_table_partial_deletes, project=[id], metadata=[]]], fields=[id],
changelogMode=[UA,PD])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testChangelogNormalizePlan[[14]
join_upsert_table_partial_deletes_upsert_table_full_deletes_into_upsert_sink_table]">
+ <Resource name="sql">
+ <![CDATA[INSERT INTO upsert_sink_table SELECT l.* FROM
upsert_table_partial_deletes l JOIN upsert_table_full_deletes r ON l.id =
r.id]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalSink(table=[default_catalog.default_database.upsert_sink_table],
fields=[id, col1, col2])
++- LogicalProject(id=[$0], col1=[$1], col2=[$2])
+ +- LogicalJoin(condition=[=($0, $3)], joinType=[inner])
+ :- LogicalTableScan(table=[[default_catalog, default_database,
upsert_table_partial_deletes]])
+ +- LogicalTableScan(table=[[default_catalog, default_database,
upsert_table_full_deletes]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+Sink(table=[default_catalog.default_database.upsert_sink_table], fields=[id,
col1, col2], changelogMode=[NONE])
++- Calc(select=[id, col1, col2], changelogMode=[I,UA,PD])
+ +- Join(joinType=[InnerJoin], where=[=(id, id0)], select=[id, col1, col2,
id0], leftInputSpec=[JoinKeyContainsUniqueKey],
rightInputSpec=[JoinKeyContainsUniqueKey], changelogMode=[I,UA,PD])
+ :- Exchange(distribution=[hash[id]], changelogMode=[I,UA,PD])
+ : +- TableSourceScan(table=[[default_catalog, default_database,
upsert_table_partial_deletes]], fields=[id, col1, col2], changelogMode=[UA,PD])
+ +- Exchange(distribution=[hash[id]], changelogMode=[I,UA,D])
+ +- TableSourceScan(table=[[default_catalog, default_database,
upsert_table_full_deletes, project=[id], metadata=[]]], fields=[id],
changelogMode=[UA,D])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testChangelogNormalizePlan[[15]
join_upsert_table_partial_deletes_upsert_table_partial_deletes_into_upsert_sink_table_full_deletes]">
+ <Resource name="sql">
+ <![CDATA[INSERT INTO upsert_sink_table_full_deletes SELECT l.* FROM
upsert_table_partial_deletes l JOIN upsert_table_partial_deletes r ON l.id =
r.id]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalSink(table=[default_catalog.default_database.upsert_sink_table_full_deletes],
fields=[id, col1, col2])
++- LogicalProject(id=[$0], col1=[$1], col2=[$2])
+ +- LogicalJoin(condition=[=($0, $3)], joinType=[inner])
+ :- LogicalTableScan(table=[[default_catalog, default_database,
upsert_table_partial_deletes]])
+ +- LogicalTableScan(table=[[default_catalog, default_database,
upsert_table_partial_deletes]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+Sink(table=[default_catalog.default_database.upsert_sink_table_full_deletes],
fields=[id, col1, col2], changelogMode=[NONE])
++- Calc(select=[id, col1, col2], changelogMode=[I,UA,D])
+ +- Join(joinType=[InnerJoin], where=[=(id, id0)], select=[id, col1, col2,
id0], leftInputSpec=[JoinKeyContainsUniqueKey],
rightInputSpec=[JoinKeyContainsUniqueKey], changelogMode=[I,UA,D])
+ :- Exchange(distribution=[hash[id]], changelogMode=[I,UA,D])
+ : +- ChangelogNormalize(key=[id], changelogMode=[I,UA,D])
+ : +- Exchange(distribution=[hash[id]], changelogMode=[UA,PD])
+ : +- TableSourceScan(table=[[default_catalog, default_database,
upsert_table_partial_deletes, project=[id, col1, col2], metadata=[]]],
fields=[id, col1, col2], changelogMode=[UA,PD])
+ +- Exchange(distribution=[hash[id]], changelogMode=[I,UA,D])
+ +- ChangelogNormalize(key=[id], changelogMode=[I,UA,D])
+ +- Exchange(distribution=[hash[id]], changelogMode=[UA,PD])
+ +- Calc(select=[id], changelogMode=[UA,PD])
+ +- TableSourceScan(table=[[default_catalog,
default_database, upsert_table_partial_deletes, project=[id, col1, col2],
metadata=[]]], fields=[id, col1, col2], changelogMode=[UA,PD])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testChangelogNormalizePlan[[16]
join_upsert_table_partial_deletes_upsert_table_partial_deletes_into_all_change_sink_table]">
+ <Resource name="sql">
+ <![CDATA[INSERT INTO all_change_sink_table SELECT l.* FROM
upsert_table_partial_deletes l JOIN upsert_table_partial_deletes r ON l.id =
r.id]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalSink(table=[default_catalog.default_database.all_change_sink_table],
fields=[id, col1, col2])
++- LogicalProject(id=[$0], col1=[$1], col2=[$2])
+ +- LogicalJoin(condition=[=($0, $3)], joinType=[inner])
+ :- LogicalTableScan(table=[[default_catalog, default_database,
upsert_table_partial_deletes]])
+ +- LogicalTableScan(table=[[default_catalog, default_database,
upsert_table_partial_deletes]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+Sink(table=[default_catalog.default_database.all_change_sink_table],
fields=[id, col1, col2], changelogMode=[NONE])
++- Calc(select=[id, col1, col2], changelogMode=[I,UB,UA,D])
+ +- Join(joinType=[InnerJoin], where=[=(id, id0)], select=[id, col1, col2,
id0], leftInputSpec=[JoinKeyContainsUniqueKey],
rightInputSpec=[JoinKeyContainsUniqueKey], changelogMode=[I,UB,UA,D])
+ :- Exchange(distribution=[hash[id]], changelogMode=[I,UB,UA,D])
+ : +- ChangelogNormalize(key=[id], changelogMode=[I,UB,UA,D])
+ : +- Exchange(distribution=[hash[id]], changelogMode=[UA,PD])
+ : +- TableSourceScan(table=[[default_catalog, default_database,
upsert_table_partial_deletes, project=[id, col1, col2], metadata=[]]],
fields=[id, col1, col2], changelogMode=[UA,PD])
+ +- Exchange(distribution=[hash[id]], changelogMode=[I,UB,UA,D])
+ +- ChangelogNormalize(key=[id], changelogMode=[I,UB,UA,D])
+ +- Exchange(distribution=[hash[id]], changelogMode=[UA,PD])
+ +- Calc(select=[id], changelogMode=[UA,PD])
+ +- TableSourceScan(table=[[default_catalog,
default_database, upsert_table_partial_deletes, project=[id, col1, col2],
metadata=[]]], fields=[id, col1, col2], changelogMode=[UA,PD])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testChangelogNormalizePlan[[17]
select_upsert_table_partial_deletes_metadata_into_upsert_sink_table]">
+ <Resource name="sql">
+ <![CDATA[INSERT INTO upsert_sink_table SELECT * FROM
upsert_table_partial_deletes_metadata]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalSink(table=[default_catalog.default_database.upsert_sink_table],
fields=[id, col1, col2, offset])
++- LogicalProject(id=[$0], col1=[$1], col2=[$2], offset=[$3])
+ +- LogicalTableScan(table=[[default_catalog, default_database,
upsert_table_partial_deletes_metadata, metadata=[offset]]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+Sink(table=[default_catalog.default_database.upsert_sink_table], fields=[id,
col1, col2, offset], changelogMode=[NONE])
++- ChangelogNormalize(key=[id], changelogMode=[I,UA,D])
+ +- Exchange(distribution=[hash[id]], changelogMode=[UA,PD])
+ +- TableSourceScan(table=[[default_catalog, default_database,
upsert_table_partial_deletes_metadata, metadata=[offset]]], fields=[id, col1,
col2, offset], changelogMode=[UA,PD])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testChangelogNormalizePlan[[18]
select_no_metadata_upsert_table_partial_deletes_metadata_into_upsert_sink_table]">
+ <Resource name="sql">
+ <![CDATA[INSERT INTO upsert_sink_table SELECT id, col1, col2 FROM
upsert_table_partial_deletes_metadata]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalSink(table=[default_catalog.default_database.upsert_sink_table],
fields=[id, col1, col2])
++- LogicalProject(id=[$0], col1=[$1], col2=[$2])
+ +- LogicalProject(id=[$0], col1=[$1], col2=[$2], offset=[$3])
+ +- LogicalTableScan(table=[[default_catalog, default_database,
upsert_table_partial_deletes_metadata, metadata=[offset]]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+Sink(table=[default_catalog.default_database.upsert_sink_table], fields=[id,
col1, col2], changelogMode=[NONE])
++- TableSourceScan(table=[[default_catalog, default_database,
upsert_table_partial_deletes_metadata, project=[id, col1, col2], metadata=[]]],
fields=[id, col1, col2], changelogMode=[UA,PD])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testChangelogNormalizePlan[[1]
select_upsert_table_partial_deletes_into_upsert_sink_table]">
+ <Resource name="sql">
+ <![CDATA[INSERT INTO upsert_sink_table SELECT * FROM
upsert_table_partial_deletes]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalSink(table=[default_catalog.default_database.upsert_sink_table],
fields=[id, col1, col2])
++- LogicalProject(id=[$0], col1=[$1], col2=[$2])
+ +- LogicalTableScan(table=[[default_catalog, default_database,
upsert_table_partial_deletes]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+Sink(table=[default_catalog.default_database.upsert_sink_table], fields=[id,
col1, col2], changelogMode=[NONE])
++- TableSourceScan(table=[[default_catalog, default_database,
upsert_table_partial_deletes]], fields=[id, col1, col2], changelogMode=[UA,PD])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testChangelogNormalizePlan[[2]
select_upsert_table_full_deletes_into_upsert_sink_table]">
+ <Resource name="sql">
+ <![CDATA[INSERT INTO upsert_sink_table SELECT * FROM
upsert_table_full_deletes]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalSink(table=[default_catalog.default_database.upsert_sink_table],
fields=[id, col1, col2])
++- LogicalProject(id=[$0], col1=[$1], col2=[$2])
+ +- LogicalTableScan(table=[[default_catalog, default_database,
upsert_table_full_deletes]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+Sink(table=[default_catalog.default_database.upsert_sink_table], fields=[id,
col1, col2], changelogMode=[NONE])
++- TableSourceScan(table=[[default_catalog, default_database,
upsert_table_full_deletes]], fields=[id, col1, col2], changelogMode=[UA,D])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testChangelogNormalizePlan[[3]
select_upsert_table_partial_deletes_into_upsert_sink_table_full_deletes]">
+ <Resource name="sql">
+ <![CDATA[INSERT INTO upsert_sink_table_full_deletes SELECT * FROM
upsert_table_partial_deletes]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalSink(table=[default_catalog.default_database.upsert_sink_table_full_deletes],
fields=[id, col1, col2])
++- LogicalProject(id=[$0], col1=[$1], col2=[$2])
+ +- LogicalTableScan(table=[[default_catalog, default_database,
upsert_table_partial_deletes]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+Sink(table=[default_catalog.default_database.upsert_sink_table_full_deletes],
fields=[id, col1, col2], changelogMode=[NONE])
++- ChangelogNormalize(key=[id], changelogMode=[I,UA,D])
+ +- Exchange(distribution=[hash[id]], changelogMode=[UA,PD])
+ +- TableSourceScan(table=[[default_catalog, default_database,
upsert_table_partial_deletes]], fields=[id, col1, col2], changelogMode=[UA,PD])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testChangelogNormalizePlan[[4]
select_upsert_table_full_deletes_into_upsert_sink_table_full_deletes]">
+ <Resource name="sql">
+ <![CDATA[INSERT INTO upsert_sink_table_full_deletes SELECT * FROM
upsert_table_full_deletes]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalSink(table=[default_catalog.default_database.upsert_sink_table_full_deletes],
fields=[id, col1, col2])
++- LogicalProject(id=[$0], col1=[$1], col2=[$2])
+ +- LogicalTableScan(table=[[default_catalog, default_database,
upsert_table_full_deletes]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+Sink(table=[default_catalog.default_database.upsert_sink_table_full_deletes],
fields=[id, col1, col2], changelogMode=[NONE])
++- TableSourceScan(table=[[default_catalog, default_database,
upsert_table_full_deletes]], fields=[id, col1, col2], changelogMode=[UA,D])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testChangelogNormalizePlan[[5]
select_upsert_table_partial_deletes_into_all_change_sink_table]">
+ <Resource name="sql">
+ <![CDATA[INSERT INTO all_change_sink_table SELECT * FROM
upsert_table_partial_deletes]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalSink(table=[default_catalog.default_database.all_change_sink_table],
fields=[id, col1, col2])
++- LogicalProject(id=[$0], col1=[$1], col2=[$2])
+ +- LogicalTableScan(table=[[default_catalog, default_database,
upsert_table_partial_deletes]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+Sink(table=[default_catalog.default_database.all_change_sink_table],
fields=[id, col1, col2], changelogMode=[NONE])
++- ChangelogNormalize(key=[id], changelogMode=[I,UB,UA,D])
+ +- Exchange(distribution=[hash[id]], changelogMode=[UA,PD])
+ +- TableSourceScan(table=[[default_catalog, default_database,
upsert_table_partial_deletes]], fields=[id, col1, col2], changelogMode=[UA,PD])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testChangelogNormalizePlan[[6]
select_upsert_table_full_deletes_into_all_change_sink_table]">
+ <Resource name="sql">
+ <![CDATA[INSERT INTO all_change_sink_table SELECT * FROM
upsert_table_full_deletes]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalSink(table=[default_catalog.default_database.all_change_sink_table],
fields=[id, col1, col2])
++- LogicalProject(id=[$0], col1=[$1], col2=[$2])
+ +- LogicalTableScan(table=[[default_catalog, default_database,
upsert_table_full_deletes]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+Sink(table=[default_catalog.default_database.all_change_sink_table],
fields=[id, col1, col2], changelogMode=[NONE])
++- ChangelogNormalize(key=[id], changelogMode=[I,UB,UA,D])
+ +- Exchange(distribution=[hash[id]], changelogMode=[UA,D])
+ +- TableSourceScan(table=[[default_catalog, default_database,
upsert_table_full_deletes]], fields=[id, col1, col2], changelogMode=[UA,D])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testChangelogNormalizePlan[[7]
select_with_filter_upsert_table_partial_deletes_into_upsert_sink_table]">
+ <Resource name="sql">
+ <![CDATA[INSERT INTO upsert_sink_table SELECT * FROM
upsert_table_partial_deletes WHERE col1 > 2]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalSink(table=[default_catalog.default_database.upsert_sink_table],
fields=[id, col1, col2])
++- LogicalProject(id=[$0], col1=[$1], col2=[$2])
+ +- LogicalFilter(condition=[>($1, 2)])
+ +- LogicalTableScan(table=[[default_catalog, default_database,
upsert_table_partial_deletes]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+Sink(table=[default_catalog.default_database.upsert_sink_table], fields=[id,
col1, col2], changelogMode=[NONE])
++- ChangelogNormalize(key=[id], condition=[>(col1, 2)],
changelogMode=[I,UB,UA,D])
+ +- Exchange(distribution=[hash[id]], changelogMode=[UA,PD])
+ +- TableSourceScan(table=[[default_catalog, default_database,
upsert_table_partial_deletes, filter=[]]], fields=[id, col1, col2],
changelogMode=[UA,PD])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testChangelogNormalizePlan[[8]
select_with_filter_upsert_table_full_deletes_into_upsert_sink_table]">
+ <Resource name="sql">
+ <![CDATA[INSERT INTO upsert_sink_table SELECT * FROM
upsert_table_full_deletes WHERE col1 > 2]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalSink(table=[default_catalog.default_database.upsert_sink_table],
fields=[id, col1, col2])
++- LogicalProject(id=[$0], col1=[$1], col2=[$2])
+ +- LogicalFilter(condition=[>($1, 2)])
+ +- LogicalTableScan(table=[[default_catalog, default_database,
upsert_table_full_deletes]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+Sink(table=[default_catalog.default_database.upsert_sink_table], fields=[id,
col1, col2], changelogMode=[NONE])
++- ChangelogNormalize(key=[id], condition=[>(col1, 2)],
changelogMode=[I,UB,UA,D])
+ +- Exchange(distribution=[hash[id]], changelogMode=[UA,D])
+ +- TableSourceScan(table=[[default_catalog, default_database,
upsert_table_full_deletes, filter=[]]], fields=[id, col1, col2],
changelogMode=[UA,D])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testChangelogNormalizePlan[[9]
select_with_filter_upsert_table_partial_deletes_into_all_change_sink_table]">
+ <Resource name="sql">
+ <![CDATA[INSERT INTO all_change_sink_table SELECT * FROM
upsert_table_partial_deletes WHERE col1 > 2]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalSink(table=[default_catalog.default_database.all_change_sink_table],
fields=[id, col1, col2])
++- LogicalProject(id=[$0], col1=[$1], col2=[$2])
+ +- LogicalFilter(condition=[>($1, 2)])
+ +- LogicalTableScan(table=[[default_catalog, default_database,
upsert_table_partial_deletes]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+Sink(table=[default_catalog.default_database.all_change_sink_table],
fields=[id, col1, col2], changelogMode=[NONE])
++- ChangelogNormalize(key=[id], condition=[>(col1, 2)],
changelogMode=[I,UB,UA,D])
+ +- Exchange(distribution=[hash[id]], changelogMode=[UA,PD])
+ +- TableSourceScan(table=[[default_catalog, default_database,
upsert_table_partial_deletes, filter=[]]], fields=[id, col1, col2],
changelogMode=[UA,PD])
+]]>
+ </Resource>
+ </TestCase>
+</Root>
diff --git
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.xml
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.xml
index 04775282ec1..52db3da571c 100644
---
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.xml
+++
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.xml
@@ -333,7 +333,7 @@ Calc(select=[currency, amount, rate, *(amount, rate) AS
EXPR$3], changelogMode=[
: +- TableSourceScan(table=[[default_catalog, default_database,
orders]], fields=[amount, currency, rowtime], changelogMode=[I])
+- Exchange(distribution=[hash[currency]], changelogMode=[UA,D])
+- WatermarkAssigner(rowtime=[rowtime], watermark=[rowtime],
changelogMode=[UA,D])
- +- TableSourceScan(table=[[default_catalog, default_database,
rates_history]], fields=[currency, rate, rowtime], changelogMode=[UA,D])
+ +- TableSourceScan(table=[[default_catalog, default_database,
rates_history]], fields=[currency, rate, rowtime], changelogMode=[UA,PD])
]]>
</Resource>
</TestCase>
@@ -486,8 +486,8 @@ Calc(select=[currency, amount, rate, *(amount, rate) AS
EXPR$3], changelogMode=[
: +- TableSourceScan(table=[[default_catalog, default_database,
orders]], fields=[amount, currency], changelogMode=[I])
+- Exchange(distribution=[hash[currency]], changelogMode=[I,UA,D])
+- ChangelogNormalize(key=[currency], changelogMode=[I,UA,D])
- +- Exchange(distribution=[hash[currency]], changelogMode=[UA,D])
- +- TableSourceScan(table=[[default_catalog, default_database,
rates_history]], fields=[currency, rate], changelogMode=[UA,D])
+ +- Exchange(distribution=[hash[currency]], changelogMode=[UA,PD])
+ +- TableSourceScan(table=[[default_catalog, default_database,
rates_history]], fields=[currency, rate], changelogMode=[UA,PD])
]]>
</Resource>
</TestCase>
@@ -658,8 +658,8 @@ LogicalProject(b=[$2], ts=[$0], a=[$1])
Union(all=[true], union=[b, ts, a], changelogMode=[I,UA,D])
:- Calc(select=[b, ts, CAST(a AS INTEGER) AS a], changelogMode=[I,UA,D])
: +- ChangelogNormalize(key=[a], changelogMode=[I,UA,D])
-: +- Exchange(distribution=[hash[a]], changelogMode=[UA,D])
-: +- TableSourceScan(table=[[default_catalog, default_database,
upsert_src]], fields=[ts, a, b], changelogMode=[UA,D])
+: +- Exchange(distribution=[hash[a]], changelogMode=[UA,PD])
+: +- TableSourceScan(table=[[default_catalog, default_database,
upsert_src]], fields=[ts, a, b], changelogMode=[UA,PD])
+- Calc(select=[b, t AS ts, a], changelogMode=[I,UA])
+- GroupAggregate(groupBy=[a], select=[a, MAX(ts) AS t, MAX(b) AS b],
changelogMode=[I,UA])
+- Exchange(distribution=[hash[a]], changelogMode=[I])
@@ -744,8 +744,8 @@ LogicalProject(id=[$0], ts=[$4])
<![CDATA[
Calc(select=[id, Reinterpret(TO_TIMESTAMP(c)) AS ts], changelogMode=[I,UA,D])
+- ChangelogNormalize(key=[id], changelogMode=[I,UA,D])
- +- Exchange(distribution=[hash[id]], changelogMode=[UA,D])
- +- TableSourceScan(table=[[default_catalog, default_database, src,
project=[id, c], metadata=[], watermark=[-(TO_TIMESTAMP(c), 1000:INTERVAL
SECOND)], watermarkEmitStrategy=[on-periodic]]], fields=[id, c],
changelogMode=[UA,D])
+ +- Exchange(distribution=[hash[id]], changelogMode=[UA,PD])
+ +- TableSourceScan(table=[[default_catalog, default_database, src,
project=[id, c], metadata=[], watermark=[-(TO_TIMESTAMP(c), 1000:INTERVAL
SECOND)], watermarkEmitStrategy=[on-periodic]]], fields=[id, c],
changelogMode=[UA,PD])
]]>
</Resource>
</TestCase>
diff --git
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/NonDeterministicDagTest.scala
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/NonDeterministicDagTest.scala
index 3dc7bb8a666..2ba440b7fd1 100644
---
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/NonDeterministicDagTest.scala
+++
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/NonDeterministicDagTest.scala
@@ -95,7 +95,8 @@ class NonDeterministicDagTest(nonDeterministicUpdateStrategy:
NonDeterministicUp
| primary key (a) not enforced
|) with (
| 'connector' = 'values',
- | 'changelog-mode' = 'I,UA,D'
+ | 'changelog-mode' = 'I,UA,D',
+ | 'source.produces-delete-by-key' = 'true'
|)""".stripMargin)
util.tableEnv.executeSql("""
|create temporary table upsert_src_with_meta (
@@ -109,6 +110,7 @@ class
NonDeterministicDagTest(nonDeterministicUpdateStrategy: NonDeterministicUp
|) with (
| 'connector' = 'values',
| 'changelog-mode' = 'I,UA,D',
+ | 'source.produces-delete-by-key' = 'true',
| 'readable-metadata' = 'metadata_1:INT,
metadata_2:STRING'
|)""".stripMargin)
diff --git
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.scala
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.scala
index eb533cbe1c0..ea905662a42 100644
---
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.scala
+++
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.scala
@@ -492,6 +492,7 @@ class TableScanTest extends TableTestBase {
| 'connector' = 'values',
| 'changelog-mode' = 'UA,D',
| 'enable-watermark-push-down' = 'true',
+ | 'source.produces-delete-by-key' = 'true',
| 'disable-lookup' = 'true'
|)
""".stripMargin)
@@ -508,7 +509,8 @@ class TableScanTest extends TableTestBase {
| PRIMARY KEY (a) NOT ENFORCED
|) WITH (
| 'connector' = 'values',
- | 'changelog-mode' = 'UA,D'
+ | 'changelog-mode' = 'UA,D',
+ | 'source.produces-delete-by-key' = 'true'
|)
""".stripMargin)
util.addTable("""
@@ -593,6 +595,7 @@ class TableScanTest extends TableTestBase {
|) WITH (
| 'connector' = 'values',
| 'changelog-mode' = 'UA,D',
+ | 'source.produces-delete-by-key' = 'true',
| 'disable-lookup' = 'true'
|)
""".stripMargin)
@@ -628,6 +631,7 @@ class TableScanTest extends TableTestBase {
|) WITH (
| 'connector' = 'values',
| 'changelog-mode' = 'UA,D',
+ | 'source.produces-delete-by-key' = 'true',
| 'disable-lookup' = 'true'
|)
""".stripMargin)
@@ -786,7 +790,8 @@ class TableScanTest extends TableTestBase {
| 'runtime-source' = 'DataStream',
| 'scan.parallelism' = '5',
| 'enable-projection-push-down' = 'false',
- | 'changelog-mode' = 'I,UA,D'
+ | 'changelog-mode' = 'I,UA,D',
+ | 'source.produces-delete-by-key' = 'true'
|)
""".stripMargin)
util.addTable("""
diff --git
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala
index 2f4c576bc7c..6dc54d54ee3 100644
---
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala
+++
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala
@@ -90,8 +90,10 @@ import org.apache.calcite.sql.{SqlExplainLevel,
SqlIntervalQualifier}
import org.apache.calcite.sql.parser.SqlParserPos
import org.assertj.core.api.Assertions.{assertThat, assertThatExceptionOfType,
fail}
import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
+import org.junit.jupiter.api.TestTemplate
import org.junit.jupiter.api.extension.{BeforeEachCallback, ExtendWith,
ExtensionContext, RegisterExtension}
import org.junit.jupiter.api.io.TempDir
+import org.junit.jupiter.params.ParameterizedTest
import org.junit.platform.commons.support.AnnotationSupport
import java.io.{File, IOException}
@@ -155,7 +157,14 @@ class TestName extends BeforeEachCallback {
}
methodName = s"${context.getTestMethod.get().getName}$displayName"
} else {
- methodName = context.getTestMethod.get().getName
+ if (
+ AnnotationSupport.isAnnotated(context.getTestMethod,
classOf[ParameterizedTest])
+ || AnnotationSupport.isAnnotated(context.getTestMethod,
classOf[TestTemplate])
+ ) {
+ methodName =
s"${context.getTestMethod.get().getName}[${context.getDisplayName}]"
+ } else {
+ methodName = context.getTestMethod.get().getName
+ }
}
}