This is an automated email from the ASF dual-hosted git repository. xuyangzhong pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 370a8870450e4cd827c5fd1c4aacef551c15193e Author: xuyang <[email protected]> AuthorDate: Mon Mar 23 15:17:37 2026 +0800 [FLINK-39287][table-planner] Introduce FlinkRelMdImmutableColumns to implement the derivation logic of some simple nodes and update the logic to infer upsert key with immutable columns --- .../planner/connectors/DynamicSourceUtils.java | 7 + .../table/planner/plan/metadata/FlinkMetadata.java | 15 + .../plan/metadata/FlinkRelMdImmutableColumns.java | 311 ++++++++++++ .../plan/metadata/FlinkRelMetadataQuery.java | 21 + .../metadata/FlinkDefaultRelMetadataProvider.scala | 3 +- .../plan/metadata/FlinkRelMdUpsertKeys.scala | 107 +++-- .../planner/catalog/CatalogConstraintTest.java | 137 ++++-- .../metadata/FlinkRelMdImmutableColumnsTest.java | 527 +++++++++++++++++++++ .../utils/ImmutableColConstraintTestUtils.java | 66 +++ .../planner/plan/stream/sql/join/JoinTest.xml | 66 +++ .../plan/metadata/FlinkRelMdHandlerTestBase.scala | 28 ++ .../plan/metadata/FlinkRelMdUpsertKeysTest.scala | 192 +++++++- .../planner/plan/metadata/MetadataTestUtil.scala | 52 ++ .../planner/plan/stream/sql/join/JoinTest.scala | 127 +++++ 14 files changed, 1584 insertions(+), 75 deletions(-) diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/DynamicSourceUtils.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/DynamicSourceUtils.java index d293342abb4..a78f9f52ce0 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/DynamicSourceUtils.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/DynamicSourceUtils.java @@ -533,6 +533,7 @@ public final class DynamicSourceUtils { hasChangelogMode && changelogMode.contains(RowKind.UPDATE_BEFORE); final boolean hasUpdateAfter = hasChangelogMode && changelogMode.contains(RowKind.UPDATE_AFTER); + final boolean hasDelete = hasChangelogMode && 
changelogMode.contains(RowKind.DELETE); if (!hasUpdateBefore && hasUpdateAfter) { // only UPDATE_AFTER if (!schema.getPrimaryKey().isPresent()) { @@ -564,6 +565,12 @@ public final class DynamicSourceUtils { tableDebugName)); } } + if (hasDelete) { + if (schema.getImmutableColumns().isPresent()) { + throw new ValidationException( + "The immutable constraint cannot be defined on the table with changelog mode [DELETE]."); + } + } } private static void validateScanSourceForBatch( diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/metadata/FlinkMetadata.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/metadata/FlinkMetadata.java index d7cf6cfa927..d35f2df3f3b 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/metadata/FlinkMetadata.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/metadata/FlinkMetadata.java @@ -254,4 +254,19 @@ public abstract class FlinkMetadata { Set<ImmutableBitSet> getUpsertKeys(RelNode r, RelMetadataQuery mq); } } + + /** Metadata about which columns are never updated upstream within each pk. */ + public interface ImmutableColumns extends Metadata { + Method METHOD = Types.lookupMethod(ImmutableColumns.class, "getImmutableColumns"); + + MetadataDef<ImmutableColumns> DEF = + MetadataDef.of(ImmutableColumns.class, ImmutableColumns.Handler.class, METHOD); + + ImmutableBitSet getImmutableColumns(); + + /** Handler API. 
*/ + interface Handler extends MetadataHandler<ImmutableColumns> { + ImmutableBitSet getImmutableColumns(RelNode r, RelMetadataQuery mq); + } + } } diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/metadata/FlinkRelMdImmutableColumns.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/metadata/FlinkRelMdImmutableColumns.java new file mode 100644 index 00000000000..10dc43a5d67 --- /dev/null +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/metadata/FlinkRelMdImmutableColumns.java @@ -0,0 +1,311 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.planner.plan.metadata; + +import org.apache.flink.table.catalog.ResolvedSchema; +import org.apache.flink.table.planner.plan.nodes.calcite.WatermarkAssigner; +import org.apache.flink.table.planner.plan.nodes.physical.common.CommonPhysicalLookupJoin; +import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalChangelogNormalize; +import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalDropUpdateBefore; +import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalMiniBatchAssigner; +import org.apache.flink.table.planner.plan.schema.TableSourceTable; + +import org.apache.calcite.plan.RelOptTable; +import org.apache.calcite.plan.hep.HepRelVertex; +import org.apache.calcite.plan.volcano.RelSubset; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.core.Calc; +import org.apache.calcite.rel.core.Exchange; +import org.apache.calcite.rel.core.Filter; +import org.apache.calcite.rel.core.Join; +import org.apache.calcite.rel.core.JoinRelType; +import org.apache.calcite.rel.core.Project; +import org.apache.calcite.rel.core.TableScan; +import org.apache.calcite.rel.metadata.MetadataDef; +import org.apache.calcite.rel.metadata.MetadataHandler; +import org.apache.calcite.rel.metadata.ReflectiveRelMetadataProvider; +import org.apache.calcite.rel.metadata.RelMetadataProvider; +import org.apache.calcite.rel.metadata.RelMetadataQuery; +import org.apache.calcite.rex.RexInputRef; +import org.apache.calcite.rex.RexNode; +import org.apache.calcite.util.ImmutableBitSet; +import org.apache.calcite.util.Util; + +import javax.annotation.Nullable; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.function.Supplier; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +/** A metadata handler for {@link 
FlinkMetadata.ImmutableColumns}. */ +public class FlinkRelMdImmutableColumns implements MetadataHandler<FlinkMetadata.ImmutableColumns> { + static final FlinkRelMdImmutableColumns INSTANCE = new FlinkRelMdImmutableColumns(); + + public static final RelMetadataProvider SOURCE = + ReflectiveRelMetadataProvider.reflectiveSource( + FlinkMetadata.ImmutableColumns.METHOD, INSTANCE); + + // ~ Constructors ----------------------------------------------------------- + + private FlinkRelMdImmutableColumns() {} + + // ~ Methods ---------------------------------------------------------------- + + public MetadataDef<FlinkMetadata.ImmutableColumns> getDef() { + return FlinkMetadata.ImmutableColumns.DEF; + } + + public ImmutableBitSet getImmutableColumns(TableScan rel, RelMetadataQuery mq) { + return getTableImmutableColumns(rel.getTable()); + } + + public ImmutableBitSet getImmutableColumns(Project rel, RelMetadataQuery mq) { + FlinkRelMetadataQuery fmq = FlinkRelMetadataQuery.reuseOrCreate(mq); + return guardByUpsertKeys( + rel, getProjectImmutableColumns(rel.getProjects(), rel.getInput(), fmq), fmq); + } + + public ImmutableBitSet getImmutableColumns(Filter rel, RelMetadataQuery mq) { + FlinkRelMetadataQuery fmq = FlinkRelMetadataQuery.reuseOrCreate(mq); + return guardByUpsertKeys(rel, fmq.getImmutableColumns(rel.getInput()), fmq); + } + + public ImmutableBitSet getImmutableColumns(Calc rel, RelMetadataQuery mq) { + FlinkRelMetadataQuery fmq = FlinkRelMetadataQuery.reuseOrCreate(mq); + List<RexNode> projects = + rel.getProgram().getProjectList().stream() + .map(localRef -> rel.getProgram().expandLocalRef(localRef)) + .collect(Collectors.toList()); + return guardByUpsertKeys( + rel, getProjectImmutableColumns(projects, rel.getInput(), fmq), fmq); + } + + public ImmutableBitSet getImmutableColumns(Exchange rel, RelMetadataQuery mq) { + FlinkRelMetadataQuery fmq = FlinkRelMetadataQuery.reuseOrCreate(mq); + return guardByUpsertKeys(rel, fmq.getImmutableColumns(rel.getInput()), 
fmq); + } + + public ImmutableBitSet getImmutableColumns( + StreamPhysicalChangelogNormalize rel, RelMetadataQuery mq) { + FlinkRelMetadataQuery fmq = FlinkRelMetadataQuery.reuseOrCreate(mq); + return guardByUpsertKeys(rel, fmq.getImmutableColumns(rel.getInput()), fmq); + } + + public ImmutableBitSet getImmutableColumns( + StreamPhysicalMiniBatchAssigner rel, RelMetadataQuery mq) { + FlinkRelMetadataQuery fmq = FlinkRelMetadataQuery.reuseOrCreate(mq); + return guardByUpsertKeys(rel, fmq.getImmutableColumns(rel.getInput()), fmq); + } + + public ImmutableBitSet getImmutableColumns( + StreamPhysicalDropUpdateBefore rel, RelMetadataQuery mq) { + FlinkRelMetadataQuery fmq = FlinkRelMetadataQuery.reuseOrCreate(mq); + return guardByUpsertKeys(rel, fmq.getImmutableColumns(rel.getInput()), fmq); + } + + public ImmutableBitSet getImmutableColumns(WatermarkAssigner rel, RelMetadataQuery mq) { + FlinkRelMetadataQuery fmq = FlinkRelMetadataQuery.reuseOrCreate(mq); + return guardByUpsertKeys(rel, fmq.getImmutableColumns(rel.getInput()), fmq); + } + + public ImmutableBitSet getImmutableColumns(Join join, RelMetadataQuery mq) { + JoinRelType joinType = join.getJoinType(); + + FlinkRelMetadataQuery fmq = FlinkRelMetadataQuery.reuseOrCreate(mq); + int leftFieldCount = join.getLeft().getRowType().getFieldCount(); + + return unionJoinImmutableCols( + join, + joinType, + () -> fmq.getImmutableColumns(join.getLeft()), + () -> fmq.getImmutableColumns(join.getRight()), + leftFieldCount, + fmq); + } + + public ImmutableBitSet getImmutableColumns(CommonPhysicalLookupJoin join, RelMetadataQuery mq) { + FlinkRelMetadataQuery fmq = FlinkRelMetadataQuery.reuseOrCreate(mq); + int leftFieldCount = join.getInput().getRowType().getFieldCount(); + + return unionJoinImmutableCols( + join, + join.joinType(), + () -> fmq.getImmutableColumns(join.getInput()), + // TODO support propagating immutable columns from the lookup side + () -> null, // rightImmutableColsSupplier + leftFieldCount, + fmq); + } + + 
public ImmutableBitSet getImmutableColumns(HepRelVertex rel, RelMetadataQuery mq) { + FlinkRelMetadataQuery fmq = FlinkRelMetadataQuery.reuseOrCreate(mq); + return guardByUpsertKeys(rel, fmq.getImmutableColumns(rel.getCurrentRel()), fmq); + } + + public ImmutableBitSet getImmutableColumns(RelSubset rel, RelMetadataQuery mq) { + FlinkRelMetadataQuery fmq = FlinkRelMetadataQuery.reuseOrCreate(mq); + return guardByUpsertKeys( + rel, fmq.getImmutableColumns(Util.first(rel.getBest(), rel.getOriginal())), fmq); + } + + public ImmutableBitSet getImmutableColumns(RelNode rel, RelMetadataQuery mq) { + // Catch-all rule when none of the others apply. + // More nodes can be supported later, such as Expand, Aggregate, Window, Rank, etc. + return null; + } + + /** + * Guards the immutable columns by verifying that the node has upsert keys. Immutable columns + * are only meaningful "within each pk"; if no upsert key exists, the result is cleared. + */ + @Nullable + private ImmutableBitSet guardByUpsertKeys( + RelNode rel, @Nullable ImmutableBitSet immutableColumns, FlinkRelMetadataQuery fmq) { + if (immutableColumns == null || immutableColumns.isEmpty()) { + return immutableColumns; + } + Set<ImmutableBitSet> upsertKeys = fmq.getUpsertKeys(rel); + if (upsertKeys == null || upsertKeys.isEmpty()) { + return null; + } + return immutableColumns; + } + + /** + * Unions left/right immutable columns for a join, respecting join type semantics: + * + * <ul> + * <li>SEMI / ANTI: output contains only left-side columns → propagate left immutable only + * <li>LEFT: right side may produce nulls → ignore right immutable + * <li>RIGHT: left side may produce nulls → ignore left immutable + * <li>FULL: both sides may produce nulls → ignore both + * <li>INNER: both sides preserved + * </ul> + * + * <p>Right-side indices are shifted by leftFieldCount before union. The result is guarded by + * upsert keys. 
+ */ + @Nullable + private ImmutableBitSet unionJoinImmutableCols( + RelNode rel, + JoinRelType joinType, + Supplier<ImmutableBitSet> leftImmutableColsSupplier, + Supplier<ImmutableBitSet> rightImmutableColsSupplier, + int leftFieldCount, + FlinkRelMetadataQuery fmq) { + if (joinType == JoinRelType.SEMI || joinType == JoinRelType.ANTI) { + return guardByUpsertKeys(rel, leftImmutableColsSupplier.get(), fmq); + } + + // nullable side's columns may flip between value/null → not immutable + ImmutableBitSet leftImmutableColumns = + joinType.generatesNullsOnLeft() ? null : leftImmutableColsSupplier.get(); + ImmutableBitSet rightImmutableColumns = + joinType.generatesNullsOnRight() ? null : rightImmutableColsSupplier.get(); + + // shift right side indices by left field count + ImmutableBitSet shiftedRight = + rightImmutableColumns == null || rightImmutableColumns.isEmpty() + ? rightImmutableColumns + : ImmutableBitSet.of( + rightImmutableColumns.toList().stream() + .map(i -> i + leftFieldCount) + .collect(Collectors.toList())); + + // union left and right immutable columns + ImmutableBitSet result; + if (leftImmutableColumns != null && shiftedRight != null) { + result = leftImmutableColumns.union(shiftedRight); + } else { + result = Optional.ofNullable(leftImmutableColumns).orElse(shiftedRight); + } + return guardByUpsertKeys(rel, result, fmq); + } + + @Nullable + private ImmutableBitSet getTableImmutableColumns(RelOptTable relOptTable) { + if (!(relOptTable instanceof TableSourceTable)) { + return null; + } + + TableSourceTable tst = (TableSourceTable) relOptTable; + ResolvedSchema schema = tst.contextResolvedTable().getResolvedTable().getResolvedSchema(); + + if (schema.getPrimaryKey().isEmpty()) { + return null; + } + + // use relOptTable's type which may be projected based on original schema + List<String> tableOutputFields = relOptTable.getRowType().getFieldNames(); + + // add pk + Set<String> allImmutableFieldsInSchema = + new 
HashSet<>(schema.getPrimaryKey().get().getColumns()); + // add constraint for immutable columns + if (schema.getImmutableColumns().isPresent()) { + allImmutableFieldsInSchema.addAll(schema.getImmutableColumns().get().getColumns()); + } + + Set<Integer> outputImmutableColumns = + allImmutableFieldsInSchema.stream() + .flatMap( + immutableField -> { + int fieldIdx = tableOutputFields.indexOf(immutableField); + if (fieldIdx >= 0) { + return Stream.of(fieldIdx); + } else { + return Stream.empty(); + } + }) + .collect(Collectors.toSet()); + + return ImmutableBitSet.of(outputImmutableColumns); + } + + @Nullable + private ImmutableBitSet getProjectImmutableColumns( + List<RexNode> projects, RelNode inputNode, FlinkRelMetadataQuery fmq) { + ImmutableBitSet inputImmutableColumns = fmq.getImmutableColumns(inputNode); + if (inputImmutableColumns == null || inputImmutableColumns.isEmpty()) { + return inputImmutableColumns; + } + + Map<Integer, List<Integer>> mapInToOutPos = new HashMap<>(); + for (int i = 0; i < projects.size(); i++) { + RexNode projExpr = projects.get(i); + if (projExpr instanceof RexInputRef) { + mapInToOutPos + .computeIfAbsent( + ((RexInputRef) projExpr).getIndex(), k -> new ArrayList<>()) + .add(i); + } + } + return ImmutableBitSet.of( + inputImmutableColumns.toList().stream() + .flatMap(in -> mapInToOutPos.getOrDefault(in, List.of()).stream()) + .collect(Collectors.toList())); + } +} diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/metadata/FlinkRelMetadataQuery.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/metadata/FlinkRelMetadataQuery.java index 5c1cf8de5d3..562ae680a91 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/metadata/FlinkRelMetadataQuery.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/metadata/FlinkRelMetadataQuery.java @@ -50,6 +50,7 @@ public class 
FlinkRelMetadataQuery extends RelMetadataQuery { private FlinkMetadata.ModifiedMonotonicity.Handler modifiedMonotonicityHandler; private FlinkMetadata.WindowProperties.Handler windowPropertiesHandler; private FlinkMetadata.UpsertKeys.Handler upsertKeysHandler; + private FlinkMetadata.ImmutableColumns.Handler immutableColumnsHandler; /** * Returns an instance of FlinkRelMetadataQuery. It ensures that cycles do not occur while @@ -85,6 +86,7 @@ public class FlinkRelMetadataQuery extends RelMetadataQuery { this.modifiedMonotonicityHandler = HANDLERS.modifiedMonotonicityHandler; this.windowPropertiesHandler = HANDLERS.windowPropertiesHandler; this.upsertKeysHandler = HANDLERS.upsertKeysHandler; + this.immutableColumnsHandler = HANDLERS.immutableColumnsHandler; } /** Extended handlers. */ @@ -107,6 +109,8 @@ public class FlinkRelMetadataQuery extends RelMetadataQuery { initialHandler(FlinkMetadata.WindowProperties.Handler.class); private FlinkMetadata.UpsertKeys.Handler upsertKeysHandler = initialHandler(FlinkMetadata.UpsertKeys.Handler.class); + private FlinkMetadata.ImmutableColumns.Handler immutableColumnsHandler = + initialHandler(FlinkMetadata.ImmutableColumns.Handler.class); } /** @@ -308,4 +312,21 @@ public class FlinkRelMetadataQuery extends RelMetadataQuery { } return getUpsertKeys(rel); } + + /** + * Returns the columns that will never be updated upstream within each pk. 
+ * + * @return the columns that will never be updated upstream within each pk, or null if this + * information cannot be determined (whereas empty set indicates that all columns may be + * updated) + */ + public ImmutableBitSet getImmutableColumns(RelNode rel) { + for (; ; ) { + try { + return immutableColumnsHandler.getImmutableColumns(rel, this); + } catch (JaninoRelMetadataProvider.NoHandler e) { + immutableColumnsHandler = revise(e.relClass, FlinkMetadata.ImmutableColumns.DEF); + } + } + } } diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkDefaultRelMetadataProvider.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkDefaultRelMetadataProvider.scala index f5d4d93832a..135603d56c2 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkDefaultRelMetadataProvider.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkDefaultRelMetadataProvider.scala @@ -48,7 +48,8 @@ object FlinkDefaultRelMetadataProvider { RelMdPredicates.SOURCE, FlinkRelMdCollation.SOURCE, RelMdExplainVisibility.SOURCE, - FlinkRelMdWindowProperties.SOURCE + FlinkRelMdWindowProperties.SOURCE, + FlinkRelMdImmutableColumns.SOURCE ) ) } diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdUpsertKeys.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdUpsertKeys.scala index ea52022297c..7890b116b26 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdUpsertKeys.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdUpsertKeys.scala @@ -48,10 +48,13 @@ class FlinkRelMdUpsertKeys private extends MetadataHandler[UpsertKeys] { override def getDef: 
MetadataDef[UpsertKeys] = UpsertKeys.DEF def getUpsertKeys(rel: TableScan, mq: RelMetadataQuery): JSet[ImmutableBitSet] = { - rel.getTable match { + val baseKeys = rel.getTable match { case t: IntermediateRelTable => t.upsertKeys case _ => mq.getUniqueKeys(rel) } + enrichWithImmutableColumns( + baseKeys, + () => FlinkRelMetadataQuery.reuseOrCreate(mq).getImmutableColumns(rel)) } def getUpsertKeys(rel: Project, mq: RelMetadataQuery): JSet[ImmutableBitSet] = @@ -81,11 +84,13 @@ class FlinkRelMdUpsertKeys private extends MetadataHandler[UpsertKeys] { () => FlinkRelMetadataQuery.reuseOrCreate(mq).getUpsertKeys(rel.getInput)) def getUpsertKeys(rel: Exchange, mq: RelMetadataQuery): JSet[ImmutableBitSet] = { - val keys = FlinkRelMetadataQuery.reuseOrCreate(mq).getUpsertKeys(rel.getInput) + val fmq = FlinkRelMetadataQuery.reuseOrCreate(mq) + val upsertKeys = fmq.getUpsertKeys(rel.getInput) + val immutableColumns = fmq.getImmutableColumns(rel.getInput) rel.getDistribution.getType match { case RelDistribution.Type.HASH_DISTRIBUTED => - filterKeys(keys, ImmutableBitSet.of(rel.getDistribution.getKeys)) - case RelDistribution.Type.SINGLETON => keys + filterKeys(upsertKeys, ImmutableBitSet.of(rel.getDistribution.getKeys), immutableColumns) + case RelDistribution.Type.SINGLETON => upsertKeys case t => throw new UnsupportedOperationException("Unsupported distribution type: " + t) } } @@ -95,19 +100,20 @@ class FlinkRelMdUpsertKeys private extends MetadataHandler[UpsertKeys] { case rank: StreamPhysicalRank if RankUtil.isDeduplication(rel) => ImmutableSet.of(ImmutableBitSet.of(rank.partitionKey.toArray.map(Integer.valueOf).toList)) case _ => - val inputKeys = filterKeys( - FlinkRelMetadataQuery - .reuseOrCreate(mq) - .getUpsertKeys(rel.getInput), - rel.partitionKey) + val fmq = FlinkRelMetadataQuery.reuseOrCreate(mq) + val inputUpsertKeys = fmq.getUpsertKeys(rel.getInput) + val inputImmutableColumns = fmq.getImmutableColumns(rel.getInput) + val inputKeys = 
filterKeys(inputUpsertKeys, rel.partitionKey, inputImmutableColumns) FlinkRelMdUniqueKeys.INSTANCE.getRankUniqueKeys(rel, inputKeys) } } - def getUpsertKeys(rel: Sort, mq: RelMetadataQuery): JSet[ImmutableBitSet] = - filterKeys( - FlinkRelMetadataQuery.reuseOrCreate(mq).getUpsertKeys(rel.getInput), - ImmutableBitSet.of(rel.getCollation.getKeys)) + def getUpsertKeys(rel: Sort, mq: RelMetadataQuery): JSet[ImmutableBitSet] = { + val fmq = FlinkRelMetadataQuery.reuseOrCreate(mq) + val upsertKeys = fmq.getUpsertKeys(rel.getInput) + val immutableColumns = fmq.getImmutableColumns(rel.getInput) + filterKeys(upsertKeys, ImmutableBitSet.of(rel.getCollation.getKeys), immutableColumns) + } def getUpsertKeys( rel: StreamPhysicalChangelogNormalize, @@ -206,20 +212,23 @@ class FlinkRelMdUpsertKeys private extends MetadataHandler[UpsertKeys] { rel: SingleRel, mq: RelMetadataQuery, distributionKeys: ImmutableBitSet*): JSet[ImmutableBitSet] = { - var inputKeys = FlinkRelMetadataQuery.reuseOrCreate(mq).getUpsertKeys(rel.getInput) + val fmq = FlinkRelMetadataQuery.reuseOrCreate(mq) + var inputUpsertKeys = fmq.getUpsertKeys(rel.getInput) + val inputImmutableColumns = fmq.getImmutableColumns(rel.getInput) for (distributionKey <- distributionKeys) { - inputKeys = filterKeys(inputKeys, distributionKey) + inputUpsertKeys = filterKeys(inputUpsertKeys, distributionKey, inputImmutableColumns) } - inputKeys + inputUpsertKeys } def getUpsertKeys(join: Join, mq: RelMetadataQuery): JSet[ImmutableBitSet] = { val joinInfo = join.analyzeCondition() join.getJoinType match { case JoinRelType.SEMI | JoinRelType.ANTI => - filterKeys( - FlinkRelMetadataQuery.reuseOrCreate(mq).getUpsertKeys(join.getLeft), - joinInfo.leftSet()) + val fmq = FlinkRelMetadataQuery.reuseOrCreate(mq) + val leftInputUpsertKeys = fmq.getUpsertKeys(join.getLeft) + val leftInputImmutableColumns = fmq.getImmutableColumns(join.getLeft) + filterKeys(leftInputUpsertKeys, joinInfo.leftSet(), leftInputImmutableColumns) case _ => 
getJoinUpsertKeys(joinInfo, join.getJoinType, join.getLeft, join.getRight, mq) } @@ -348,6 +357,8 @@ class FlinkRelMdUpsertKeys private extends MetadataHandler[UpsertKeys] { val fmq = FlinkRelMetadataQuery.reuseOrCreate(mq) val leftKeys = fmq.getUpsertKeys(left) val rightKeys = fmq.getUpsertKeys(right) + val leftImmutableColumns = fmq.getImmutableColumns(left) + val rightImmutableColumns = fmq.getImmutableColumns(right) FlinkRelMdUniqueKeys.INSTANCE.getJoinUniqueKeys( joinRelType, @@ -356,8 +367,8 @@ class FlinkRelMdUpsertKeys private extends MetadataHandler[UpsertKeys] { // (the distribution keys), ensuring the result remains an upsert key. // Note: An Exchange typically applies this filtering already via fmq.getUpsertKeys(...). // We keep it here to be safe in case a join can appear without a preceding Exchange. - filterKeys(leftKeys, joinInfo.leftSet), - filterKeys(rightKeys, joinInfo.rightSet), + filterKeys(leftKeys, joinInfo.leftSet, leftImmutableColumns), + filterKeys(rightKeys, joinInfo.rightSet, rightImmutableColumns), isSideUnique(leftKeys, joinInfo.leftSet), isSideUnique(rightKeys, joinInfo.rightSet) ) @@ -392,17 +403,35 @@ class FlinkRelMdUpsertKeys private extends MetadataHandler[UpsertKeys] { * * Example: * - distributionKey = {k1} - * - keys = {{k1}, {k1, k2}, {k2}} + * - upsertKeys = {{k1}, {k1, k2}, {k2}} + * - immutableColumns = null * Result: {{k1}, {k1, k2}} (drops {k2}) + * + * Example: + * - distributionKey = {k1, k3} + * - upsertKeys = {{k1}, {k1, k2}, {k1, k3}, {k2}} + * - immutableColumns = {k3} + * Result: {{k1}, {k1, k2}, {k1, k3}} (drops {k2}) */ private def filterKeys( - keys: JSet[ImmutableBitSet], - distributionKey: ImmutableBitSet): JSet[ImmutableBitSet] = { - if (keys != null) { - keys.filter(k => k.contains(distributionKey)) - } else { - null + upsertKeys: JSet[ImmutableBitSet], + distributionKey: ImmutableBitSet, + immutableColumns: ImmutableBitSet): JSet[ImmutableBitSet] = { + if (upsertKeys == null) { + return null } + + 
upsertKeys.filter( + upsertKey => { + val key = + if (immutableColumns == null) { + upsertKey + } else { + upsertKey.union(immutableColumns) + } + + key.contains(distributionKey) + }) } /* @@ -426,6 +455,28 @@ class FlinkRelMdUpsertKeys private extends MetadataHandler[UpsertKeys] { // Catch-all rule when none of the others apply. def getUpsertKeys(rel: RelNode, mq: RelMetadataQuery): JSet[ImmutableBitSet] = null + + /** + * Enriches the given upsert keys with immutable columns as an additional upsert key. + * + * If baseKeys is null or empty, returns as-is without invoking the supplier (immutable columns + * are meaningless without existing upsert keys). + */ + private def enrichWithImmutableColumns( + baseKeys: JSet[ImmutableBitSet], + immutableColsSupplier: () => ImmutableBitSet): JSet[ImmutableBitSet] = { + if (baseKeys == null || baseKeys.isEmpty) { + return baseKeys + } + val immutableCols = immutableColsSupplier() + if (immutableCols != null && !immutableCols.isEmpty) { + val enriched = new util.HashSet[ImmutableBitSet](baseKeys) + enriched.add(immutableCols) + enriched + } else { + baseKeys + } + } } object FlinkRelMdUpsertKeys { diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/catalog/CatalogConstraintTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/catalog/CatalogConstraintTest.java index 738824398cd..82786f3d431 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/catalog/CatalogConstraintTest.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/catalog/CatalogConstraintTest.java @@ -22,11 +22,11 @@ import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.Schema; import org.apache.flink.table.api.TableEnvironment; +import org.apache.flink.table.api.ValidationException; import org.apache.flink.table.catalog.Catalog; import 
org.apache.flink.table.catalog.CatalogTable; import org.apache.flink.table.catalog.Column; import org.apache.flink.table.catalog.DefaultIndex; -import org.apache.flink.table.catalog.ImmutableColumnsConstraint; import org.apache.flink.table.catalog.ObjectPath; import org.apache.flink.table.catalog.ResolvedSchema; import org.apache.flink.table.catalog.UniqueConstraint; @@ -39,6 +39,8 @@ import org.apache.calcite.rel.RelNode; import org.apache.calcite.util.ImmutableBitSet; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; import java.util.Arrays; import java.util.Collections; @@ -47,6 +49,7 @@ import java.util.List; import java.util.Map; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; /** Test for Catalog constraints. */ public class CatalogConstraintTest { @@ -58,32 +61,18 @@ public class CatalogConstraintTest { @BeforeEach void setup() { - EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().build(); + EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().build(); tEnv = TableEnvironment.create(settings); catalog = tEnv.getCatalog(tEnv.getCurrentCatalog()).orElse(null); assertThat(catalog).isNotNull(); } - @Test - void testWithPrimaryKey() throws Exception { - final Schema tableSchema = - Schema.newBuilder() - .fromResolvedSchema( - new ResolvedSchema( - Arrays.asList( - Column.physical("a", DataTypes.STRING()), - Column.physical("b", DataTypes.BIGINT().notNull()), - Column.physical("c", DataTypes.INT())), - Collections.emptyList(), - UniqueConstraint.primaryKey( - "primary_constraint", - Collections.singletonList("b")), - Collections.singletonList( - DefaultIndex.newIndex("idx", List.of("a", "b"))), - ImmutableColumnsConstraint.immutableColumns( - "immutable_constraint", List.of("b")))) - .build(); - 
Map<String, String> properties = buildCatalogTableProperties(); + @ParameterizedTest() + @ValueSource(booleans = {true, false}) + void testWithPrimaryKey(boolean containsPrimaryKey) throws Exception { + ResolvedSchema resolvedSchema = buildResolvedSchema(containsPrimaryKey); + final Schema tableSchema = Schema.newBuilder().fromResolvedSchema(resolvedSchema).build(); + Map<String, String> properties = buildCatalogTableProperties(true); catalog.createTable( new ObjectPath(databaseName, "T1"), @@ -97,21 +86,62 @@ public class CatalogConstraintTest { RelNode t1 = TableTestUtil.toRelNode(tEnv.sqlQuery("select * from T1")); FlinkRelMetadataQuery mq = FlinkRelMetadataQuery.reuseOrCreate(t1.getCluster().getMetadataQuery()); + if (containsPrimaryKey) { + assertThat(mq.getUniqueKeys(t1)).isEqualTo(ImmutableSet.of(ImmutableBitSet.of(1))); + } else { + assertThat(mq.getUniqueKeys(t1)).isEqualTo(ImmutableSet.of()); + } + } + + @ParameterizedTest() + @ValueSource(booleans = {true, false}) + void testWithImmutableColsConstraint(boolean containsImmutableColsConstraint) throws Exception { + final Schema.Builder schemaBuilder = + Schema.newBuilder().fromResolvedSchema(buildResolvedSchema(true)); + if (containsImmutableColsConstraint) { + schemaBuilder.immutableColumnsNamed("immutable_constraint", List.of("a")); + } + final Schema tableSchema = schemaBuilder.build(); + Map<String, String> properties = buildCatalogTableProperties(false); + + catalog.createTable( + new ObjectPath(databaseName, "T1"), + CatalogTable.newBuilder() + .schema(tableSchema) + .comment("") + .options(properties) + .build(), + false); + + RelNode t1 = TableTestUtil.toRelNode(tEnv.sqlQuery("select * from T1")); + + FlinkRelMetadataQuery mq = + FlinkRelMetadataQuery.reuseOrCreate(t1.getCluster().getMetadataQuery()); + // unique keys are not changed assertThat(mq.getUniqueKeys(t1)).isEqualTo(ImmutableSet.of(ImmutableBitSet.of(1))); + + if (containsImmutableColsConstraint) { + assertThat((Iterable<? 
extends Integer>) mq.getImmutableColumns(t1)) + .isEqualTo(ImmutableBitSet.of(0, 1)); + assertThat(mq.getUpsertKeys(t1)) + .isEqualTo(ImmutableSet.of(ImmutableBitSet.of(1), ImmutableBitSet.of(0, 1))); + } else { + assertThat((Iterable<? extends Integer>) mq.getImmutableColumns(t1)) + .isEqualTo(ImmutableBitSet.of(1)); + assertThat(mq.getUpsertKeys(t1)).isEqualTo(ImmutableSet.of(ImmutableBitSet.of(1))); + } } @Test - void testWithoutPrimaryKey() throws Exception { - + void testImmutableColsConstraintDefinedOnSourceWithDelete() throws Exception { final Schema tableSchema = Schema.newBuilder() - .fromResolvedSchema( - ResolvedSchema.of( - Column.physical("a", DataTypes.BIGINT()), - Column.physical("b", DataTypes.STRING()), - Column.physical("c", DataTypes.INT()))) + .fromResolvedSchema(buildResolvedSchema(true)) + .immutableColumnsNamed("immutable_constraint", List.of("a")) .build(); - Map<String, String> properties = buildCatalogTableProperties(); + + Map<String, String> properties = buildCatalogTableProperties(false); + properties.put("changelog-mode", "I,UA,UB,D"); catalog.createTable( new ObjectPath(databaseName, "T1"), @@ -122,21 +152,44 @@ public class CatalogConstraintTest { .build(), false); - RelNode t1 = TableTestUtil.toRelNode(tEnv.sqlQuery("select * from T1")); - FlinkRelMetadataQuery mq = - FlinkRelMetadataQuery.reuseOrCreate(t1.getCluster().getMetadataQuery()); - assertThat(mq.getUniqueKeys(t1)).isEqualTo(ImmutableSet.of()); + assertThatThrownBy(() -> TableTestUtil.toRelNode(tEnv.sqlQuery("select * from T1"))) + .isInstanceOf(ValidationException.class) + .hasMessageContaining( + "The immutable constraint cannot be defined " + + "on the table with changelog mode [DELETE]."); } - private Map<String, String> buildCatalogTableProperties() { - Map<String, String> properties = new HashMap<>(); - properties.put("connector.type", "filesystem"); - properties.put("connector.property-version", "1"); - properties.put("connector.path", "/path/to/csv"); + private 
ResolvedSchema buildResolvedSchema(boolean containsPrimaryKey) { + return containsPrimaryKey + ? new ResolvedSchema( + Arrays.asList( + Column.physical("a", DataTypes.STRING()), + Column.physical("b", DataTypes.BIGINT().notNull()), + Column.physical("c", DataTypes.INT())), + Collections.emptyList(), + UniqueConstraint.primaryKey( + "primary_constraint", Collections.singletonList("b")), + Collections.singletonList(DefaultIndex.newIndex("idx", List.of("a", "b"))), + null) + : ResolvedSchema.of( + Column.physical("a", DataTypes.BIGINT()), + Column.physical("b", DataTypes.STRING()), + Column.physical("c", DataTypes.INT())); + } - properties.put("format.type", "csv"); - properties.put("format.property-version", "1"); - properties.put("format.field-delimiter", ";"); + private Map<String, String> buildCatalogTableProperties(boolean legacyTable) { + Map<String, String> properties = new HashMap<>(); + if (legacyTable) { + properties.put("connector.type", "filesystem"); + properties.put("connector.property-version", "1"); + properties.put("connector.path", "/path/to/csv"); + + properties.put("format.type", "csv"); + properties.put("format.property-version", "1"); + properties.put("format.field-delimiter", ";"); + } else { + properties.put("connector", "values"); + } return properties; } diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/metadata/FlinkRelMdImmutableColumnsTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/metadata/FlinkRelMdImmutableColumnsTest.java new file mode 100644 index 00000000000..02d49885739 --- /dev/null +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/metadata/FlinkRelMdImmutableColumnsTest.java @@ -0,0 +1,527 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.plan.metadata; + +import org.apache.flink.table.planner.calcite.FlinkContext; +import org.apache.flink.table.planner.plan.nodes.calcite.LogicalWatermarkAssigner; +import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalChangelogNormalize; +import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalDropUpdateBefore; +import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalExchange; +import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalLookupJoin; +import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalMiniBatchAssigner; +import org.apache.flink.table.planner.plan.trait.FlinkRelDistribution; + +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.core.Calc; +import org.apache.calcite.rel.core.JoinInfo; +import org.apache.calcite.rel.core.JoinRelType; +import org.apache.calcite.rel.core.TableScan; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rex.RexNode; +import org.apache.calcite.sql.fun.SqlStdOperatorTable; +import org.apache.calcite.util.ImmutableBitSet; +import org.apache.calcite.util.ImmutableIntList; +import org.junit.jupiter.api.Test; + +import java.util.Arrays; 
+import java.util.Collections; +import java.util.List; + +import scala.Option; + +import static org.apache.flink.table.planner.utils.ShortcutUtils.unwrapContext; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; + +/** Tests for {@link FlinkRelMdImmutableColumns}. */ +public class FlinkRelMdImmutableColumnsTest extends FlinkRelMdHandlerTestBase { + + // ------------------------------------------------------------------------------------- + // TableScan + // ------------------------------------------------------------------------------------- + + @Test + public void testGetImmutableColumnsOnTableScanWithImmutableCols() { + // Projected rowType (a,c,d): PK(a)=0, immutable(c)=1, immutable(d)=2 → {0, 1, 2} + assertEquals( + ImmutableBitSet.of(0, 1, 2), + mq().getImmutableColumns(tableWithImmutableColsLogicalScan())); + } + + @Test + public void testGetImmutableColumnsOnTableScanWithPkOnly() { + // TableSourceTable1: PK(a,b)={0,1}, no immutable constraint → {0, 1} + RelNode scan = relBuilder().scan("TableSourceTable1").build(); + assertEquals(ImmutableBitSet.of(0, 1), mq().getImmutableColumns(scan)); + } + + @Test + public void testGetImmutableColumnsOnTableScanWithSingleColPk() { + // TableSourceTable2: PK(b)={1} → {1} + RelNode scan = relBuilder().scan("TableSourceTable2").build(); + assertEquals(ImmutableBitSet.of(1), mq().getImmutableColumns(scan)); + } + + @Test + public void testGetImmutableColumnsOnTableScanWithoutPk() { + // TableSourceTable3: no PK → null + RelNode scan = relBuilder().scan("TableSourceTable3").build(); + assertNull(mq().getImmutableColumns(scan)); + } + + @Test + public void testGetImmutableColumnsOnNonTableSourceTableScan() { + // student uses MockMetaTable (not TableSourceTable) → null + assertNull(mq().getImmutableColumns(studentLogicalScan())); + } + + // ------------------------------------------------------------------------------------- + // Project + // 
------------------------------------------------------------------------------------- + + @Test + public void testGetImmutableColumnsOnProjectKeepsAll() { + // Project: a(0), c(1), d(2) → output immutable = {0, 1, 2} + relBuilder().push(tableWithImmutableColsLogicalScan()); + RelNode project = + relBuilder() + .project( + relBuilder().field(0), // a → out 0 + relBuilder().field(1), // c → out 1 + relBuilder().field(2)) // d → out 2 + .build(); + assertEquals(ImmutableBitSet.of(0, 1, 2), mq().getImmutableColumns(project)); + } + + @Test + public void testGetImmutableColumnsOnProjectDropsImmutableCols() { + // Project with only expressions (no direct field refs) → none tracked → empty + relBuilder().push(tableWithImmutableColsLogicalScan()); + RelNode project = + relBuilder() + .project( + relBuilder() + .call( + SqlStdOperatorTable.PLUS, + relBuilder().field(0), + relBuilder().literal(1))) + .build(); + assertEquals(ImmutableBitSet.of(), mq().getImmutableColumns(project)); + } + + @Test + public void testGetImmutableColumnsOnProjectWithDuplicateRefs() { + // Project: a(0), a(0), c(1) → a maps to {0, 1}, c maps to {2} → {0, 1, 2} + relBuilder().push(tableWithImmutableColsLogicalScan()); + RelNode project = + relBuilder() + .project( + relBuilder().field(0), // a → out 0 + relBuilder().field(0), // a → out 1 + relBuilder().field(1)) // c → out 2 + .build(); + assertEquals(ImmutableBitSet.of(0, 1, 2), mq().getImmutableColumns(project)); + } + + @Test + public void testGetImmutableColumnsOnProjectWithExpression() { + // Project: a(0), a+1 (expression), c(1) + // Expressions (non-RexInputRef) are not tracked → a+1 is not immutable + relBuilder().push(tableWithImmutableColsLogicalScan()); + RelNode project = + relBuilder() + .project( + relBuilder().field(0), // a → out 0 + relBuilder() + .call( + SqlStdOperatorTable.PLUS, + relBuilder().field(0), + relBuilder().literal(1)), // a+1 → out 1 + relBuilder().field(1)) // c → out 2 + .build(); + // a→0, c→2 → {0, 2} + 
assertEquals(ImmutableBitSet.of(0, 2), mq().getImmutableColumns(project)); + } + + @Test + public void testGetImmutableColumnsOnProjectNullInput() { + // Project on student (MockMetaTable immutable = null) → null + relBuilder().push(studentLogicalScan()); + RelNode project = + relBuilder().project(relBuilder().field(0), relBuilder().field(1)).build(); + assertNull(mq().getImmutableColumns(project)); + } + + // ------------------------------------------------------------------------------------- + // Filter + // ------------------------------------------------------------------------------------- + + @Test + public void testGetImmutableColumnsOnFilter() { + // Filter passes through immutable columns unchanged + relBuilder().push(tableWithImmutableColsLogicalScan()); + RelNode filter = + relBuilder() + .filter( + relBuilder() + .call( + SqlStdOperatorTable.LESS_THAN, + relBuilder().field(0), + relBuilder().literal(100))) + .build(); + assertEquals(ImmutableBitSet.of(0, 1, 2), mq().getImmutableColumns(filter)); + } + + @Test + public void testGetImmutableColumnsOnFilterNullInput() { + // Filter on student (MockMetaTable immutable = null) → null + assertNull(mq().getImmutableColumns(logicalFilter())); + } + + // ------------------------------------------------------------------------------------- + // Calc + // ------------------------------------------------------------------------------------- + + @Test + public void testGetImmutableColumnsOnCalc() { + // Calc with projection [a(0), c(1)] and filter a < 100 + RelNode input = tableWithImmutableColsLogicalScan(); + relBuilder().push(input); + + RexNode proj0 = relBuilder().field(0); // a + RexNode proj1 = relBuilder().field(1); // c + List<RexNode> projects = Arrays.asList(proj0, proj1); + + RexNode condition = + relBuilder() + .call( + SqlStdOperatorTable.LESS_THAN, + relBuilder().field(0), + relBuilder().literal(100)); + List<RexNode> conditions = Collections.singletonList(condition); + + // Build a temp project to 
get the output row type + RelNode tempProject = relBuilder().project(proj0, proj1).build(); + RelDataType outputRowType = tempProject.getRowType(); + + Calc calc = createLogicalCalc(input, outputRowType, projects, conditions); + + // Input immutable: {0, 1, 2} → a→0, c→1 → {0, 1} + assertEquals(ImmutableBitSet.of(0, 1), mq().getImmutableColumns(calc)); + } + + // ------------------------------------------------------------------------------------- + // WatermarkAssigner + // ------------------------------------------------------------------------------------- + + @Test + public void testGetImmutableColumnsOnWatermarkAssigner() { + // Build a WatermarkAssigner on top of the immutable scan, using rowtime column (index 3) + RelNode input = tableWithImmutableColsLogicalScan(); + FlinkContext flinkContext = unwrapContext(cluster()); + RexNode watermarkExpr = + flinkContext + .getRexFactory() + .createSqlToRexConverter(input.getRowType(), null) + .convertToRexNode("rowtime - INTERVAL '10' SECOND"); + RelNode watermarkAssigner = + LogicalWatermarkAssigner.create( + cluster(), input, Collections.emptyList(), 3, watermarkExpr); + // Pass through: {0, 1, 2} + assertEquals(ImmutableBitSet.of(0, 1, 2), mq().getImmutableColumns(watermarkAssigner)); + } + + // ------------------------------------------------------------------------------------- + // MiniBatchAssigner + // ------------------------------------------------------------------------------------- + + @Test + public void testGetImmutableColumnsOnMiniBatchAssigner() { + RelNode input = tableWithImmutableColsStreamScan(); + RelNode miniBatchAssigner = + new StreamPhysicalMiniBatchAssigner(cluster(), streamPhysicalTraits(), input); + // Pass through: {0, 1, 2} + assertEquals(ImmutableBitSet.of(0, 1, 2), mq().getImmutableColumns(miniBatchAssigner)); + } + + // ------------------------------------------------------------------------------------- + // Exchange + // 
------------------------------------------------------------------------------------- + + @Test + public void testGetImmutableColumnsOnExchange() { + RelNode scan = tableWithImmutableColsStreamScan(); + FlinkRelDistribution hash = FlinkRelDistribution.hash(new int[] {0}, true); + RelNode exchange = + new StreamPhysicalExchange( + cluster(), streamPhysicalTraits().replace(hash), scan, hash); + // Pass through: {0, 1, 2} + assertEquals(ImmutableBitSet.of(0, 1, 2), mq().getImmutableColumns(exchange)); + } + + // ------------------------------------------------------------------------------------- + // ChangelogNormalize + // ------------------------------------------------------------------------------------- + + @Test + public void testGetImmutableColumnsOnChangelogNormalize() { + RelNode scan = tableWithImmutableColsStreamScan(); + RelNode changelogNormalize = + new StreamPhysicalChangelogNormalize( + cluster(), + streamPhysicalTraits(), + scan, + new int[] {0}, + null, + false, + new RexNode[] {}); + // Pass through: {0, 1, 2} + assertEquals(ImmutableBitSet.of(0, 1, 2), mq().getImmutableColumns(changelogNormalize)); + } + + // ------------------------------------------------------------------------------------- + // DropUpdateBefore + // ------------------------------------------------------------------------------------- + + @Test + public void testGetImmutableColumnsOnDropUpdateBefore() { + RelNode scan = tableWithImmutableColsStreamScan(); + RelNode dropUpdateBefore = + new StreamPhysicalDropUpdateBefore(cluster(), streamPhysicalTraits(), scan); + // Pass through: {0, 1, 2} + assertEquals(ImmutableBitSet.of(0, 1, 2), mq().getImmutableColumns(dropUpdateBefore)); + } + + // ------------------------------------------------------------------------------------- + // Join + // ------------------------------------------------------------------------------------- + + @Test + public void testGetImmutableColumnsOnInnerJoin1() { + // Left: 
projected_table_source_table_with_immutable_cols (a,c,d,rowtime) → + // immutable={0,1,2} + // Right: TableSourceTable1 (a,b,c,d) → immutable={0,1} + // Right shifted by 4: {4,5} + // Union: {0,1,2,4,5} + RelNode join = + relBuilder() + .scan("projected_table_source_table_with_immutable_cols") + .scan("TableSourceTable1") + .join( + JoinRelType.INNER, + relBuilder() + .call( + SqlStdOperatorTable.EQUALS, + relBuilder().field(2, 0, 0), + relBuilder().field(2, 1, 0))) + .build(); + assertEquals(ImmutableBitSet.of(0, 1, 2, 4, 5), mq().getImmutableColumns(join)); + } + + @Test + public void testGetImmutableColumnsOnInnerJoin2() { + // Left: projected_table_source_table_with_immutable_cols → {0,1,2} + // Right: projected_table_source_table_with_immutable_cols → {0,1,2} shifted by 4 → {4,5,6} + // Union: {0,1,2,4,5,6} + RelNode join = + relBuilder() + .scan("projected_table_source_table_with_immutable_cols") + .scan("projected_table_source_table_with_immutable_cols") + .join( + JoinRelType.INNER, + relBuilder() + .call( + SqlStdOperatorTable.EQUALS, + relBuilder().field(2, 0, 0), + relBuilder().field(2, 1, 0))) + .build(); + assertEquals(ImmutableBitSet.of(0, 1, 2, 4, 5, 6), mq().getImmutableColumns(join)); + } + + @Test + public void testGetImmutableColumnsOnInnerJoinWhileOneSideNoUpsertKey() { + // Left: TableSourceTable3 → immutable & pk = null + // Right: projected_table_source_table_with_immutable_cols → {0,1,2} shifted by 7 → {7,8,9} + // Join has no upsert keys → guarded to null + RelNode join = + relBuilder() + .scan("TableSourceTable3") + .scan("projected_table_source_table_with_immutable_cols") + .join( + JoinRelType.INNER, + relBuilder() + .call( + SqlStdOperatorTable.EQUALS, + relBuilder().field(2, 0, 0), + relBuilder().field(2, 1, 0))) + .build(); + assertNull(mq().getImmutableColumns(join)); + + // Left: projected_table_source_table_with_immutable_cols → {0,1,2} + // Right: TableSourceTable3 → immutable & pk = null + // Join has no upsert keys → guarded to 
null + join = + relBuilder() + .scan("projected_table_source_table_with_immutable_cols") + .scan("TableSourceTable3") + .join( + JoinRelType.INNER, + relBuilder() + .call( + SqlStdOperatorTable.EQUALS, + relBuilder().field(2, 0, 0), + relBuilder().field(2, 1, 0))) + .build(); + + assertNull(mq().getImmutableColumns(join)); + + // Left: TableSourceTable3 → immutable & pk = null + // Right: TableSourceTable3 → immutable & pk = null + // Join has no upsert keys → guarded to null + join = + relBuilder() + .scan("TableSourceTable3") + .scan("TableSourceTable3") + .join( + JoinRelType.INNER, + relBuilder() + .call( + SqlStdOperatorTable.EQUALS, + relBuilder().field(2, 0, 0), + relBuilder().field(2, 1, 0))) + .build(); + + assertNull(mq().getImmutableColumns(join)); + } + + @Test + public void testGetImmutableColumnsOnLeftJoin() { + // LEFT JOIN: right side may produce nulls → ignore right immutable + // Left: projected_table_source_table_with_immutable_cols → {0,1,2} + // Result: {0,1,2} + RelNode join = + relBuilder() + .scan("projected_table_source_table_with_immutable_cols") + .scan("projected_table_source_table_with_immutable_cols") + .join( + JoinRelType.LEFT, + relBuilder() + .call( + SqlStdOperatorTable.EQUALS, + relBuilder().field(2, 0, 0), + relBuilder().field(2, 1, 0))) + .build(); + assertEquals(ImmutableBitSet.of(0, 1, 2), mq().getImmutableColumns(join)); + } + + @Test + public void testGetImmutableColumnsOnRightJoin() { + // RIGHT JOIN: left side may produce nulls → ignore left immutable + // Right: projected_table_source_table_with_immutable_cols → shifted by 4 → {4,5,6} + // Result: {4,5,6} + RelNode join = + relBuilder() + .scan("projected_table_source_table_with_immutable_cols") + .scan("projected_table_source_table_with_immutable_cols") + .join( + JoinRelType.RIGHT, + relBuilder() + .call( + SqlStdOperatorTable.EQUALS, + relBuilder().field(2, 0, 0), + relBuilder().field(2, 1, 0))) + .build(); + assertEquals(ImmutableBitSet.of(4, 5, 6), 
mq().getImmutableColumns(join)); + } + + @Test + public void testGetImmutableColumnsOnFullJoin() { + // FULL JOIN: both sides may produce nulls → both ignored → null + RelNode join = + relBuilder() + .scan("projected_table_source_table_with_immutable_cols") + .scan("projected_table_source_table_with_immutable_cols") + .join( + JoinRelType.FULL, + relBuilder() + .call( + SqlStdOperatorTable.EQUALS, + relBuilder().field(2, 0, 0), + relBuilder().field(2, 1, 0))) + .build(); + assertNull(mq().getImmutableColumns(join)); + } + + // ------------------------------------------------------------------------------------- + // Lookup Join + // ------------------------------------------------------------------------------------- + + @Test + public void testGetImmutableColumnsOnLookupJoinWithImmutableCols() { + // Left: projected_table_source_table_with_immutable_cols → {0,1,2} + // Right projected_table_source_table_with_immutable_cols = {0,1,2}, ignored + // Result: {0,1,2} + TableScan src = tableWithImmutableColsStreamScan(); + StreamPhysicalLookupJoin lookupJoin = + getStreamLookupJoinsWithImmutableCols( + src, + src.getTable(), + JoinInfo.of(ImmutableIntList.of(0), ImmutableIntList.of(0)), + JoinRelType.INNER, + Option.empty()); + assertEquals(ImmutableBitSet.of(0, 1, 2), mq().getImmutableColumns(lookupJoin)); + + lookupJoin = + getStreamLookupJoinsWithImmutableCols( + src, + src.getTable(), + JoinInfo.of(ImmutableIntList.of(0), ImmutableIntList.of(0)), + JoinRelType.LEFT, + Option.empty()); + assertEquals(ImmutableBitSet.of(0, 1, 2), mq().getImmutableColumns(lookupJoin)); + + // join without lookup side's pk + // Result: null + lookupJoin = + getStreamLookupJoinsWithImmutableCols( + src, + src.getTable(), + JoinInfo.of(ImmutableIntList.of(0), ImmutableIntList.of(1)), + JoinRelType.LEFT, + Option.empty()); + assertNull(mq().getImmutableColumns(lookupJoin)); + } + + // ------------------------------------------------------------------------------------- + // Default 
(catch-all) + // ------------------------------------------------------------------------------------- + + @Test + public void testGetImmutableColumnsOnDefault() { + // TestRel has no specific handler → catch-all returns null + assertNull(mq().getImmutableColumns(testRel())); + } + + @Test + public void testGetImmutableColumnsOnValues() { + // LogicalValues has no specific handler → catch-all returns null + assertNull(mq().getImmutableColumns(logicalValues())); + assertNull(mq().getImmutableColumns(emptyValues())); + } +} diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/utils/ImmutableColConstraintTestUtils.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/utils/ImmutableColConstraintTestUtils.java new file mode 100644 index 00000000000..cdf9e6efcd0 --- /dev/null +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/utils/ImmutableColConstraintTestUtils.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.planner.utils; + +import org.apache.flink.table.api.Schema; +import org.apache.flink.table.catalog.Catalog; +import org.apache.flink.table.catalog.CatalogTable; +import org.apache.flink.table.catalog.ImmutableColumnsConstraint; +import org.apache.flink.table.catalog.ObjectPath; + +import java.util.List; + +/** + * Test utilities for {@link ImmutableColumnsConstraint}. + * + * <p>This utility class can be removed once DDL supports syntax for defining immutable columns + * constraints. + */ +public class ImmutableColConstraintTestUtils { + + public static void addImmutableColConstraint( + Catalog catalog, String databaseName, String tableName, String... immutableCols) + throws Exception { + ObjectPath tablePath = new ObjectPath(databaseName, tableName); + CatalogTable originalTable = (CatalogTable) catalog.getTable(tablePath); + catalog.dropTable(tablePath, false); + + Schema.UnresolvedImmutableColumns immutableColumns = + new Schema.UnresolvedImmutableColumns("imt", List.of(immutableCols)); + + Schema schema = originalTable.getUnresolvedSchema(); + schema = + new Schema( + schema.getColumns(), + schema.getWatermarkSpecs(), + schema.getPrimaryKey().orElse(null), + schema.getIndexes(), + immutableColumns); + + CatalogTable newTable = + CatalogTable.newBuilder() + .schema(schema) + .comment(originalTable.getComment()) + .partitionKeys(originalTable.getPartitionKeys()) + .options(originalTable.getOptions()) + .build(); + + catalog.createTable(tablePath, newTable, false); + } +} diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/JoinTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/JoinTest.xml index b79d0e2d3d6..eeebf3f02d8 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/JoinTest.xml +++ 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/JoinTest.xml @@ -708,6 +708,72 @@ Sink(table=[default_catalog.default_database.sink], fields=[city_id, city_name, +- ChangelogNormalize(key=[id]) +- Exchange(distribution=[hash[id]]) +- TableSourceScan(table=[[default_catalog, default_database, source_city]], fields=[id, city_name]) +]]> + </Resource> + </TestCase> + <TestCase name="testJoinOutputUpsertKeyInSinkPkWhileJoinOnImmutableCols"> + <Resource name="explain"> + <![CDATA[== Abstract Syntax Tree == +LogicalSink(table=[default_catalog.default_database.sink], fields=[id, city_id, city_name, city_detail]) ++- LogicalProject(id=[$0], city_id=[$2], city_name=[$1], city_detail=[$4]) + +- LogicalJoin(condition=[=($1, $3)], joinType=[inner]) + :- LogicalTableScan(table=[[default_catalog, default_database, source_city]]) + +- LogicalTableScan(table=[[default_catalog, default_database, source_city_detail]]) + +== Optimized Physical Plan == +Sink(table=[default_catalog.default_database.sink], fields=[id, city_id, city_name, city_detail], conflictStrategy=[DEDUPLICATE], changelogMode=[NONE]) ++- Calc(select=[id, city_id, city_name, city_detail], changelogMode=[I,UA]) + +- Join(joinType=[InnerJoin], where=[=(city_name, city_name0)], select=[id, city_name, city_id, city_name0, city_detail], leftInputSpec=[HasUniqueKey], rightInputSpec=[HasUniqueKey], changelogMode=[I,UA]) + :- Exchange(distribution=[hash[city_name]], changelogMode=[I,UA]) + : +- DropUpdateBefore(changelogMode=[I,UA]) + : +- TableSourceScan(table=[[default_catalog, default_database, source_city]], fields=[id, city_name], changelogMode=[I,UB,UA]) + +- Exchange(distribution=[hash[city_name]], changelogMode=[I,UA]) + +- DropUpdateBefore(changelogMode=[I,UA]) + +- TableSourceScan(table=[[default_catalog, default_database, source_city_detail]], fields=[city_id, city_name, city_detail], changelogMode=[I,UB,UA]) + +== Optimized Execution Plan == 
+Sink(table=[default_catalog.default_database.sink], fields=[id, city_id, city_name, city_detail], conflictStrategy=[DEDUPLICATE]) ++- Calc(select=[id, city_id, city_name, city_detail]) + +- Join(joinType=[InnerJoin], where=[(city_name = city_name0)], select=[id, city_name, city_id, city_name0, city_detail], leftInputSpec=[HasUniqueKey], rightInputSpec=[HasUniqueKey]) + :- Exchange(distribution=[hash[city_name]]) + : +- DropUpdateBefore + : +- TableSourceScan(table=[[default_catalog, default_database, source_city]], fields=[id, city_name]) + +- Exchange(distribution=[hash[city_name]]) + +- DropUpdateBefore + +- TableSourceScan(table=[[default_catalog, default_database, source_city_detail]], fields=[city_id, city_name, city_detail]) +]]> + </Resource> + </TestCase> + <TestCase name="testJoinOutputUpsertKeyInSinkPkWhileJoinOnPartOfImmutableCols"> + <Resource name="explain"> + <![CDATA[== Abstract Syntax Tree == +LogicalSink(table=[default_catalog.default_database.sink], fields=[id, city_id, city_name, city_detail]) ++- LogicalProject(id=[$0], city_id=[$3], city_name=[$1], city_detail=[$6]) + +- LogicalJoin(condition=[=($1, $4)], joinType=[inner]) + :- LogicalTableScan(table=[[default_catalog, default_database, source_city]]) + +- LogicalTableScan(table=[[default_catalog, default_database, source_city_detail]]) + +== Optimized Physical Plan == +Sink(table=[default_catalog.default_database.sink], fields=[id, city_id, city_name, city_detail], conflictStrategy=[DEDUPLICATE], changelogMode=[NONE]) ++- Calc(select=[id, city_id, city_name, city_detail], changelogMode=[I,UA]) + +- Join(joinType=[InnerJoin], where=[=(city_name, city_name0)], select=[id, city_name, city_id, city_name0, city_detail], leftInputSpec=[HasUniqueKey], rightInputSpec=[HasUniqueKey], changelogMode=[I,UA]) + :- Exchange(distribution=[hash[city_name]], changelogMode=[I,UA]) + : +- DropUpdateBefore(changelogMode=[I,UA]) + : +- TableSourceScan(table=[[default_catalog, default_database, source_city, 
project=[id, city_name], metadata=[]]], fields=[id, city_name], changelogMode=[I,UB,UA]) + +- Exchange(distribution=[hash[city_name]], changelogMode=[I,UA]) + +- DropUpdateBefore(changelogMode=[I,UA]) + +- TableSourceScan(table=[[default_catalog, default_database, source_city_detail, project=[city_id, city_name, city_detail], metadata=[]]], fields=[city_id, city_name, city_detail], changelogMode=[I,UB,UA]) + +== Optimized Execution Plan == +Sink(table=[default_catalog.default_database.sink], fields=[id, city_id, city_name, city_detail], conflictStrategy=[DEDUPLICATE]) ++- Calc(select=[id, city_id, city_name, city_detail]) + +- Join(joinType=[InnerJoin], where=[(city_name = city_name0)], select=[id, city_name, city_id, city_name0, city_detail], leftInputSpec=[HasUniqueKey], rightInputSpec=[HasUniqueKey]) + :- Exchange(distribution=[hash[city_name]]) + : +- DropUpdateBefore + : +- TableSourceScan(table=[[default_catalog, default_database, source_city, project=[id, city_name], metadata=[]]], fields=[id, city_name]) + +- Exchange(distribution=[hash[city_name]]) + +- DropUpdateBefore + +- TableSourceScan(table=[[default_catalog, default_database, source_city_detail, project=[city_id, city_name, city_detail], metadata=[]]], fields=[city_id, city_name, city_detail]) ]]> </Resource> </TestCase> diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdHandlerTestBase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdHandlerTestBase.scala index 3e8bf996630..88dd7214062 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdHandlerTestBase.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdHandlerTestBase.scala @@ -216,6 +216,15 @@ class FlinkRelMdHandlerTestBase { protected lazy val temporalTableStreamScan: StreamPhysicalDataStreamScan = 
createDataStreamScan(ImmutableList.of("TemporalTable4"), streamPhysicalTraits) + protected lazy val tableWithImmutableColsLogicalScan: LogicalTableScan = + createTableSourceTable( + ImmutableList.of("projected_table_source_table_with_immutable_cols"), + logicalTraits) + protected lazy val tableWithImmutableColsStreamScan: StreamPhysicalDataStreamScan = + createTableSourceTable( + ImmutableList.of("projected_table_source_table_with_immutable_cols"), + streamPhysicalTraits) + private lazy val valuesType = relBuilder.getTypeFactory .builder() .add("a", SqlTypeName.BIGINT) @@ -2694,6 +2703,25 @@ class FlinkRelMdHandlerTestBase { (batchLookupJoin, streamLookupJoin) } + protected def getStreamLookupJoinsWithImmutableCols( + leftInput: RelNode, + temporalTable: RelOptTable, + joinInfo: JoinInfo, + joinType: JoinRelType, + calcOnTemporalTable: Option[RexProgram]): StreamPhysicalLookupJoin = { + new StreamPhysicalLookupJoin( + cluster, + streamPhysicalTraits, + leftInput, + temporalTable, + calcOnTemporalTable, + joinInfo, + joinType, + Option.empty[RelHint], + false + ) + } + // select * from MyTable1 join MyTable4 on MyTable1.b = MyTable4.a protected lazy val logicalInnerJoinOnUniqueKeys: RelNode = relBuilder .scan("MyTable1") diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdUpsertKeysTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdUpsertKeysTest.scala index 3b7524e1bd3..996ef4f2de8 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdUpsertKeysTest.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdUpsertKeysTest.scala @@ -17,17 +17,25 @@ */ package org.apache.flink.table.planner.plan.metadata +import org.apache.flink.table.planner.plan.`trait`.FlinkRelDistribution import 
org.apache.flink.table.planner.plan.nodes.calcite.LogicalExpand -import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalTableSourceScan +import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalOverAggregate +import org.apache.flink.table.planner.plan.nodes.physical.stream.{StreamPhysicalExchange, StreamPhysicalOverAggregate, StreamPhysicalRank, StreamPhysicalTableSourceScan} import org.apache.flink.table.planner.plan.schema.TableSourceTable -import org.apache.flink.table.planner.plan.utils.ExpandUtil +import org.apache.flink.table.planner.plan.utils.{ExpandUtil, RankProcessStrategy} +import org.apache.flink.table.runtime.operators.rank.{ConstantRankRange, RankType} import com.google.common.collect.{ImmutableList, ImmutableSet} import org.apache.calcite.prepare.CalciteCatalogReader -import org.apache.calcite.rel.RelNode +import org.apache.calcite.rel.`type`.RelDataTypeFieldImpl +import org.apache.calcite.rel.{RelCollations, RelFieldCollation, RelNode} +import org.apache.calcite.rel.core.{JoinRelType, Window} import org.apache.calcite.rel.hint.RelHint +import org.apache.calcite.rex.{RexInputRef, RexNode, RexWindowBounds} import org.apache.calcite.sql.`type`.SqlTypeName.VARCHAR -import org.apache.calcite.sql.fun.SqlStdOperatorTable.{EQUALS, LESS_THAN} +import org.apache.calcite.sql.SqlWindow +import org.apache.calcite.sql.fun.SqlStdOperatorTable.{EQUALS, LESS_THAN, MAX} +import org.apache.calcite.sql.parser.SqlParserPos import org.apache.calcite.util.ImmutableBitSet import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.api.Test @@ -477,6 +485,182 @@ class FlinkRelMdUpsertKeysTest extends FlinkRelMdHandlerTestBase { assertEquals(toBitSet(Array(0)), mq.getUpsertKeys(intermediateScan).toSet) } + @Test + def testGetUpsertKeysOnTableScanWithImmutableCols(): Unit = { + // Immutable columns: {0, 1, 2} (PK 'a' + immutable 'c', 'd') + assertEquals( + toBitSet(Array(0), Array(0, 1, 2)), + 
mq.getUpsertKeys(tableWithImmutableColsLogicalScan).toSet) + } + + @Test + def testGetUpsertKeysOnExchangeWithImmutableCols(): Unit = { + // Hash exchange on column 1 (c, immutable) + val hash1 = FlinkRelDistribution.hash(Array(1), requireStrict = true) + val exchange1 = new StreamPhysicalExchange( + cluster, + streamPhysicalTraits.replace(hash1), + tableWithImmutableColsStreamScan, + hash1) + assertEquals(toBitSet(Array(0), Array(0, 1, 2)), mq.getUpsertKeys(exchange1).toSet) + + // Hash exchange on column 3 (rowtime, NOT immutable) + val hash3 = FlinkRelDistribution.hash(Array(3), requireStrict = true) + val exchange3 = new StreamPhysicalExchange( + cluster, + streamPhysicalTraits.replace(hash3), + tableWithImmutableColsStreamScan, + hash3) + assertEquals(toBitSet(), mq.getUpsertKeys(exchange3).toSet) + } + + @Test + def testGetUpsertKeysOnSortWithImmutableCols(): Unit = { + // Sort on column 1 (c, immutable) + relBuilder.push(tableWithImmutableColsLogicalScan) + val sort1 = relBuilder.sort(relBuilder.field(1)).build() + assertEquals(toBitSet(Array(0), Array(0, 1, 2)), mq.getUpsertKeys(sort1).toSet) + + // Sort on column 3 (rowtime, NOT immutable) + relBuilder.push(tableWithImmutableColsLogicalScan) + val sort3 = relBuilder.sort(relBuilder.field(3)).build() + assertEquals(toBitSet(), mq.getUpsertKeys(sort3).toSet) + } + + @Test + def testGetUpsertKeysOnRankWithImmutableCols(): Unit = { + def buildRank(partitionKey: Int): RelNode = { + val hash = FlinkRelDistribution.hash(Array(partitionKey), requireStrict = true) + val exchange = new StreamPhysicalExchange( + cluster, + tableWithImmutableColsStreamScan.getTraitSet.replace(hash), + tableWithImmutableColsStreamScan, + hash) + new StreamPhysicalRank( + cluster, + streamPhysicalTraits, + exchange, + ImmutableBitSet.of(partitionKey), + RelCollations.of(2), + RankType.RANK, + new ConstantRankRange(1, 5), + new RelDataTypeFieldImpl("rk", 4, longType), + true, + RankProcessStrategy.UNDEFINED_STRATEGY, + false + ) + } + + 
// Rank partitioned by column 1 (c, immutable) + val rank1 = buildRank(1) + assertEquals(toBitSet(Array(0), Array(0, 1, 2)), mq.getUpsertKeys(rank1).toSet) + + // Rank partitioned by column 3 (rowtime, NOT immutable) + val rank3 = buildRank(3) + assertEquals(toBitSet(), mq.getUpsertKeys(rank3).toSet) + } + + @Test + def testGetUpsertKeysOnOverAggWithImmutableCols(): Unit = { + def buildOverAgg(partitionKey: Int): RelNode = { + val inputRowType = tableWithImmutableColsStreamScan.getRowType + val rowtimeType = inputRowType.getFieldList.get(3).getType + + val group = new Window.Group( + ImmutableBitSet.of(partitionKey), + true, + RexWindowBounds.create(SqlWindow.createUnboundedPreceding(new SqlParserPos(0, 0)), null), + RexWindowBounds.create(SqlWindow.createCurrentRow(new SqlParserPos(0, 0)), null), + RelCollations.of( + new RelFieldCollation( + 2, + RelFieldCollation.Direction.ASCENDING, + RelFieldCollation.NullDirection.FIRST)), + ImmutableList.of( + new Window.RexWinAggCall( + MAX, + rowtimeType, + ImmutableList.of[RexNode](new RexInputRef(3, rowtimeType)), + 0, + false, + false + ) + ) + ) + + val outputBuilder = typeFactory.builder() + inputRowType.getFieldList.forEach(f => outputBuilder.add(f.getName, f.getType)) + outputBuilder.add("max_rowtime", rowtimeType) + val outputRowType = outputBuilder.build() + + val logicalOverAgg = new FlinkLogicalOverAggregate( + cluster, + flinkLogicalTraits, + tableWithImmutableColsLogicalScan, + ImmutableList.of(), + outputRowType, + ImmutableList.of(group) + ) + + val hash = FlinkRelDistribution.hash(Array(partitionKey), requireStrict = true) + val exchange = new StreamPhysicalExchange( + cluster, + tableWithImmutableColsStreamScan.getTraitSet.replace(hash), + tableWithImmutableColsStreamScan, + hash) + + new StreamPhysicalOverAggregate( + cluster, + streamPhysicalTraits, + exchange, + outputRowType, + logicalOverAgg + ) + } + + // Over agg partitioned by column 1 (c, immutable) + val over1 = buildOverAgg(1) + 
assertEquals(toBitSet(Array(0), Array(0, 1, 2)), mq.getUpsertKeys(over1).toSet) + + // Over agg partitioned by column 3 (rowtime, NOT immutable) + val over3 = buildOverAgg(3) + assertEquals(toBitSet(), mq.getUpsertKeys(over3).toSet) + } + + @Test + def testGetUpsertKeysOnSemiAntiJoinWithImmutableCols(): Unit = { + // SEMI join on left.c(1) = right.c(1) + // Left upsert keys: {{0}, {0,1,2}}, left immutable: {0,1,2} + val join1 = relBuilder + .scan("projected_table_source_table_with_immutable_cols") + .scan("projected_table_source_table_with_immutable_cols") + .join( + JoinRelType.SEMI, + relBuilder.call(EQUALS, relBuilder.field(2, 0, 1), relBuilder.field(2, 1, 1))) + .build() + assertEquals(toBitSet(Array(0), Array(0, 1, 2)), mq.getUpsertKeys(join1).toSet) + } + + @Test + def testGetUpsertKeysOnInnerJoinWithImmutableCols(): Unit = { + // Inner join on left.c(1) = right.c(1) + // Both sides: upsert keys = {{0}, {0,1,2}}, immutable = {0,1,2} + // filterKeys on both sides with join key {1}: both retain {{0}, {0,1,2}} + // Neither side is unique on {1}, so only concatenated keys survive + // Right shifted by 4: {{4}, {4,5,6}} + // Concat: {0}x{4}, {0}x{4,5,6}, {0,1,2}x{4}, {0,1,2}x{4,5,6} + val join = relBuilder + .scan("projected_table_source_table_with_immutable_cols") + .scan("projected_table_source_table_with_immutable_cols") + .join( + JoinRelType.INNER, + relBuilder.call(EQUALS, relBuilder.field(2, 0, 1), relBuilder.field(2, 1, 1))) + .build() + assertEquals( + toBitSet(Array(0, 4), Array(0, 4, 5, 6), Array(0, 1, 2, 4), Array(0, 1, 2, 4, 5, 6)), + mq.getUpsertKeys(join).toSet) + } + private def toBitSet(keys: Array[Int]*): Set[ImmutableBitSet] = { keys.map(k => ImmutableBitSet.of(k: _*)).toSet } diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/MetadataTestUtil.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/MetadataTestUtil.scala index 
5e2ffd7c58e..8f5f94572e5 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/MetadataTestUtil.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/MetadataTestUtil.scala @@ -70,6 +70,9 @@ object MetadataTestUtil { rootSchema.add( "projected_table_source_table_with_partial_pk", createProjectedTableSourceTableWithPartialCompositePrimaryKey()) + rootSchema.add( + "projected_table_source_table_with_immutable_cols", + createProjectedTableSourceTableWithImmutableCols()) rootSchema } @@ -477,6 +480,46 @@ object MetadataTestUtil { flinkContext) } + private def createProjectedTableSourceTableWithImmutableCols(): Table = { + val resolvedSchema = new ResolvedSchema( + util.Arrays.asList( + Column.physical("a", DataTypes.BIGINT().notNull()), + Column.physical("b", DataTypes.INT().notNull()), + Column.physical("c", DataTypes.STRING().notNull()), + Column.physical("d", DataTypes.BIGINT().notNull()), + Column.physical("rowtime", DataTypes.TIMESTAMP(3)) + ), + Collections.emptyList(), + UniqueConstraint.primaryKey("PK_1", util.Arrays.asList("a")), + Collections.singletonList(DefaultIndex.newIndex("idx", Collections.singletonList("a"))), + ImmutableColumnsConstraint.immutableColumns("imt", util.Arrays.asList("c", "d"))) + + val catalogTable = getCatalogTable(resolvedSchema) + + // projected: drop column b, keep a, c, d, rowtime + val typeFactory = new FlinkTypeFactory(Thread.currentThread().getContextClassLoader) + val rowType = typeFactory.buildRelNodeRowType( + Seq("a", "c", "d", "rowtime"), + Seq( + new BigIntType(false), + new VarCharType(false, 100), + new BigIntType(false), + new TimestampType(true, TimestampKind.ROWTIME, 3))) + + new MockTableSourceTable( + rowType, + new TestTableSource(), + true, + ContextResolvedTable.temporary( + ObjectIdentifier.of( + "default_catalog", + "default_database", + "projected_table_source_table_with_immutable_cols"), + new 
ResolvedCatalogTable(catalogTable, resolvedSchema) + ), + flinkContext) + } + private def getCatalogTable(resolvedSchema: ResolvedSchema) = { CatalogTable .newBuilder() @@ -573,4 +616,13 @@ class MockTableSourceTable( call: SqlCall, parent: SqlNode, config: CalciteConnectionConfig): Boolean = false + + def copy(newTableSource: DynamicTableSource, newRowType: RelDataType): MockTableSourceTable = { + new MockTableSourceTable( + newRowType, + newTableSource, + isStreamingMode, + contextResolvedTable, + flinkContext) + } } diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/join/JoinTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/join/JoinTest.scala index c4d8e7962c0..f1c72583924 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/join/JoinTest.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/join/JoinTest.scala @@ -20,6 +20,7 @@ package org.apache.flink.table.planner.plan.stream.sql.join import org.apache.flink.table.api._ import org.apache.flink.table.api.config.ExecutionConfigOptions import org.apache.flink.table.planner.utils.{StreamTableTestUtil, TableFunc1, TableTestBase} +import org.apache.flink.table.planner.utils.ImmutableColConstraintTestUtils.addImmutableColConstraint import org.assertj.core.api.Assertions.assertThatThrownBy import org.junit.jupiter.api.Test @@ -510,6 +511,132 @@ class JoinTest extends TableTestBase { ) } + @Test + def testJoinOutputUpsertKeyInSinkPkWhileJoinOnImmutableCols(): Unit = { + val catalog = util.tableEnv.getCatalog(util.tableEnv.getCurrentCatalog).get() + + util.tableEnv.executeSql(""" + |create table source_city ( + | id varchar, + | city_name varchar, + | primary key (id) not enforced + |) with ( + | 'connector' = 'values', + | 'changelog-mode' = 'I,UA,UB' + |) + |""".stripMargin) + 
addImmutableColConstraint(catalog, util.tableEnv.getCurrentDatabase, "source_city", "city_name") + + util.tableEnv.executeSql(""" + |create table source_city_detail ( + | city_id varchar, + | city_name varchar, + | city_detail varchar, + | primary key (city_id) not enforced + |) with ( + | 'connector' = 'values', + | 'changelog-mode' = 'I,UA,UB' + |) + |""".stripMargin) + addImmutableColConstraint( + catalog, + util.tableEnv.getCurrentDatabase, + "source_city_detail", + "city_name") + + util.tableEnv.executeSql(""" + |create table sink ( + | id varchar, + | city_id varchar, + | city_name varchar, + | city_detail varchar, + | primary key (id, city_id) not enforced + |) with ( + | 'connector' = 'values' + | ,'sink-insert-only' = 'false' + |) + |""".stripMargin) + + // verify UB should be dropped and no upsertMaterialize + util.verifyExplainInsert( + """ + |insert into sink + |select t1.id, t2.city_id, t1.city_name, t2.city_detail + | from source_city t1 + | join source_city_detail t2 on t1.city_name = t2.city_name + |on conflict do deduplicate + |""".stripMargin, + ExplainDetail.CHANGELOG_MODE + ) + } + + @Test + def testJoinOutputUpsertKeyInSinkPkWhileJoinOnPartOfImmutableCols(): Unit = { + val catalog = util.tableEnv.getCatalog(util.tableEnv.getCurrentCatalog).get() + + util.tableEnv.executeSql(""" + |create table source_city ( + | id varchar, + | city_name varchar, + | city_no int, + | primary key (id) not enforced + |) with ( + | 'connector' = 'values', + | 'changelog-mode' = 'I,UA,UB' + |) + |""".stripMargin) + addImmutableColConstraint( + catalog, + util.tableEnv.getCurrentDatabase, + "source_city", + "city_name", + "city_no") + + util.tableEnv.executeSql(""" + |create table source_city_detail ( + | city_id varchar, + | city_name varchar, + | city_no int, + | city_detail varchar, + | primary key (city_id) not enforced + |) with ( + | 'connector' = 'values', + | 'changelog-mode' = 'I,UA,UB' + |) + |""".stripMargin) + addImmutableColConstraint( + catalog, + 
util.tableEnv.getCurrentDatabase, + "source_city_detail", + "city_name", + "city_no") + + util.tableEnv.executeSql(""" + |create table sink ( + | id varchar, + | city_id varchar, + | city_name varchar, + | city_detail varchar, + | primary key (id, city_id) not enforced + |) with ( + | 'connector' = 'values' + | ,'sink-insert-only' = 'false' + |) + |""".stripMargin) + + // verify UB should be dropped and no upsertMaterialize on sink + util.verifyExplainInsert( + """ + |insert into sink + |select t1.id, t2.city_id, t1.city_name, t2.city_detail + | from source_city t1 + | join source_city_detail t2 on t1.city_name = t2.city_name + |on conflict do deduplicate + |""".stripMargin, + ExplainDetail.CHANGELOG_MODE + ) + } + @Test def testInnerJoinWithFilterPushDown(): Unit = { util.verifyExecPlan("""
