This is an automated email from the ASF dual-hosted git repository.
snuyanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new a62a144a5e9 [FLINK-39421][table] Fix metadata filter contract and add
coverage
a62a144a5e9 is described below
commit a62a144a5e99ef1d66007f2caf409a62fdbcb5ad
Author: Jim Hughes <[email protected]>
AuthorDate: Fri Jun 5 11:28:50 2026 -0400
[FLINK-39421][table] Fix metadata filter contract and add coverage
PushFilterIntoTableSourceScanRule now classifies each input predicate
by instance identity (identity-set membership) in the source's accepted
and remaining lists, allowing arbitrary non-contiguous subsets. An input
predicate that appears in neither list raises a TableException.
Metadata-only predicates the rule cannot push (the source does not
support metadata filter push-down, or the spec is already attached) now
stay as a runtime Calc; the prior physical route created a
FilterPushDownSpec that crashed compiled-plan restore once
ProjectPushDownSpec narrowed the row type.
MetadataFilterPushDownSpec enforces the same identity round-trip on
restore; needAdjustFieldReferenceAfterProjection() returns false so
ScanReuser can share a scan across queries with the same metadata
filter but different projections.
Tests cover the four MetadataFilterResult shapes end-to-end, the
coverage invariant, and JSON serde of the spec.
---
.../plan/abilities/source/FilterPushDownSpec.java | 21 +-
.../source/MetadataFilterPushDownSpec.java | 59 +++-
.../logical/PushFilterIntoSourceScanRuleBase.java | 79 ++++--
.../logical/PushFilterIntoTableSourceScanRule.java | 52 ++--
.../planner/factories/TestValuesTableFactory.java | 1 +
.../MetadataFilterInReadingMetadataTest.java | 264 +++++++++++-------
.../logical/MetadataFilterResultShapesITCase.java | 305 +++++++++++++++++++++
.../MetadataFilterInReadingMetadataTest.xml | 52 +++-
8 files changed, 660 insertions(+), 173 deletions(-)
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/source/FilterPushDownSpec.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/source/FilterPushDownSpec.java
index 7a4e1481e04..108329cea04 100644
---
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/source/FilterPushDownSpec.java
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/source/FilterPushDownSpec.java
@@ -63,7 +63,7 @@ public final class FilterPushDownSpec extends
SourceAbilitySpecBase {
/**
* A flag which indicates all predicates are retained in the outer Filter
operator.
*
- * <p>This flog is only used for optimization phase, and should not be
serialized.
+ * <p>This flag is only used for optimization phase, and should not be
serialized.
*/
@JsonIgnore private final boolean allPredicatesRetained;
@@ -111,8 +111,9 @@ public final class FilterPushDownSpec extends
SourceAbilitySpecBase {
}
/**
- * Converts {@link RexNode} predicates to {@link ResolvedExpression}s
using the given row type.
- * Shared between physical and metadata filter push-down paths.
+ * Converts {@link RexNode} predicates to {@link ResolvedExpression}s,
preserving 1:1 input
+ * order so callers can correlate by position. Shared between physical and
metadata filter
+ * push-down paths.
*/
static List<ResolvedExpression> resolvePredicates(
List<RexNode> predicates,
@@ -137,9 +138,9 @@ public final class FilterPushDownSpec extends
SourceAbilitySpecBase {
throw new TableException(
String.format(
"%s can not be
converted to Expression, please make sure %s can accept %s.",
- p.toString(),
+ p,
tableSource.getClass().getSimpleName(),
- p.toString()));
+ p));
}
})
.collect(Collectors.toList());
@@ -160,7 +161,15 @@ public final class FilterPushDownSpec extends
SourceAbilitySpecBase {
"SQL expression parsing is not
supported at this location.");
})
.build();
- return resolver.resolve(filters);
+ List<ResolvedExpression> resolved = resolver.resolve(filters);
+ if (resolved.size() != predicates.size()) {
+ throw new TableException(
+ String.format(
+ "Internal error: ExpressionResolver returned %d
resolved expressions "
+ + "for %d input predicates.",
+ resolved.size(), predicates.size()));
+ }
+ return resolved;
}
@Override
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/source/MetadataFilterPushDownSpec.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/source/MetadataFilterPushDownSpec.java
index 090ab45fe27..d1cb3303b80 100644
---
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/source/MetadataFilterPushDownSpec.java
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/source/MetadataFilterPushDownSpec.java
@@ -27,6 +27,7 @@ import org.apache.flink.table.planner.plan.utils.FlinkRexUtil;
import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.shaded.guava33.com.google.common.collect.Sets;
import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
@@ -37,6 +38,7 @@ import org.apache.calcite.rex.RexNode;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
+import java.util.Set;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -78,23 +80,57 @@ public final class MetadataFilterPushDownSpec extends
SourceAbilitySpecBase {
@Override
public void apply(DynamicTableSource tableSource, SourceAbilityContext
context) {
- // Use stored predicateRowType; context's row type may be narrowed by
ProjectPushDownSpec.
- MetadataFilterResult result =
- applyMetadataFilters(predicates, predicateRowType,
tableSource, context);
- if (result.getAcceptedFilters().size() != predicates.size()) {
- throw new TableException("All metadata predicates should be
accepted here.");
+ List<ResolvedExpression> resolved =
+ resolvedExpressions(predicates, predicateRowType, tableSource,
context);
+ MetadataFilterResult result =
applyMetadataFiltersOnSource(tableSource, resolved);
+ // On restore every predicate must round-trip back via instance
identity. `remaining`
+ // is not validated: the spec only stores already-accepted predicates,
so the source
+ // should re-accept them all.
+ Set<ResolvedExpression> accepted = Sets.newIdentityHashSet();
+ accepted.addAll(result.getAcceptedFilters());
+ Set<ResolvedExpression> inputs = Sets.newIdentityHashSet();
+ inputs.addAll(resolved);
+ for (ResolvedExpression r : result.getAcceptedFilters()) {
+ if (!inputs.contains(r)) {
+ throw new TableException(
+ "Source returned an accepted metadata filter not
produced by the "
+ + "planner. Sources must return back the same
ResolvedExpression "
+ + "instances they received.");
+ }
+ }
+ for (ResolvedExpression r : resolved) {
+ if (!accepted.contains(r)) {
+ throw new TableException(
+ "All metadata predicates should be accepted on
compiled-plan restore. "
+ + "Source dropped a predicate that was
accepted at optimization "
+ + "time.");
+ }
}
}
/**
- * Converts RexNode predicates to ResolvedExpressions using the given row
type and calls
- * applyMetadataFilters on the source. The row type must already use
metadata key names.
+ * Resolves predicates to {@link ResolvedExpression}s; the returned list
preserves input order,
+ * so callers may correlate against the input list by position.
*/
- public static MetadataFilterResult applyMetadataFilters(
+ public static List<ResolvedExpression> resolvedExpressions(
List<RexNode> predicates,
RowType metadataKeyRowType,
DynamicTableSource tableSource,
SourceAbilityContext context) {
+ ensureMetadataFilterPushDown(tableSource);
+ return FilterPushDownSpec.resolvePredicates(
+ predicates, metadataKeyRowType, tableSource, context);
+ }
+
+ /** Pushes already-resolved expressions to the source. */
+ public static MetadataFilterResult applyMetadataFiltersOnSource(
+ DynamicTableSource tableSource, List<ResolvedExpression> resolved)
{
+ SupportsReadingMetadata readingMetadata =
ensureMetadataFilterPushDown(tableSource);
+ return readingMetadata.applyMetadataFilters(resolved);
+ }
+
+ private static SupportsReadingMetadata ensureMetadataFilterPushDown(
+ DynamicTableSource tableSource) {
if (!(tableSource instanceof SupportsReadingMetadata)) {
throw new TableException(
String.format(
@@ -108,15 +144,12 @@ public final class MetadataFilterPushDownSpec extends
SourceAbilitySpecBase {
"%s no longer supports metadata filter push-down.",
tableSource.getClass().getName()));
}
- List<ResolvedExpression> resolved =
- FilterPushDownSpec.resolvePredicates(
- predicates, metadataKeyRowType, tableSource, context);
- return readingMetadata.applyMetadataFilters(resolved);
+ return readingMetadata;
}
@Override
public boolean needAdjustFieldReferenceAfterProjection() {
- return true;
+ return false;
}
@Override
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoSourceScanRuleBase.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoSourceScanRuleBase.java
index eb5521ef782..ef8bc092f98 100644
---
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoSourceScanRuleBase.java
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoSourceScanRuleBase.java
@@ -17,6 +17,7 @@
package org.apache.flink.table.planner.plan.rules.logical;
+import org.apache.flink.table.api.TableException;
import org.apache.flink.table.catalog.Column;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.connector.source.DynamicTableSource;
@@ -33,6 +34,9 @@ import
org.apache.flink.table.planner.plan.schema.TableSourceTable;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.RowType.RowField;
+import org.apache.flink.shaded.guava33.com.google.common.collect.Iterables;
+import org.apache.flink.shaded.guava33.com.google.common.collect.Sets;
+
import org.apache.calcite.plan.RelOptRule;
import org.apache.calcite.plan.RelOptRuleOperand;
import org.apache.calcite.rel.core.TableScan;
@@ -47,6 +51,7 @@ import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.stream.Collectors;
import scala.Tuple2;
@@ -176,36 +181,78 @@ public abstract class PushFilterIntoSourceScanRuleBase
extends RelOptRule {
return mapping;
}
+ /** Outcome of metadata filter push-down: updated table and runtime-Calc
predicates. */
+ protected static final class MetadataPushDownOutcome {
+ final TableSourceTable newTableSourceTable;
+ final List<RexNode> remainingInputRexNodes;
+
+ MetadataPushDownOutcome(
+ TableSourceTable newTableSourceTable, List<RexNode>
remainingInputRexNodes) {
+ this.newTableSourceTable = newTableSourceTable;
+ this.remainingInputRexNodes = remainingInputRexNodes;
+ }
+ }
+
/** Resolves metadata filters and creates a new {@link TableSourceTable}.
*/
- protected Tuple2<MetadataFilterResult, TableSourceTable>
- resolveMetadataFiltersAndCreateTableSourceTable(
- RexNode[] metadataPredicates,
- TableSourceTable oldTableSourceTable,
- TableScan scan,
- RelBuilder relBuilder) {
+ protected MetadataPushDownOutcome
resolveMetadataFiltersAndCreateTableSourceTable(
+ RexNode[] metadataPredicates, TableSourceTable
oldTableSourceTable, TableScan scan) {
DynamicTableSource newTableSource =
oldTableSourceTable.tableSource().copy();
SourceAbilityContext abilityContext = SourceAbilityContext.from(scan);
- // Build a metadata-only row type (field names are metadata keys, not
SQL aliases) and
- // an old->new index mapping. Storing only metadata columns avoids
name collisions with
- // physical columns (e.g. `offset INT, msg_offset INT METADATA FROM
'offset'`).
+ // Field names are metadata keys, not SQL aliases, to avoid
physical/metadata collisions
+ // (e.g. `offset INT, msg_offset INT METADATA FROM 'offset'`).
MetadataRowInfo metadataRowInfo =
buildMetadataRowInfo(oldTableSourceTable,
abilityContext.getSourceRowType());
RexNode[] remappedPredicates =
remapPredicates(metadataPredicates,
metadataRowInfo.oldIndexToNewIndex);
- MetadataFilterResult result =
- MetadataFilterPushDownSpec.applyMetadataFilters(
+ List<ResolvedExpression> resolved =
+ MetadataFilterPushDownSpec.resolvedExpressions(
Arrays.asList(remappedPredicates),
metadataRowInfo.metadataRowType,
newTableSource,
abilityContext);
+ MetadataFilterResult result =
+
MetadataFilterPushDownSpec.applyMetadataFiltersOnSource(newTableSource,
resolved);
+
+ // Source must return back the same instances it received.
+ Set<ResolvedExpression> acceptedSet = Sets.newIdentityHashSet();
+ acceptedSet.addAll(result.getAcceptedFilters());
+ Set<ResolvedExpression> remainingSet = Sets.newIdentityHashSet();
+ remainingSet.addAll(result.getRemainingFilters());
+ Set<ResolvedExpression> inputSet = Sets.newIdentityHashSet();
+ inputSet.addAll(resolved);
+
+ for (ResolvedExpression r :
+ Iterables.concat(result.getAcceptedFilters(),
result.getRemainingFilters())) {
+ if (!inputSet.contains(r)) {
+ throw new TableException(
+ "Source returned a metadata filter not in the input
list. Sources must "
+ + "return back the same ResolvedExpression
instances they "
+ + "received from applyMetadataFilters.");
+ }
+ }
- int acceptedCount = result.getAcceptedFilters().size();
List<RexNode> acceptedRemappedPredicates = new ArrayList<>();
- for (int i = 0; i < acceptedCount; i++) {
- acceptedRemappedPredicates.add(remappedPredicates[i]);
+ List<RexNode> remainingInputRexNodes = new ArrayList<>();
+ for (int i = 0; i < resolved.size(); i++) {
+ ResolvedExpression r = resolved.get(i);
+ boolean inAccepted = acceptedSet.contains(r);
+ boolean inRemaining = remainingSet.contains(r);
+ if (!inAccepted && !inRemaining) {
+ throw new TableException(
+ "Source dropped a metadata filter that was passed to "
+ + "applyMetadataFilters. Every input predicate
must appear in the "
+ + "result's accepted list, remaining list, or
both.");
+ }
+ if (inAccepted) {
+ acceptedRemappedPredicates.add(remappedPredicates[i]);
+ }
+ if (inRemaining) {
+ remainingInputRexNodes.add(metadataPredicates[i]);
+ }
}
+
MetadataFilterPushDownSpec metadataSpec =
new MetadataFilterPushDownSpec(
acceptedRemappedPredicates,
metadataRowInfo.metadataRowType);
@@ -216,7 +263,7 @@ public abstract class PushFilterIntoSourceScanRuleBase
extends RelOptRule {
oldTableSourceTable.getStatistic(),
new SourceAbilitySpec[] {metadataSpec});
- return new Tuple2<>(result, newTableSourceTable);
+ return new MetadataPushDownOutcome(newTableSourceTable,
remainingInputRexNodes);
}
/**
@@ -253,7 +300,7 @@ public abstract class PushFilterIntoSourceScanRuleBase
extends RelOptRule {
public RexNode visitInputRef(RexInputRef inputRef) {
Integer newIdx =
oldIndexToNewIndex.get(inputRef.getIndex());
if (newIdx == null) {
- throw new IllegalStateException(
+ throw new TableException(
"Metadata predicate references
non-metadata column index "
+ inputRef.getIndex());
}
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoTableSourceScanRule.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoTableSourceScanRule.java
index 57d80b1b2f9..4e896a30701 100644
---
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoTableSourceScanRule.java
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoTableSourceScanRule.java
@@ -20,7 +20,6 @@ package org.apache.flink.table.planner.plan.rules.logical;
import
org.apache.flink.table.connector.source.abilities.SupportsFilterPushDown;
import
org.apache.flink.table.connector.source.abilities.SupportsReadingMetadata;
-import
org.apache.flink.table.connector.source.abilities.SupportsReadingMetadata.MetadataFilterResult;
import org.apache.flink.table.planner.plan.schema.TableSourceTable;
import org.apache.flink.table.planner.plan.utils.FlinkRexUtil;
@@ -31,6 +30,7 @@ import org.apache.calcite.rex.RexNode;
import org.apache.calcite.tools.RelBuilder;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.List;
import scala.Tuple2;
@@ -99,23 +99,31 @@ public class PushFilterIntoTableSourceScanRule extends
PushFilterIntoSourceScanR
boolean supportsMetadataFilter =
canPushdownMetadataFilter(tableSourceTable);
int physicalColumnCount = getPhysicalColumnCount(tableSourceTable);
- // Classify predicates: only separate metadata predicates when the
source
- // actually supports metadata filter push-down. Otherwise, all
predicates
- // go through the physical path to preserve the FilterPushDownSpec
guard
- // that prevents rule re-firing and maintains scan reuse invariants.
+ List<RexNode> allRemainingRexNodes = new ArrayList<>();
+ TableSourceTable currentTable = tableSourceTable;
+
+ // Unpushable metadata predicates stay as a runtime Calc, not the
physical path —
+ // physical routing produces a FilterPushDownSpec that crashes
compiled-plan restore
+ // once ProjectPushDownSpec narrows the scan row type.
List<RexNode> physicalPredicates = new ArrayList<>();
List<RexNode> metadataPredicates = new ArrayList<>();
for (RexNode predicate : convertiblePredicates) {
- if (supportsMetadataFilter
- && referencesOnlyMetadataColumns(predicate,
physicalColumnCount)) {
- metadataPredicates.add(predicate);
+ if (referencesOnlyMetadataColumns(predicate, physicalColumnCount))
{
+ if (supportsMetadataFilter) {
+ metadataPredicates.add(predicate);
+ } else {
+ allRemainingRexNodes.add(predicate);
+ }
} else {
physicalPredicates.add(predicate);
}
}
- List<RexNode> allRemainingRexNodes = new ArrayList<>();
- TableSourceTable currentTable = tableSourceTable;
+ // Avoid re-firing on shapes we can't transform — saves wasted Hep
iterations.
+ boolean nothingToPushPhysically = physicalPredicates.isEmpty() ||
!supportsPhysicalFilter;
+ if (nothingToPushPhysically && metadataPredicates.isEmpty()) {
+ return;
+ }
if (!physicalPredicates.isEmpty() && supportsPhysicalFilter) {
Tuple2<SupportsFilterPushDown.Result, TableSourceTable>
physicalResult =
@@ -133,28 +141,14 @@ public class PushFilterIntoTableSourceScanRule extends
PushFilterIntoSourceScanR
}
if (!metadataPredicates.isEmpty()) {
- Tuple2<MetadataFilterResult, TableSourceTable> metadataResult =
+ MetadataPushDownOutcome metadataResult =
resolveMetadataFiltersAndCreateTableSourceTable(
- metadataPredicates.toArray(new RexNode[0]),
- currentTable,
- scan,
- relBuilder);
- currentTable = metadataResult._2;
- // Remaining (rejected) metadata predicates stay as a
LogicalFilter above
- // the scan so they are still evaluated at runtime. We use the
original
- // RexNodes (suffix) because the remaining ResolvedExpressions use
metadata
- // key names, not SQL aliases needed by the Filter's row type. The
- // validation in resolveMetadataFiltersAndCreateTableSourceTable
ensures
- // the partition invariant (accepted prefix + remaining suffix =
input).
- int acceptedCount = metadataResult._1.getAcceptedFilters().size();
- for (int i = acceptedCount; i < metadataPredicates.size(); i++) {
- allRemainingRexNodes.add(metadataPredicates.get(i));
- }
+ metadataPredicates.toArray(new RexNode[0]),
currentTable, scan);
+ currentTable = metadataResult.newTableSourceTable;
+ allRemainingRexNodes.addAll(metadataResult.remainingInputRexNodes);
}
- for (RexNode unconverted : unconvertedPredicates) {
- allRemainingRexNodes.add(unconverted);
- }
+ allRemainingRexNodes.addAll(Arrays.asList(unconvertedPredicates));
LogicalTableScan newScan =
LogicalTableScan.create(scan.getCluster(), currentTable,
scan.getHints());
diff --git
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java
index d011687d82e..3387f522232 100644
---
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java
+++
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java
@@ -1847,6 +1847,7 @@ public final class TestValuesTableFactory
projectedMetadataFields,
enableAggregatePushDown);
newSource.watermarkStrategy = watermarkStrategy;
+
newSource.setEnableMetadataFilterPushDown(enableMetadataFilterPushDown);
return newSource;
}
}
diff --git
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/MetadataFilterInReadingMetadataTest.java
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/MetadataFilterInReadingMetadataTest.java
index ffd6aa0ea89..97245fbcbf1 100644
---
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/MetadataFilterInReadingMetadataTest.java
+++
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/MetadataFilterInReadingMetadataTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.table.planner.plan.rules.logical;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.api.TableDescriptor;
+import org.apache.flink.table.api.TableException;
import
org.apache.flink.table.connector.source.abilities.SupportsFilterPushDown;
import
org.apache.flink.table.connector.source.abilities.SupportsReadingMetadata;
import
org.apache.flink.table.connector.source.abilities.SupportsReadingMetadata.MetadataFilterResult;
@@ -35,33 +36,28 @@ import
org.apache.flink.table.planner.utils.BatchTableTestUtil;
import org.apache.flink.table.planner.utils.TableConfigUtils;
import org.apache.flink.table.planner.utils.TableTestBase;
import org.apache.flink.table.types.DataType;
-import org.apache.flink.testutils.junit.SharedObjectsExtension;
-import org.apache.flink.testutils.junit.SharedReference;
import org.apache.calcite.plan.hep.HepMatchOrder;
import org.apache.calcite.rel.rules.CoreRules;
import org.apache.calcite.tools.RuleSets;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.extension.RegisterExtension;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import static org.apache.flink.table.api.DataTypes.INT;
import static org.apache.flink.table.api.DataTypes.STRING;
import static org.apache.flink.table.api.DataTypes.TIMESTAMP;
-import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
/** Tests for metadata filter push-down through {@link
SupportsReadingMetadata}. */
class MetadataFilterInReadingMetadataTest extends TableTestBase {
- @RegisterExtension
- private final SharedObjectsExtension sharedObjects =
SharedObjectsExtension.create();
-
private BatchTableTestUtil util;
@BeforeEach
@@ -88,139 +84,144 @@ class MetadataFilterInReadingMetadataTest extends
TableTestBase {
@Test
void testMetadataFilterPushDown() {
- SharedReference<List<ResolvedExpression>> receivedFilters =
- sharedObjects.add(new ArrayList<>());
TableDescriptor descriptor =
TableFactoryHarness.newBuilder()
.schema(MetadataFilterSource.SCHEMA)
- .source(new MetadataFilterSource(true,
receivedFilters))
+ .source(new MetadataFilterSource(true))
.build();
util.tableEnv().createTable("T1", descriptor);
util.verifyRelPlan("SELECT id FROM T1 WHERE event_time > TIMESTAMP
'2024-01-01 00:00:00'");
-
- assertThat(receivedFilters.get().toString())
- .isEqualTo("[greaterThan(event_time, 2024-01-01T00:00)]");
}
@Test
void testMetadataFilterNotPushedWhenNotSupported() {
- SharedReference<List<ResolvedExpression>> receivedFilters =
- sharedObjects.add(new ArrayList<>());
TableDescriptor descriptor =
TableFactoryHarness.newBuilder()
.schema(MetadataFilterSource.SCHEMA)
- .source(new MetadataFilterSource(false,
receivedFilters))
+ .source(new MetadataFilterSource(false))
.build();
util.tableEnv().createTable("T2", descriptor);
util.verifyRelPlan("SELECT id FROM T2 WHERE event_time > TIMESTAMP
'2024-01-01 00:00:00'");
-
- // No metadata filters should have been pushed
- assertThat(receivedFilters.get()).isEmpty();
}
@Test
void testAliasedMetadataColumnFilter() {
- SharedReference<List<ResolvedExpression>> receivedFilters =
- sharedObjects.add(new ArrayList<>());
TableDescriptor descriptor =
TableFactoryHarness.newBuilder()
.schema(RenamedMetadataFilterSource.SCHEMA)
- .source(new
RenamedMetadataFilterSource(receivedFilters))
+ .source(new RenamedMetadataFilterSource())
.build();
util.tableEnv().createTable("T3", descriptor);
- // 'event_ts' is the SQL alias for metadata key 'timestamp'
+ // 'event_ts' is the SQL alias for metadata key 'timestamp'; the spec
stores the metadata
+ // key so the plan shows `metadataFilter=[>(timestamp, ...)]` not the
alias.
util.verifyRelPlan("SELECT id FROM T3 WHERE event_ts > TIMESTAMP
'2024-01-01 00:00:00'");
-
- // The source should receive the filter with metadata key 'timestamp',
not 'event_ts'.
- assertThat(receivedFilters.get().toString())
- .isEqualTo("[greaterThan(timestamp, 2024-01-01T00:00)]");
}
@Test
void testMixedPhysicalAndMetadataFilters() {
- SharedReference<List<ResolvedExpression>> metadataFilters =
- sharedObjects.add(new ArrayList<>());
- SharedReference<List<ResolvedExpression>> physicalFilters =
- sharedObjects.add(new ArrayList<>());
TableDescriptor descriptor =
TableFactoryHarness.newBuilder()
.schema(MixedFilterSource.SCHEMA)
- .source(new MixedFilterSource(metadataFilters,
physicalFilters))
+ .source(new MixedFilterSource())
.build();
util.tableEnv().createTable("T4", descriptor);
+ // id > 10 → physical path, event_time > ... → metadata path.
util.verifyRelPlan(
"SELECT id FROM T4 WHERE id > 10 AND event_time > TIMESTAMP
'2024-01-01 00:00:00'");
-
- // Verify routing: id > 10 → physical path, event_time > ... →
metadata path.
-
assertThat(physicalFilters.get().toString()).isEqualTo("[greaterThan(id, 10)]");
- assertThat(metadataFilters.get().toString())
- .isEqualTo("[greaterThan(event_time, 2024-01-01T00:00)]");
}
@Test
void testPartialMetadataFilterAcceptance() {
- SharedReference<List<ResolvedExpression>> receivedFilters =
- sharedObjects.add(new ArrayList<>());
TableDescriptor descriptor =
TableFactoryHarness.newBuilder()
.schema(PartialMetadataFilterSource.SCHEMA)
- .source(new
PartialMetadataFilterSource(receivedFilters))
+ .source(new PartialMetadataFilterSource())
.build();
util.tableEnv().createTable("T6", descriptor);
- // Two metadata filters: the source accepts only the first one
+ // Source accepts the first filter and rejects the second.
util.verifyRelPlan(
"SELECT id FROM T6 WHERE event_time > TIMESTAMP '2024-01-01
00:00:00'"
+ " AND priority > 5");
-
- // Source receives both filters; the XML reference verifies only the
first is accepted
- // (the second remains as a LogicalFilter above the scan).
- assertThat(receivedFilters.get().toString())
- .isEqualTo("[greaterThan(event_time, 2024-01-01T00:00),
greaterThan(priority, 5)]");
}
@Test
void testPhysicalAndMetadataNameCollision() {
- // Physical column 'offset' shares a name with the metadata key
'offset'
- // (aliased in SQL as 'msg_offset'). The predicate on the metadata
column
- // must be pushed down using the metadata key, not confused with the
- // physical column of the same name.
- SharedReference<List<ResolvedExpression>> receivedFilters =
- sharedObjects.add(new ArrayList<>());
+ // Physical column 'offset' shares a name with metadata key 'offset'
(aliased to
+ // 'msg_offset'). The predicate must push down using the metadata key,
not the alias.
TableDescriptor descriptor =
TableFactoryHarness.newBuilder()
.schema(CollidingNameSource.SCHEMA)
- .source(new CollidingNameSource(receivedFilters))
+ .source(new CollidingNameSource())
.build();
util.tableEnv().createTable("T7", descriptor);
util.verifyRelPlan("SELECT id FROM T7 WHERE msg_offset > 5");
+ }
+
+ @Test
+ void testBestEffortMetadataPruning() {
+ // Source puts every predicate in both accepted and remaining; plan
shows both paths.
+ TableDescriptor descriptor =
+ TableFactoryHarness.newBuilder()
+ .schema(MetadataFilterSource.SCHEMA)
+ .source(new BestEffortPruningSource())
+ .build();
+ util.tableEnv().createTable("T8", descriptor);
+
+ util.verifyRelPlan("SELECT id FROM T8 WHERE event_time > TIMESTAMP
'2024-01-01 00:00:00'");
+ }
+
+ @Test
+ void testNonContiguousSubsetAcceptance() {
+ TableDescriptor descriptor =
+ TableFactoryHarness.newBuilder()
+ .schema(NonContiguousAcceptingSource.SCHEMA)
+ .source(new NonContiguousAcceptingSource())
+ .build();
+ util.tableEnv().createTable("T9", descriptor);
+
+ util.verifyRelPlan("SELECT id FROM T9 WHERE m0 > 0 AND m1 > 1 AND m2 >
2");
+ }
- // Must reference the metadata key 'offset', NOT the SQL alias
'msg_offset'.
-
assertThat(receivedFilters.get().toString()).isEqualTo("[greaterThan(offset,
5)]");
+ @Test
+ void testCoverageInvariantWhenSourceDropsPredicate() {
+ TableDescriptor descriptor =
+ TableFactoryHarness.newBuilder()
+ .schema(MetadataFilterSource.SCHEMA)
+ .source(new DroppingAllSource())
+ .build();
+ util.tableEnv().createTable("TDrop", descriptor);
+
+ assertThatThrownBy(
+ () ->
+ util.tableEnv()
+ .explainSql(
+ "SELECT id FROM TDrop "
+ + "WHERE event_time >
TIMESTAMP '2024-01-01 00:00:00'"))
+ .isInstanceOf(TableException.class)
+ .hasMessage(
+ "Source dropped a metadata filter that was passed to "
+ + "applyMetadataFilters. Every input predicate
must appear in the "
+ + "result's accepted list, remaining list, or
both.");
}
@Test
void testMetadataFilterWithProjection() {
- SharedReference<List<ResolvedExpression>> receivedFilters =
- sharedObjects.add(new ArrayList<>());
TableDescriptor descriptor =
TableFactoryHarness.newBuilder()
.schema(MetadataFilterSource.SCHEMA)
- .source(new MetadataFilterSource(true,
receivedFilters))
+ .source(new MetadataFilterSource(true))
.build();
util.tableEnv().createTable("T5", descriptor);
+ // Projection push-down must not perturb the metadata filter.
util.verifyRelPlan(
"SELECT id, name FROM T5 WHERE event_time > TIMESTAMP
'2024-01-01 00:00:00'");
-
- // Projection push-down must not perturb the metadata filter.
- assertThat(receivedFilters.get().toString())
- .isEqualTo("[greaterThan(event_time, 2024-01-01T00:00)]");
}
//
-----------------------------------------------------------------------------------------
@@ -239,13 +240,9 @@ class MetadataFilterInReadingMetadataTest extends
TableTestBase {
.build();
private final boolean supportsMetadataFilter;
- private final SharedReference<List<ResolvedExpression>>
receivedMetadataFilters;
- MetadataFilterSource(
- boolean supportsMetadataFilter,
- SharedReference<List<ResolvedExpression>>
receivedMetadataFilters) {
+ MetadataFilterSource(boolean supportsMetadataFilter) {
this.supportsMetadataFilter = supportsMetadataFilter;
- this.receivedMetadataFilters = receivedMetadataFilters;
}
@Override
@@ -265,11 +262,36 @@ class MetadataFilterInReadingMetadataTest extends
TableTestBase {
@Override
public MetadataFilterResult
applyMetadataFilters(List<ResolvedExpression> metadataFilters) {
- receivedMetadataFilters.get().addAll(metadataFilters);
return MetadataFilterResult.of(metadataFilters,
Collections.emptyList());
}
}
+ /** Returns each input in both accepted and remaining (best-effort pruning
shape). */
+ private static class BestEffortPruningSource extends
TableFactoryHarness.ScanSourceBase
+ implements SupportsReadingMetadata {
+
+ @Override
+ public Map<String, DataType> listReadableMetadata() {
+ Map<String, DataType> metadata = new HashMap<>();
+ metadata.put("event_time", TIMESTAMP(3));
+ return metadata;
+ }
+
+ @Override
+ public void applyReadableMetadata(List<String> metadataKeys, DataType
producedDataType) {}
+
+ @Override
+ public boolean supportsMetadataFilterPushDown() {
+ return true;
+ }
+
+ @Override
+ public MetadataFilterResult
applyMetadataFilters(List<ResolvedExpression> metadataFilters) {
+ // Best-effort: accepted = remaining = all.
+ return MetadataFilterResult.of(metadataFilters, metadataFilters);
+ }
+ }
+
/** Tests key translation when SQL alias differs from metadata key. */
private static class RenamedMetadataFilterSource extends
TableFactoryHarness.ScanSourceBase
implements SupportsReadingMetadata {
@@ -280,13 +302,6 @@ class MetadataFilterInReadingMetadataTest extends
TableTestBase {
.columnByMetadata("event_ts", TIMESTAMP(3),
"timestamp")
.build();
- private final SharedReference<List<ResolvedExpression>>
receivedMetadataFilters;
-
- RenamedMetadataFilterSource(
- SharedReference<List<ResolvedExpression>>
receivedMetadataFilters) {
- this.receivedMetadataFilters = receivedMetadataFilters;
- }
-
@Override
public Map<String, DataType> listReadableMetadata() {
Map<String, DataType> metadata = new HashMap<>();
@@ -304,7 +319,6 @@ class MetadataFilterInReadingMetadataTest extends
TableTestBase {
@Override
public MetadataFilterResult
applyMetadataFilters(List<ResolvedExpression> metadataFilters) {
- receivedMetadataFilters.get().addAll(metadataFilters);
return MetadataFilterResult.of(metadataFilters,
Collections.emptyList());
}
}
@@ -320,16 +334,9 @@ class MetadataFilterInReadingMetadataTest extends
TableTestBase {
.columnByMetadata("priority", INT())
.build();
- private final SharedReference<List<ResolvedExpression>>
receivedMetadataFilters;
-
- PartialMetadataFilterSource(
- SharedReference<List<ResolvedExpression>>
receivedMetadataFilters) {
- this.receivedMetadataFilters = receivedMetadataFilters;
- }
-
@Override
public Map<String, DataType> listReadableMetadata() {
- Map<String, DataType> metadata = new HashMap<>();
+ Map<String, DataType> metadata = new LinkedHashMap<>();
metadata.put("event_time", TIMESTAMP(3));
metadata.put("priority", INT());
return metadata;
@@ -345,8 +352,6 @@ class MetadataFilterInReadingMetadataTest extends
TableTestBase {
@Override
public MetadataFilterResult
applyMetadataFilters(List<ResolvedExpression> metadataFilters) {
- receivedMetadataFilters.get().addAll(metadataFilters);
- // Accept only the first filter
List<ResolvedExpression> accepted =
metadataFilters.isEmpty()
? Collections.emptyList()
@@ -370,16 +375,6 @@ class MetadataFilterInReadingMetadataTest extends
TableTestBase {
.columnByMetadata("event_time", TIMESTAMP(3))
.build();
- private final SharedReference<List<ResolvedExpression>>
receivedMetadataFilters;
- private final SharedReference<List<ResolvedExpression>>
receivedPhysicalFilters;
-
- MixedFilterSource(
- SharedReference<List<ResolvedExpression>>
receivedMetadataFilters,
- SharedReference<List<ResolvedExpression>>
receivedPhysicalFilters) {
- this.receivedMetadataFilters = receivedMetadataFilters;
- this.receivedPhysicalFilters = receivedPhysicalFilters;
- }
-
@Override
public Map<String, DataType> listReadableMetadata() {
Map<String, DataType> metadata = new HashMap<>();
@@ -397,13 +392,11 @@ class MetadataFilterInReadingMetadataTest extends
TableTestBase {
@Override
public MetadataFilterResult
applyMetadataFilters(List<ResolvedExpression> metadataFilters) {
- receivedMetadataFilters.get().addAll(metadataFilters);
return MetadataFilterResult.of(metadataFilters,
Collections.emptyList());
}
@Override
public Result applyFilters(List<ResolvedExpression> filters) {
- receivedPhysicalFilters.get().addAll(filters);
return Result.of(filters, Collections.emptyList());
}
}
@@ -422,16 +415,80 @@ class MetadataFilterInReadingMetadataTest extends
TableTestBase {
.columnByMetadata("msg_offset", INT(), "offset")
.build();
- private final SharedReference<List<ResolvedExpression>>
receivedMetadataFilters;
+ @Override
+ public Map<String, DataType> listReadableMetadata() {
+ Map<String, DataType> metadata = new HashMap<>();
+ metadata.put("offset", INT());
+ return metadata;
+ }
+
+ @Override
+ public void applyReadableMetadata(List<String> metadataKeys, DataType
producedDataType) {}
- CollidingNameSource(SharedReference<List<ResolvedExpression>>
receivedMetadataFilters) {
- this.receivedMetadataFilters = receivedMetadataFilters;
+ @Override
+ public boolean supportsMetadataFilterPushDown() {
+ return true;
}
+ @Override
+ public MetadataFilterResult
applyMetadataFilters(List<ResolvedExpression> metadataFilters) {
+ return MetadataFilterResult.of(metadataFilters,
Collections.emptyList());
+ }
+ }
+
+ /** Accepts inputs at positions 0 and 2, rejects position 1. */
+ private static class NonContiguousAcceptingSource extends
TableFactoryHarness.ScanSourceBase
+ implements SupportsReadingMetadata {
+
+ public static final Schema SCHEMA =
+ Schema.newBuilder()
+ .column("id", INT())
+ .columnByMetadata("m0", INT())
+ .columnByMetadata("m1", INT())
+ .columnByMetadata("m2", INT())
+ .build();
+
+ @Override
+ public Map<String, DataType> listReadableMetadata() {
+ Map<String, DataType> metadata = new LinkedHashMap<>();
+ metadata.put("m0", INT());
+ metadata.put("m1", INT());
+ metadata.put("m2", INT());
+ return metadata;
+ }
+
+ @Override
+ public void applyReadableMetadata(List<String> metadataKeys, DataType
producedDataType) {}
+
+ @Override
+ public boolean supportsMetadataFilterPushDown() {
+ return true;
+ }
+
+ @Override
+ public MetadataFilterResult
applyMetadataFilters(List<ResolvedExpression> metadataFilters) {
+ // Accept inputs at positions 0 and 2; reject position 1.
+ List<ResolvedExpression> accepted = new ArrayList<>();
+ List<ResolvedExpression> remaining = new ArrayList<>();
+ for (int i = 0; i < metadataFilters.size(); i++) {
+ if (i == 1) {
+ remaining.add(metadataFilters.get(i));
+ } else {
+ accepted.add(metadataFilters.get(i));
+ }
+ }
+ return MetadataFilterResult.of(accepted, remaining);
+ }
+ }
+
+ /** Drops every input by returning empty accepted and empty remaining. */
+ private static class DroppingAllSource extends
TableFactoryHarness.ScanSourceBase
+ implements SupportsReadingMetadata {
+
@Override
public Map<String, DataType> listReadableMetadata() {
Map<String, DataType> metadata = new HashMap<>();
- metadata.put("offset", INT());
+ metadata.put("event_time", TIMESTAMP(3));
return metadata;
}
@@ -445,8 +502,7 @@ class MetadataFilterInReadingMetadataTest extends
TableTestBase {
@Override
public MetadataFilterResult
applyMetadataFilters(List<ResolvedExpression> metadataFilters) {
- receivedMetadataFilters.get().addAll(metadataFilters);
- return MetadataFilterResult.of(metadataFilters,
Collections.emptyList());
+ return MetadataFilterResult.of(Collections.emptyList(),
Collections.emptyList());
}
}
}
diff --git
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/MetadataFilterResultShapesITCase.java
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/MetadataFilterResultShapesITCase.java
new file mode 100644
index 00000000000..47a987d485c
--- /dev/null
+++
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/MetadataFilterResultShapesITCase.java
@@ -0,0 +1,305 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.logical;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.connector.datagen.source.DataGeneratorSource;
+import org.apache.flink.connector.datagen.source.GeneratorFunction;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.api.TableDescriptor;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.TableResult;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.connector.source.SourceProvider;
+import
org.apache.flink.table.connector.source.abilities.SupportsReadingMetadata;
+import
org.apache.flink.table.connector.source.abilities.SupportsReadingMetadata.MetadataFilterResult;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.expressions.ResolvedExpression;
+import org.apache.flink.table.planner.factories.TableFactoryHarness;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.test.junit5.MiniClusterExtension;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
+import org.apache.flink.util.CollectionUtil;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.table.api.DataTypes.INT;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Integration tests covering the four shapes a connector can return from
{@link
+ * SupportsReadingMetadata#applyMetadataFilters} (accept-all, accept-none,
partial accept,
+ * best-effort overlap).
+ *
+ * <p>For each shape this test asserts both the runtime result (the Calc above the scan must
+ * evaluate any {@code remaining} predicates) and the optimized plan (the scan carries the {@code
+ * accepted} set, the runtime Calc above carries the {@code remaining} set).
+ */
+class MetadataFilterResultShapesITCase {
+
+ @RegisterExtension
+ private static final MiniClusterExtension MINI_CLUSTER_EXTENSION =
+ new MiniClusterExtension(
+ new MiniClusterResourceConfiguration.Builder()
+ .setNumberTaskManagers(1)
+ .setNumberSlotsPerTaskManager(2)
+ .build());
+
+ /** Schema: {@code id INT, m0 INT METADATA, m1 INT METADATA}. */
+ private static final Schema SCHEMA =
+ Schema.newBuilder()
+ .column("id", INT())
+ .columnByMetadata("m0", INT())
+ .columnByMetadata("m1", INT())
+ .build();
+
+ /**
+ * Six rows that exercise both predicates {@code m0 > 0} and {@code m1 >
0}. The data covers all
+ * four cells of the Cartesian product (both pass / only m0 passes / only
m1 passes / neither).
+ */
+ private static final List<Row> ROWS =
+ Arrays.asList(
+ Row.of(1, 5, 5), // both pass
+ Row.of(2, 5, -1), // only m0 passes
+ Row.of(3, -1, 5), // only m1 passes
+ Row.of(4, -1, -1), // neither passes
+ Row.of(5, 7, 9), // both pass
+ Row.of(6, 0, 0)); // neither passes (predicates are strict
>)
+
+ private static final String SQL = "SELECT id FROM %s WHERE m0 > 0 AND m1 >
0 ORDER BY id";
+
+ /** Rows that satisfy {@code m0 > 0 AND m1 > 0}: only id=1 and id=5. */
+ private static final List<Integer> EXPECTED_FILTERED_IDS =
Arrays.asList(1, 5);
+
+ private TableEnvironment tableEnv;
+
+ @BeforeEach
+ void setup() {
+ EnvironmentSettings settings =
EnvironmentSettings.newInstance().inBatchMode().build();
+ tableEnv = TableEnvironment.create(settings);
+ }
+
+ /** Row passes both predicates {@code m0 > 0 AND m1 > 0}. */
+ private static final Predicate<Row> PASS_BOTH =
+ row -> ((Integer) row.getField(1)) > 0 && ((Integer)
row.getField(2)) > 0;
+
+ /** Row passes the first predicate {@code m0 > 0}; {@code m1} is not checked. */
+ private static final Predicate<Row> PASS_M0 = row -> ((Integer)
row.getField(1)) > 0;
+
+ /** Source applies no predicate: every row passes, so the source emits everything. */
+ private static final Predicate<Row> PASS_NONE = row -> true;
+
+ // Plan-string assertions below use two renderers: `metadataFilter=[...]`
(lowercase `and`,
+ // from RexNode toString) and `where=[...]` (uppercase `AND`, from
RelExplainable).
+
+ @Test
+ void testAcceptAll() throws Exception {
+ // Source claims it can apply both predicates and does so by emitting
only matching rows.
+ registerTable("T_ACCEPT_ALL", ConfigurableMetadataSource.ACCEPT_ALL,
PASS_BOTH);
+
+ assertThat(collectIds(String.format(SQL, "T_ACCEPT_ALL")))
+ .containsExactlyElementsOf(EXPECTED_FILTERED_IDS);
+
+ String explain = tableEnv.explainSql(String.format(SQL,
"T_ACCEPT_ALL"));
+ // Both predicates pushed onto the scan as the conjunction.
+ assertThat(explain).contains("metadataFilter=[and(>(m0, 0), >(m1,
0))]");
+ // No runtime Calc with a where on metadata: accepted = inputs,
remaining = empty.
+ assertThat(explain).doesNotContain("where=[((m0 > 0) AND (m1 > 0))]");
+ assertThat(explain).doesNotContain("where=[(m0 > 0)]");
+ assertThat(explain).doesNotContain("where=[(m1 > 0)]");
+ }
+
+ @Test
+ void testAcceptNone() throws Exception {
+ // Source rejects everything: emit all rows; runtime Calc must do all
the filtering.
+ registerTable("T_ACCEPT_NONE", ConfigurableMetadataSource.ACCEPT_NONE,
PASS_NONE);
+
+ assertThat(collectIds(String.format(SQL, "T_ACCEPT_NONE")))
+ .containsExactlyElementsOf(EXPECTED_FILTERED_IDS);
+
+ String explain = tableEnv.explainSql(String.format(SQL,
"T_ACCEPT_NONE"));
+ // Empty accepted set; full conjunction retained on the runtime Calc.
+ assertThat(explain).contains("metadataFilter=[]");
+ assertThat(explain).contains("where=[AND(>(m0, 0), >(m1, 0))]");
+ // Nothing pushed besides the empty marker.
+ assertThat(explain).doesNotContain("metadataFilter=[>(");
+ assertThat(explain).doesNotContain("metadataFilter=[and(");
+ }
+
+ @Test
+ void testAcceptFirstOnly() throws Exception {
+ // Source applies only m0 > 0 itself; runtime Calc must apply m1 > 0.
+ registerTable("T_ACCEPT_FIRST",
ConfigurableMetadataSource.ACCEPT_FIRST_ONLY, PASS_M0);
+
+ assertThat(collectIds(String.format(SQL, "T_ACCEPT_FIRST")))
+ .containsExactlyElementsOf(EXPECTED_FILTERED_IDS);
+
+ String explain = tableEnv.explainSql(String.format(SQL,
"T_ACCEPT_FIRST"));
+ // First predicate (m0 > 0) is pushed; second (m1 > 0) stays on the
runtime Calc.
+ assertThat(explain).contains("metadataFilter=[>(m0, 0)]");
+ assertThat(explain).contains("where=[>(m1, 0)]");
+ assertThat(explain).doesNotContain("metadataFilter=[>(m1, 0)]");
+ assertThat(explain).doesNotContain("metadataFilter=[and(");
+ }
+
+ @Test
+ void testBestEffortOverlap() throws Exception {
+ // Source accepts both for pruning AND returns both as remaining: runtime must re-apply.
+ // Source emits all rows so the runtime Calc is the load-bearing
filter; correctness must
+ // not depend on the source's claim.
+ registerTable(
+ "T_BEST_EFFORT",
ConfigurableMetadataSource.BEST_EFFORT_BOTH_AND_REMAIN, PASS_NONE);
+
+ assertThat(collectIds(String.format(SQL, "T_BEST_EFFORT")))
+ .containsExactlyElementsOf(EXPECTED_FILTERED_IDS);
+
+ String explain = tableEnv.explainSql(String.format(SQL,
"T_BEST_EFFORT"));
+ // Both predicates accepted (for storage-side pruning) AND both also
retained as a runtime
+ // Calc so the source's pruning need only be best-effort.
+ assertThat(explain).contains("metadataFilter=[and(>(m0, 0), >(m1,
0))]");
+ assertThat(explain).contains("where=[AND(>(m0, 0), >(m1, 0))]");
+ }
+
+ //
-----------------------------------------------------------------------------------------
+ // Helpers
+ //
-----------------------------------------------------------------------------------------
+
+ private void registerTable(
+ String name,
+ Function<List<ResolvedExpression>, MetadataFilterResult> splitter,
+ Predicate<Row> sourceSideFilter) {
+ TableDescriptor descriptor =
+ TableFactoryHarness.newBuilder()
+ .schema(SCHEMA)
+ .source(new ConfigurableMetadataSource(splitter,
sourceSideFilter))
+ .build();
+ tableEnv.createTable(name, descriptor);
+ }
+
+ private List<Integer> collectIds(String sql) {
+ TableResult result = tableEnv.executeSql(sql);
+ try (CloseableIterator<Row> iterator = result.collect()) {
+ List<Row> rows = CollectionUtil.iteratorToList(iterator);
+ return rows.stream().map(r -> (Integer)
r.getField(0)).collect(Collectors.toList());
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ //
-----------------------------------------------------------------------------------------
+ // Test source
+ //
-----------------------------------------------------------------------------------------
+
+ /**
+ * Bounded harness source. Supports {@link SupportsReadingMetadata} with
two metadata columns
+ * ({@code m0}, {@code m1}). The {@code MetadataFilterResult} returned
from {@link
+ * #applyMetadataFilters(List)} is controlled by the supplied splitter
strategy so each test can
+ * exercise a different shape.
+ *
+ * <p>The runtime emits only rows that pass the source-side filter; {@code remaining}
+ * predicates rely on the Calc above the scan to drop non-matching rows.
+ */
+ static class ConfigurableMetadataSource extends
TableFactoryHarness.ScanSourceBase
+ implements SupportsReadingMetadata {
+
+ static final Function<List<ResolvedExpression>, MetadataFilterResult>
ACCEPT_ALL =
+ in -> MetadataFilterResult.of(in, Collections.emptyList());
+
+ static final Function<List<ResolvedExpression>, MetadataFilterResult>
ACCEPT_NONE =
+ in -> MetadataFilterResult.of(Collections.emptyList(), in);
+
+ static final Function<List<ResolvedExpression>, MetadataFilterResult>
ACCEPT_FIRST_ONLY =
+ in ->
+ MetadataFilterResult.of(
+ Collections.singletonList(in.get(0)),
+ Collections.singletonList(in.get(1)));
+
+ static final Function<List<ResolvedExpression>, MetadataFilterResult>
+ BEST_EFFORT_BOTH_AND_REMAIN = in ->
MetadataFilterResult.of(in, in);
+
+ private final Function<List<ResolvedExpression>, MetadataFilterResult>
splitter;
+ private final Predicate<Row> sourceSideFilter;
+
+ private DataType producedDataType;
+
+ ConfigurableMetadataSource(
+ Function<List<ResolvedExpression>, MetadataFilterResult>
splitter,
+ Predicate<Row> sourceSideFilter) {
+ super(true);
+ this.splitter = splitter;
+ this.sourceSideFilter = sourceSideFilter;
+ }
+
+ @Override
+ public Map<String, DataType> listReadableMetadata() {
+ Map<String, DataType> metadata = new LinkedHashMap<>();
+ metadata.put("m0", INT());
+ metadata.put("m1", INT());
+ return metadata;
+ }
+
+ @Override
+ public void applyReadableMetadata(List<String> metadataKeys, DataType
producedDataType) {
+ this.producedDataType = producedDataType;
+ }
+
+ @Override
+ public boolean supportsMetadataFilterPushDown() {
+ return true;
+ }
+
+ @Override
+ public MetadataFilterResult
applyMetadataFilters(List<ResolvedExpression> metadataFilters) {
+ return splitter.apply(metadataFilters);
+ }
+
+ @Override
+ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext
runtimeProviderContext) {
+ DataType emitted =
+ producedDataType != null
+ ? producedDataType
+ : getFactoryContext().getPhysicalRowDataType();
+ DynamicTableSource.DataStructureConverter converter =
+
runtimeProviderContext.createDataStructureConverter(emitted);
+ // Apply the source-side filter once; the generator just hands out
rows by index.
+ List<Row> emittedRows =
+
ROWS.stream().filter(sourceSideFilter).collect(Collectors.toList());
+ GeneratorFunction<Long, RowData> generator =
+ index -> (RowData)
converter.toInternal(emittedRows.get(index.intValue()));
+ DataGeneratorSource<RowData> source =
+ new DataGeneratorSource<>(
+ generator, emittedRows.size(),
TypeInformation.of(RowData.class));
+ return SourceProvider.of(source);
+ }
+ }
+}
diff --git
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/MetadataFilterInReadingMetadataTest.xml
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/MetadataFilterInReadingMetadataTest.xml
index cef00b21d69..899a8640cf9 100644
---
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/MetadataFilterInReadingMetadataTest.xml
+++
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/MetadataFilterInReadingMetadataTest.xml
@@ -33,6 +33,27 @@ LogicalProject(id=[$0])
LogicalProject(id=[$0])
+- LogicalProject(id=[$0], event_ts=[$1])
+- LogicalTableScan(table=[[default_catalog, default_database, T3,
metadata=[timestamp], metadataFilter=[>(timestamp, 2024-01-01 00:00:00)]]])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testBestEffortMetadataPruning">
+ <Resource name="sql">
+ <![CDATA[SELECT id FROM T8 WHERE event_time > TIMESTAMP '2024-01-01
00:00:00']]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(id=[$0])
++- LogicalFilter(condition=[>($2, 2024-01-01 00:00:00)])
+ +- LogicalProject(id=[$0], name=[$1], event_time=[$2])
+ +- LogicalTableScan(table=[[default_catalog, default_database, T8,
metadata=[event_time]]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+LogicalProject(id=[$0])
++- LogicalProject(id=[$0], name=[$1], event_time=[$2])
+ +- LogicalFilter(condition=[>($2, 2024-01-01 00:00:00)])
+ +- LogicalTableScan(table=[[default_catalog, default_database, T8,
metadata=[event_time], metadataFilter=[>(event_time, 2024-01-01 00:00:00)]]])
]]>
</Resource>
</TestCase>
@@ -114,6 +135,27 @@ LogicalProject(id=[$0])
LogicalProject(id=[$0])
+- LogicalProject(id=[$0], name=[$1], event_time=[$2])
+- LogicalTableScan(table=[[default_catalog, default_database, T4,
metadata=[event_time], filter=[>(id, 10)], metadataFilter=[>(event_time,
2024-01-01 00:00:00)]]])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testNonContiguousSubsetAcceptance">
+ <Resource name="sql">
+ <![CDATA[SELECT id FROM T9 WHERE m0 > 0 AND m1 > 1 AND m2 > 2]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(id=[$0])
++- LogicalFilter(condition=[AND(>($1, 0), >($2, 1), >($3, 2))])
+ +- LogicalProject(id=[$0], m0=[$1], m1=[$2], m2=[$3])
+ +- LogicalTableScan(table=[[default_catalog, default_database, T9,
metadata=[m0, m1, m2]]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+LogicalProject(id=[$0])
++- LogicalProject(id=[$0], m0=[$1], m1=[$2], m2=[$3])
+ +- LogicalFilter(condition=[>($2, 1)])
+ +- LogicalTableScan(table=[[default_catalog, default_database, T9,
metadata=[m0, m1, m2], metadataFilter=[and(>(m0, 0), >(m2, 2))]]])
]]>
</Resource>
</TestCase>
@@ -125,16 +167,16 @@ LogicalProject(id=[$0])
<![CDATA[
LogicalProject(id=[$0])
+- LogicalFilter(condition=[AND(>($1, 2024-01-01 00:00:00), >($2, 5))])
- +- LogicalProject(id=[$0], event_time=[$2], priority=[$1])
- +- LogicalTableScan(table=[[default_catalog, default_database, T6,
metadata=[priority, event_time]]])
+ +- LogicalProject(id=[$0], event_time=[$1], priority=[$2])
+ +- LogicalTableScan(table=[[default_catalog, default_database, T6,
metadata=[event_time, priority]]])
]]>
</Resource>
<Resource name="optimized rel plan">
<![CDATA[
LogicalProject(id=[$0])
-+- LogicalProject(id=[$0], event_time=[$2], priority=[$1])
- +- LogicalFilter(condition=[>($1, 5)])
- +- LogicalTableScan(table=[[default_catalog, default_database, T6,
metadata=[priority, event_time], metadataFilter=[>(event_time, 2024-01-01
00:00:00)]]])
++- LogicalProject(id=[$0], event_time=[$1], priority=[$2])
+ +- LogicalFilter(condition=[>($2, 5)])
+ +- LogicalTableScan(table=[[default_catalog, default_database, T6,
metadata=[event_time, priority], metadataFilter=[>(event_time, 2024-01-01
00:00:00)]]])
]]>
</Resource>
</TestCase>