This is an automated email from the ASF dual-hosted git repository.
twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 92e67cfc79f [FLINK-39421][table] Metadata filter push-down for table
sources
92e67cfc79f is described below
commit 92e67cfc79f79b76e5fa67dc7596554175b57e7e
Author: Jim Hughes <[email protected]>
AuthorDate: Fri Apr 24 09:47:39 2026 -0400
[FLINK-39421][table] Metadata filter push-down for table sources
This closes #27913.
---
.../source/abilities/SupportsReadingMetadata.java | 69 ++++
.../plan/abilities/source/FilterPushDownSpec.java | 100 +++--
.../source/MetadataFilterPushDownSpec.java | 159 ++++++++
.../plan/abilities/source/SourceAbilitySpec.java | 1 +
.../logical/PushFilterIntoSourceScanRuleBase.java | 189 ++++++++-
.../logical/PushFilterIntoTableSourceScanRule.java | 92 ++++-
.../planner/factories/TestValuesTableFactory.java | 146 ++++---
.../serde/DynamicTableSourceSpecSerdeTest.java | 68 +++-
.../MetadataFilterInReadingMetadataTest.java | 452 +++++++++++++++++++++
.../MetadataFilterInReadingMetadataTest.xml | 161 ++++++++
10 files changed, 1300 insertions(+), 137 deletions(-)
diff --git
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/abilities/SupportsReadingMetadata.java
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/abilities/SupportsReadingMetadata.java
index 60b6932b617..0d1c65f0f2f 100644
---
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/abilities/SupportsReadingMetadata.java
+++
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/abilities/SupportsReadingMetadata.java
@@ -25,11 +25,13 @@ import
org.apache.flink.table.connector.format.DecodingFormat;
import org.apache.flink.table.connector.source.ScanTableSource;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.utils.JoinedRowData;
+import org.apache.flink.table.expressions.ResolvedExpression;
import org.apache.flink.table.factories.Factory;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.utils.LogicalTypeCasts;
+import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
@@ -154,4 +156,71 @@ public interface SupportsReadingMetadata {
default boolean supportsMetadataProjection() {
return true;
}
+
+ /**
+ * Whether this source supports filtering on metadata columns.
+ *
+ * <p>When this method returns {@code true}, the planner may call {@link
+ * #applyMetadataFilters(List)} during optimization with predicates
expressed in metadata key
+ * names (from {@link #listReadableMetadata()}), not SQL column aliases.
Sources that do not
+ * override this method will not receive metadata filter predicates.
+ *
+ * <p>This is independent of {@link SupportsFilterPushDown}, which handles
physical column
+ * predicates. A source can implement both to accept filters on physical
and metadata columns.
+ */
+ default boolean supportsMetadataFilterPushDown() {
+ return false;
+ }
+
+ /**
+ * Provides a list of metadata filters in conjunctive form. A source can
pick filters and return
+ * the accepted and remaining filters. Same contract as {@link
+ * SupportsFilterPushDown#applyFilters(List)}, but for metadata columns.
+ *
+ * <p>The provided filters reference metadata key names (from {@link
#listReadableMetadata()}),
+ * not SQL column aliases. For example, a column declared as {@code
msg_offset BIGINT METADATA
+ * FROM 'offset'} will have its predicate expressed as {@code offset >=
1000}, not {@code
+ * msg_offset >= 1000}. The planner handles the alias-to-key translation
before calling this
+ * method.
+ */
+ default MetadataFilterResult applyMetadataFilters(List<ResolvedExpression>
metadataFilters) {
+ return MetadataFilterResult.of(Collections.emptyList(),
metadataFilters);
+ }
+
+ /**
+ * Result of a metadata filter push down. Communicates the source's
response to the planner
+ * during optimization.
+ */
+ @PublicEvolving
+ final class MetadataFilterResult {
+ private final List<ResolvedExpression> acceptedFilters;
+ private final List<ResolvedExpression> remainingFilters;
+
+ private MetadataFilterResult(
+ List<ResolvedExpression> acceptedFilters,
+ List<ResolvedExpression> remainingFilters) {
+ this.acceptedFilters = acceptedFilters;
+ this.remainingFilters = remainingFilters;
+ }
+
+ /**
+ * Constructs a metadata filter push-down result.
+ *
+ * @param acceptedFilters filters consumed by the source (best effort)
+ * @param remainingFilters filters that a subsequent operation must
still apply at runtime
+ */
+ public static MetadataFilterResult of(
+ List<ResolvedExpression> acceptedFilters,
+ List<ResolvedExpression> remainingFilters) {
+ return new MetadataFilterResult(acceptedFilters, remainingFilters);
+ }
+
+ public List<ResolvedExpression> getAcceptedFilters() {
+ return acceptedFilters;
+ }
+
+ public List<ResolvedExpression> getRemainingFilters() {
+ return remainingFilters;
+ }
+ }
}
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/source/FilterPushDownSpec.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/source/FilterPushDownSpec.java
index d30fb1a4181..7a4e1481e04 100644
---
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/source/FilterPushDownSpec.java
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/source/FilterPushDownSpec.java
@@ -99,50 +99,9 @@ public final class FilterPushDownSpec extends
SourceAbilitySpecBase {
DynamicTableSource tableSource,
SourceAbilityContext context) {
if (tableSource instanceof SupportsFilterPushDown) {
- RexNodeToExpressionConverter converter =
- new RexNodeToExpressionConverter(
- new RexBuilder(context.getTypeFactory()),
-
context.getSourceRowType().getFieldNames().toArray(new String[0]),
- context.getFunctionCatalog(),
- context.getCatalogManager(),
- Option.apply(
- context.getTypeFactory()
-
.buildRelNodeRowType(context.getSourceRowType())));
- List<Expression> filters =
- predicates.stream()
- .map(
- p -> {
- scala.Option<ResolvedExpression> expr
= p.accept(converter);
- if (expr.isDefined()) {
- return expr.get();
- } else {
- throw new TableException(
- String.format(
- "%s can not be
converted to Expression, please make sure %s can accept %s.",
- p.toString(),
-
tableSource.getClass().getSimpleName(),
- p.toString()));
- }
- })
- .collect(Collectors.toList());
- ExpressionResolver resolver =
- ExpressionResolver.resolverFor(
- context.getTableConfig(),
- context.getClassLoader(),
- name -> Optional.empty(),
- context.getFunctionCatalog()
- .asLookup(
- str -> {
- throw new
TableException(
- "We should not
need to lookup any expressions at this point");
- }),
-
context.getCatalogManager().getDataTypeFactory(),
- (sqlExpression, inputRowType, outputType)
-> {
- throw new TableException(
- "SQL expression parsing is not
supported at this location.");
- })
- .build();
- return ((SupportsFilterPushDown)
tableSource).applyFilters(resolver.resolve(filters));
+ List<ResolvedExpression> resolved =
+ resolvePredicates(predicates, context.getSourceRowType(),
tableSource, context);
+ return ((SupportsFilterPushDown)
tableSource).applyFilters(resolved);
} else {
throw new TableException(
String.format(
@@ -151,6 +110,59 @@ public final class FilterPushDownSpec extends
SourceAbilitySpecBase {
}
}
+ /**
+ * Converts {@link RexNode} predicates to {@link ResolvedExpression}s
using the given row type.
+ * Shared between physical and metadata filter push-down paths.
+ */
+ static List<ResolvedExpression> resolvePredicates(
+ List<RexNode> predicates,
+ RowType rowType,
+ DynamicTableSource tableSource,
+ SourceAbilityContext context) {
+ RexNodeToExpressionConverter converter =
+ new RexNodeToExpressionConverter(
+ new RexBuilder(context.getTypeFactory()),
+ rowType.getFieldNames().toArray(new String[0]),
+ context.getFunctionCatalog(),
+ context.getCatalogManager(),
+
Option.apply(context.getTypeFactory().buildRelNodeRowType(rowType)));
+ List<Expression> filters =
+ predicates.stream()
+ .map(
+ p -> {
+ scala.Option<ResolvedExpression> expr =
p.accept(converter);
+ if (expr.isDefined()) {
+ return expr.get();
+ } else {
+ throw new TableException(
+ String.format(
+ "%s can not be
converted to Expression, please make sure %s can accept %s.",
+ p.toString(),
+
tableSource.getClass().getSimpleName(),
+ p.toString()));
+ }
+ })
+ .collect(Collectors.toList());
+ ExpressionResolver resolver =
+ ExpressionResolver.resolverFor(
+ context.getTableConfig(),
+ context.getClassLoader(),
+ name -> Optional.empty(),
+ context.getFunctionCatalog()
+ .asLookup(
+ str -> {
+ throw new TableException(
+ "We should not
need to lookup any expressions at this point");
+ }),
+
context.getCatalogManager().getDataTypeFactory(),
+ (sqlExpression, inputRowType, outputType) -> {
+ throw new TableException(
+ "SQL expression parsing is not
supported at this location.");
+ })
+ .build();
+ return resolver.resolve(filters);
+ }
+
@Override
public boolean needAdjustFieldReferenceAfterProjection() {
return true;
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/source/MetadataFilterPushDownSpec.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/source/MetadataFilterPushDownSpec.java
new file mode 100644
index 00000000000..090ab45fe27
--- /dev/null
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/source/MetadataFilterPushDownSpec.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.abilities.source;
+
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import
org.apache.flink.table.connector.source.abilities.SupportsReadingMetadata;
+import
org.apache.flink.table.connector.source.abilities.SupportsReadingMetadata.MetadataFilterResult;
+import org.apache.flink.table.expressions.ResolvedExpression;
+import org.apache.flink.table.planner.plan.utils.FlinkRexUtil;
+import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
+import org.apache.flink.table.types.logical.RowType;
+
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeName;
+
+import org.apache.calcite.rex.RexNode;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Serializes metadata filter predicates and replays them during compiled plan
restoration.
+ *
+ * <p>Predicates are stored with a {@code predicateRowType} that already uses
metadata key names
+ * (not SQL aliases). The alias-to-key translation happens once at
optimization time, so no
+ * column-to-key mapping needs to be persisted.
+ */
+@JsonIgnoreProperties(ignoreUnknown = true)
+@JsonTypeName("MetadataFilterPushDown")
+public final class MetadataFilterPushDownSpec extends SourceAbilitySpecBase {
+
+ public static final String FIELD_NAME_PREDICATES = "predicates";
+ public static final String FIELD_NAME_PREDICATE_ROW_TYPE =
"predicateRowType";
+
+ @JsonProperty(FIELD_NAME_PREDICATES)
+ private final List<RexNode> predicates;
+
+ /**
+ * Row type snapshot using metadata key names. Stored because
ProjectPushDownSpec may narrow the
+ * context's row type during restore.
+ */
+ @JsonProperty(FIELD_NAME_PREDICATE_ROW_TYPE)
+ private final RowType predicateRowType;
+
+ @JsonCreator
+ public MetadataFilterPushDownSpec(
+ @JsonProperty(FIELD_NAME_PREDICATES) List<RexNode> predicates,
+ @JsonProperty(FIELD_NAME_PREDICATE_ROW_TYPE) RowType
predicateRowType) {
+ this.predicates = new ArrayList<>(checkNotNull(predicates));
+ this.predicateRowType = checkNotNull(predicateRowType);
+ }
+
+ public List<RexNode> getPredicates() {
+ return predicates;
+ }
+
+ @Override
+ public void apply(DynamicTableSource tableSource, SourceAbilityContext
context) {
+ // Use stored predicateRowType; context's row type may be narrowed by
ProjectPushDownSpec.
+ MetadataFilterResult result =
+ applyMetadataFilters(predicates, predicateRowType,
tableSource, context);
+ if (result.getAcceptedFilters().size() != predicates.size()) {
+ throw new TableException("All metadata predicates should be
accepted here.");
+ }
+ }
+
+ /**
+ * Converts RexNode predicates to ResolvedExpressions using the given row
type and calls
+ * applyMetadataFilters on the source. The row type must already use
metadata key names.
+ */
+ public static MetadataFilterResult applyMetadataFilters(
+ List<RexNode> predicates,
+ RowType metadataKeyRowType,
+ DynamicTableSource tableSource,
+ SourceAbilityContext context) {
+ if (!(tableSource instanceof SupportsReadingMetadata)) {
+ throw new TableException(
+ String.format(
+ "%s does not support SupportsReadingMetadata.",
+ tableSource.getClass().getName()));
+ }
+ SupportsReadingMetadata readingMetadata = (SupportsReadingMetadata)
tableSource;
+ if (!readingMetadata.supportsMetadataFilterPushDown()) {
+ throw new TableException(
+ String.format(
+ "%s no longer supports metadata filter push-down.",
+ tableSource.getClass().getName()));
+ }
+ List<ResolvedExpression> resolved =
+ FilterPushDownSpec.resolvePredicates(
+ predicates, metadataKeyRowType, tableSource, context);
+ return readingMetadata.applyMetadataFilters(resolved);
+ }
+
+ @Override
+ public boolean needAdjustFieldReferenceAfterProjection() {
+ return true;
+ }
+
+ @Override
+ public String getDigests(SourceAbilityContext context) {
+ final List<String> expressionStrs = new ArrayList<>();
+ for (RexNode rexNode : predicates) {
+ expressionStrs.add(
+ FlinkRexUtil.getExpressionString(
+ rexNode,
+
JavaScalaConversionUtil.toScala(predicateRowType.getFieldNames())));
+ }
+
+ return String.format(
+ "metadataFilter=[%s]",
+ expressionStrs.stream()
+ .reduce((l, r) -> String.format("and(%s, %s)", l, r))
+ .orElse(""));
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ if (!super.equals(o)) {
+ return false;
+ }
+ MetadataFilterPushDownSpec that = (MetadataFilterPushDownSpec) o;
+ return Objects.equals(predicates, that.predicates)
+ && Objects.equals(predicateRowType, that.predicateRowType);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(super.hashCode(), predicates, predicateRowType);
+ }
+}
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/source/SourceAbilitySpec.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/source/SourceAbilitySpec.java
index e51328d5e9f..20e594304a8 100644
---
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/source/SourceAbilitySpec.java
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/source/SourceAbilitySpec.java
@@ -38,6 +38,7 @@ import java.util.Optional;
@JsonSubTypes({
@JsonSubTypes.Type(value = FilterPushDownSpec.class),
@JsonSubTypes.Type(value = LimitPushDownSpec.class),
+ @JsonSubTypes.Type(value = MetadataFilterPushDownSpec.class),
@JsonSubTypes.Type(value = PartitionPushDownSpec.class),
@JsonSubTypes.Type(value = ProjectPushDownSpec.class),
@JsonSubTypes.Type(value = ReadingMetadataSpec.class),
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoSourceScanRuleBase.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoSourceScanRuleBase.java
index 76e5a9b4a10..eb5521ef782 100644
---
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoSourceScanRuleBase.java
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoSourceScanRuleBase.java
@@ -17,23 +17,36 @@
package org.apache.flink.table.planner.plan.rules.logical;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.connector.source.DynamicTableSource;
import
org.apache.flink.table.connector.source.abilities.SupportsFilterPushDown;
+import
org.apache.flink.table.connector.source.abilities.SupportsReadingMetadata;
+import
org.apache.flink.table.connector.source.abilities.SupportsReadingMetadata.MetadataFilterResult;
import org.apache.flink.table.expressions.ResolvedExpression;
import
org.apache.flink.table.planner.expressions.converter.ExpressionConverter;
import org.apache.flink.table.planner.plan.abilities.source.FilterPushDownSpec;
+import
org.apache.flink.table.planner.plan.abilities.source.MetadataFilterPushDownSpec;
import
org.apache.flink.table.planner.plan.abilities.source.SourceAbilityContext;
import org.apache.flink.table.planner.plan.abilities.source.SourceAbilitySpec;
import org.apache.flink.table.planner.plan.schema.TableSourceTable;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.RowType.RowField;
import org.apache.calcite.plan.RelOptRule;
import org.apache.calcite.plan.RelOptRuleOperand;
import org.apache.calcite.rel.core.TableScan;
+import org.apache.calcite.rex.RexInputRef;
import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexShuttle;
+import org.apache.calcite.rex.RexVisitorImpl;
import org.apache.calcite.tools.RelBuilder;
+import java.util.ArrayList;
import java.util.Arrays;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.stream.Collectors;
import scala.Tuple2;
@@ -60,15 +73,8 @@ public abstract class PushFilterIntoSourceScanRuleBase
extends RelOptRule {
}
/**
- * Resolves filters using the underlying sources {@link
SupportsFilterPushDown} and creates a
- * new {@link TableSourceTable} with the supplied predicates.
- *
- * @param convertiblePredicates Predicates to resolve
- * @param oldTableSourceTable TableSourceTable to copy
- * @param scan Underlying table scan to push to
- * @param relBuilder Builder to push the scan to
- * @return A tuple, constituting of the resolved filters and the newly
created {@link
- * TableSourceTable}
+ * Resolves filters via {@link SupportsFilterPushDown} and creates a new
{@link
+ * TableSourceTable}.
*/
protected Tuple2<SupportsFilterPushDown.Result, TableSourceTable>
resolveFiltersAndCreateTableSourceTable(
@@ -102,17 +108,168 @@ public abstract class PushFilterIntoSourceScanRuleBase
extends RelOptRule {
return new Tuple2<>(result, newTableSourceTable);
}
- /**
- * Determines wether we can pushdown the filter into the source. we can
not push filter twice,
- * make sure FilterPushDownSpec has not been assigned as a capability.
- *
- * @param tableSourceTable Table scan to attempt to push into
- * @return Whether we can push or not
- */
+ /** Whether filter push-down is possible and not already assigned. */
protected boolean canPushdownFilter(TableSourceTable tableSourceTable) {
return tableSourceTable != null
&& tableSourceTable.tableSource() instanceof
SupportsFilterPushDown
&& Arrays.stream(tableSourceTable.abilitySpecs())
.noneMatch(spec -> spec instanceof FilterPushDownSpec);
}
+
+ /** Whether metadata filter push-down is possible and not already
assigned. */
+ protected boolean canPushdownMetadataFilter(TableSourceTable
tableSourceTable) {
+ if (tableSourceTable == null) {
+ return false;
+ }
+ DynamicTableSource source = tableSourceTable.tableSource();
+ if (!(source instanceof SupportsReadingMetadata)) {
+ return false;
+ }
+ if (!((SupportsReadingMetadata)
source).supportsMetadataFilterPushDown()) {
+ return false;
+ }
+ return Arrays.stream(tableSourceTable.abilitySpecs())
+ .noneMatch(spec -> spec instanceof MetadataFilterPushDownSpec);
+ }
+
+ /**
+ * True if predicate references metadata columns exclusively (no physical
columns).
+ *
+ * <p>A predicate like {@code OR(physical_pred, metadata_pred)} returns
false because it
+ * references both physical and metadata columns. Mixed predicates remain
as runtime filters.
+ */
+ protected boolean referencesOnlyMetadataColumns(RexNode predicate, int
physicalColumnCount) {
+ boolean[] saw = new boolean[2]; // [0] = sawPhysical, [1] = sawMetadata
+ predicate.accept(
+ new RexVisitorImpl<Void>(true) {
+ @Override
+ public Void visitInputRef(RexInputRef inputRef) {
+ if (inputRef.getIndex() >= physicalColumnCount) {
+ saw[1] = true;
+ } else {
+ saw[0] = true;
+ }
+ return null;
+ }
+ });
+ return saw[1] && !saw[0];
+ }
+
+ /** Number of physical columns in the scan's schema. */
+ protected int getPhysicalColumnCount(TableSourceTable tableSourceTable) {
+ ResolvedSchema schema =
tableSourceTable.contextResolvedTable().getResolvedSchema();
+ return (int)
schema.getColumns().stream().filter(Column::isPhysical).count();
+ }
+
+ /** Maps SQL column names to metadata keys for metadata columns. */
+ protected Map<String, String> buildColumnToMetadataKeyMap(TableSourceTable
tableSourceTable) {
+ ResolvedSchema schema =
tableSourceTable.contextResolvedTable().getResolvedSchema();
+ Map<String, String> mapping = new HashMap<>();
+ for (Column col : schema.getColumns()) {
+ if (col instanceof Column.MetadataColumn) {
+ Column.MetadataColumn metaCol = (Column.MetadataColumn) col;
+ String sqlName = metaCol.getName();
+ String metadataKey = metaCol.getMetadataKey().orElse(sqlName);
+ mapping.put(sqlName, metadataKey);
+ }
+ }
+ return mapping;
+ }
+
+ /** Resolves metadata filters and creates a new {@link TableSourceTable}.
*/
+ protected Tuple2<MetadataFilterResult, TableSourceTable>
+ resolveMetadataFiltersAndCreateTableSourceTable(
+ RexNode[] metadataPredicates,
+ TableSourceTable oldTableSourceTable,
+ TableScan scan,
+ RelBuilder relBuilder) {
+ DynamicTableSource newTableSource =
oldTableSourceTable.tableSource().copy();
+ SourceAbilityContext abilityContext = SourceAbilityContext.from(scan);
+
+ // Build a metadata-only row type (field names are metadata keys, not
SQL aliases) and
+ // an old->new index mapping. Storing only metadata columns avoids
name collisions with
+ // physical columns (e.g. `offset INT, msg_offset INT METADATA FROM
'offset'`).
+ MetadataRowInfo metadataRowInfo =
+ buildMetadataRowInfo(oldTableSourceTable,
abilityContext.getSourceRowType());
+ RexNode[] remappedPredicates =
+ remapPredicates(metadataPredicates,
metadataRowInfo.oldIndexToNewIndex);
+
+ MetadataFilterResult result =
+ MetadataFilterPushDownSpec.applyMetadataFilters(
+ Arrays.asList(remappedPredicates),
+ metadataRowInfo.metadataRowType,
+ newTableSource,
+ abilityContext);
+
+ int acceptedCount = result.getAcceptedFilters().size();
+ List<RexNode> acceptedRemappedPredicates = new ArrayList<>();
+ for (int i = 0; i < acceptedCount; i++) {
+ acceptedRemappedPredicates.add(remappedPredicates[i]);
+ }
+ MetadataFilterPushDownSpec metadataSpec =
+ new MetadataFilterPushDownSpec(
+ acceptedRemappedPredicates,
metadataRowInfo.metadataRowType);
+
+ TableSourceTable newTableSourceTable =
+ oldTableSourceTable.copy(
+ newTableSource,
+ oldTableSourceTable.getStatistic(),
+ new SourceAbilitySpec[] {metadataSpec});
+
+ return new Tuple2<>(result, newTableSourceTable);
+ }
+
+ /**
+ * Builds a {@link RowType} containing only metadata columns (named by
metadata key) together
+ * with an old-index-to-new-index mapping from the scan's source row type.
Non-metadata
+ * positions are absent from the mapping.
+ */
+ private MetadataRowInfo buildMetadataRowInfo(
+ TableSourceTable tableSourceTable, RowType sourceRowType) {
+ Map<String, String> columnToMetadataKey =
buildColumnToMetadataKeyMap(tableSourceTable);
+ List<RowField> metadataFields = new ArrayList<>();
+ Map<Integer, Integer> oldToNewIndex = new HashMap<>();
+ for (int i = 0; i < sourceRowType.getFieldCount(); i++) {
+ String sqlName = sourceRowType.getFieldNames().get(i);
+ String metadataKey = columnToMetadataKey.get(sqlName);
+ if (metadataKey != null) {
+ oldToNewIndex.put(i, metadataFields.size());
+ metadataFields.add(new RowField(metadataKey,
sourceRowType.getTypeAt(i)));
+ }
+ }
+ return new MetadataRowInfo(new RowType(false, metadataFields),
oldToNewIndex);
+ }
+
+ /**
+ * Rewrites {@link RexInputRef}s in each predicate using the supplied
old->new index map. Throws
+ * if a predicate references an index not in the map (would indicate a
non-metadata reference
+ * slipped past {@link #referencesOnlyMetadataColumns}).
+ */
+ private RexNode[] remapPredicates(
+ RexNode[] predicates, Map<Integer, Integer> oldIndexToNewIndex) {
+ RexShuttle shuttle =
+ new RexShuttle() {
+ @Override
+ public RexNode visitInputRef(RexInputRef inputRef) {
+ Integer newIdx =
oldIndexToNewIndex.get(inputRef.getIndex());
+ if (newIdx == null) {
+ throw new IllegalStateException(
+ "Metadata predicate references
non-metadata column index "
+ + inputRef.getIndex());
+ }
+ return new RexInputRef(newIdx, inputRef.getType());
+ }
+ };
+ return Arrays.stream(predicates).map(p ->
p.accept(shuttle)).toArray(RexNode[]::new);
+ }
+
+ private static final class MetadataRowInfo {
+ final RowType metadataRowType;
+ final Map<Integer, Integer> oldIndexToNewIndex;
+
+ MetadataRowInfo(RowType metadataRowType, Map<Integer, Integer>
oldIndexToNewIndex) {
+ this.metadataRowType = metadataRowType;
+ this.oldIndexToNewIndex = oldIndexToNewIndex;
+ }
+ }
}
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoTableSourceScanRule.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoTableSourceScanRule.java
index eaa6999cdd7..57d80b1b2f9 100644
---
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoTableSourceScanRule.java
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoTableSourceScanRule.java
@@ -19,7 +19,8 @@
package org.apache.flink.table.planner.plan.rules.logical;
import
org.apache.flink.table.connector.source.abilities.SupportsFilterPushDown;
-import org.apache.flink.table.planner.plan.schema.FlinkPreparingTableBase;
+import
org.apache.flink.table.connector.source.abilities.SupportsReadingMetadata;
+import
org.apache.flink.table.connector.source.abilities.SupportsReadingMetadata.MetadataFilterResult;
import org.apache.flink.table.planner.plan.schema.TableSourceTable;
import org.apache.flink.table.planner.plan.utils.FlinkRexUtil;
@@ -29,11 +30,15 @@ import org.apache.calcite.rel.logical.LogicalTableScan;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.tools.RelBuilder;
+import java.util.ArrayList;
+import java.util.List;
+
import scala.Tuple2;
/**
- * Planner rule that tries to push a filter into a {@link LogicalTableScan},
which table is a {@link
- * TableSourceTable}. And the table source in the table is a {@link
SupportsFilterPushDown}.
+ * Pushes filters from a {@link Filter} into a {@link LogicalTableScan}.
Physical filters use {@link
+ * SupportsFilterPushDown}; metadata filters use {@link
+ * SupportsReadingMetadata#applyMetadataFilters}.
*/
public class PushFilterIntoTableSourceScanRule extends
PushFilterIntoSourceScanRuleBase {
public static final PushFilterIntoTableSourceScanRule INSTANCE =
@@ -59,7 +64,7 @@ public class PushFilterIntoTableSourceScanRule extends
PushFilterIntoSourceScanR
LogicalTableScan scan = call.rel(1);
TableSourceTable tableSourceTable =
scan.getTable().unwrap(TableSourceTable.class);
- return canPushdownFilter(tableSourceTable);
+ return canPushdownFilter(tableSourceTable) ||
canPushdownMetadataFilter(tableSourceTable);
}
@Override
@@ -74,7 +79,7 @@ public class PushFilterIntoTableSourceScanRule extends
PushFilterIntoSourceScanR
RelOptRuleCall call,
Filter filter,
LogicalTableScan scan,
- FlinkPreparingTableBase relOptTable) {
+ TableSourceTable tableSourceTable) {
RelBuilder relBuilder = call.builder();
Tuple2<RexNode[], RexNode[]> extractedPredicates =
@@ -87,28 +92,77 @@ public class PushFilterIntoTableSourceScanRule extends
PushFilterIntoSourceScanR
RexNode[] convertiblePredicates = extractedPredicates._1;
RexNode[] unconvertedPredicates = extractedPredicates._2;
if (convertiblePredicates.length == 0) {
- // no condition can be translated to expression
return;
}
- Tuple2<SupportsFilterPushDown.Result, TableSourceTable>
scanAfterPushdownWithResult =
- resolveFiltersAndCreateTableSourceTable(
- convertiblePredicates,
- relOptTable.unwrap(TableSourceTable.class),
- scan,
- relBuilder);
+ boolean supportsPhysicalFilter = canPushdownFilter(tableSourceTable);
+ boolean supportsMetadataFilter =
canPushdownMetadataFilter(tableSourceTable);
+ int physicalColumnCount = getPhysicalColumnCount(tableSourceTable);
+
+ // Classify predicates: only separate metadata predicates when the
source
+ // actually supports metadata filter push-down. Otherwise, all
predicates
+ // go through the physical path to preserve the FilterPushDownSpec
guard
+ // that prevents rule re-firing and maintains scan reuse invariants.
+ List<RexNode> physicalPredicates = new ArrayList<>();
+ List<RexNode> metadataPredicates = new ArrayList<>();
+ for (RexNode predicate : convertiblePredicates) {
+ if (supportsMetadataFilter
+ && referencesOnlyMetadataColumns(predicate,
physicalColumnCount)) {
+ metadataPredicates.add(predicate);
+ } else {
+ physicalPredicates.add(predicate);
+ }
+ }
+
+ List<RexNode> allRemainingRexNodes = new ArrayList<>();
+ TableSourceTable currentTable = tableSourceTable;
+
+ if (!physicalPredicates.isEmpty() && supportsPhysicalFilter) {
+ Tuple2<SupportsFilterPushDown.Result, TableSourceTable>
physicalResult =
+ resolveFiltersAndCreateTableSourceTable(
+ physicalPredicates.toArray(new RexNode[0]),
+ currentTable,
+ scan,
+ relBuilder);
+ currentTable = physicalResult._2;
+ List<RexNode> physicalRemaining =
+
convertExpressionToRexNode(physicalResult._1.getRemainingFilters(), relBuilder);
+ allRemainingRexNodes.addAll(physicalRemaining);
+ } else {
+ allRemainingRexNodes.addAll(physicalPredicates);
+ }
- SupportsFilterPushDown.Result result = scanAfterPushdownWithResult._1;
- TableSourceTable tableSourceTable = scanAfterPushdownWithResult._2;
+ if (!metadataPredicates.isEmpty()) {
+ Tuple2<MetadataFilterResult, TableSourceTable> metadataResult =
+ resolveMetadataFiltersAndCreateTableSourceTable(
+ metadataPredicates.toArray(new RexNode[0]),
+ currentTable,
+ scan,
+ relBuilder);
+ currentTable = metadataResult._2;
+ // Remaining (rejected) metadata predicates stay as a
LogicalFilter above
+ // the scan so they are still evaluated at runtime. We use the
original
+ // RexNodes (suffix) because the remaining ResolvedExpressions use
metadata
+ // key names, not SQL aliases needed by the Filter's row type. This
+ // relies on the partition invariant (accepted prefix + remaining
suffix =
+ // input), which resolveMetadataFiltersAndCreateTableSourceTable does
not validate.
+ int acceptedCount = metadataResult._1.getAcceptedFilters().size();
+ for (int i = acceptedCount; i < metadataPredicates.size(); i++) {
+ allRemainingRexNodes.add(metadataPredicates.get(i));
+ }
+ }
+
+ for (RexNode unconverted : unconvertedPredicates) {
+ allRemainingRexNodes.add(unconverted);
+ }
LogicalTableScan newScan =
- LogicalTableScan.create(scan.getCluster(), tableSourceTable,
scan.getHints());
- if (result.getRemainingFilters().isEmpty() &&
unconvertedPredicates.length == 0) {
+ LogicalTableScan.create(scan.getCluster(), currentTable,
scan.getHints());
+
+ if (allRemainingRexNodes.isEmpty()) {
call.transformTo(newScan);
} else {
- RexNode remainingCondition =
- createRemainingCondition(
- relBuilder, result.getRemainingFilters(),
unconvertedPredicates);
+ RexNode remainingCondition = relBuilder.and(allRemainingRexNodes);
RexNode simplifiedRemainingCondition =
FlinkRexUtil.simplify(
relBuilder.getRexBuilder(),
diff --git
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java
index 1e6b540d4b1..d011687d82e 100644
---
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java
+++
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java
@@ -440,6 +440,9 @@ public final class TestValuesTableFactory
private static final ConfigOption<Boolean> ENABLE_WATERMARK_PUSH_DOWN =
ConfigOptions.key("enable-watermark-push-down").booleanType().defaultValue(false);
+ private static final ConfigOption<Boolean>
ENABLE_METADATA_FILTER_PUSH_DOWN =
+
ConfigOptions.key("enable-metadata-filter-push-down").booleanType().defaultValue(false);
+
private static final ConfigOption<Boolean> ENABLE_CUSTOM_SHUFFLE =
ConfigOptions.key("enable-custom-shuffle").booleanType().defaultValue(false);
@@ -574,6 +577,8 @@ public final class TestValuesTableFactory
boolean enableAggregatePushDown =
helper.getOptions().get(ENABLE_AGGREGATE_PUSH_DOWN);
boolean nestedProjectionSupported =
helper.getOptions().get(NESTED_PROJECTION_SUPPORTED);
boolean enableWatermarkPushDown =
helper.getOptions().get(ENABLE_WATERMARK_PUSH_DOWN);
+ boolean enableMetadataFilterPushDown =
+ helper.getOptions().get(ENABLE_METADATA_FILTER_PUSH_DOWN);
boolean failingSource = helper.getOptions().get(FAILING_SOURCE);
int numElementToSkip =
helper.getOptions().get(SOURCE_NUM_ELEMENT_TO_SKIP);
boolean internalData = helper.getOptions().get(INTERNAL_DATA);
@@ -692,45 +697,51 @@ public final class TestValuesTableFactory
if (disableLookup) {
if (enableWatermarkPushDown) {
- return new TestValuesScanTableSourceWithWatermarkPushDown(
- producedDataType,
- changelogMode,
- terminating,
- runtimeSource,
- failingSource,
- partition2Rows,
- context.getObjectIdentifier().getObjectName(),
- nestedProjectionSupported,
- null,
- Collections.emptyList(),
- filterableFieldsSet,
- dynamicFilteringFieldsSet,
- numElementToSkip,
- Long.MAX_VALUE,
- partitions,
- readableMetadata,
- null,
- enableAggregatePushDown);
+ TestValuesScanTableSourceWithWatermarkPushDown source =
+ new TestValuesScanTableSourceWithWatermarkPushDown(
+ producedDataType,
+ changelogMode,
+ terminating,
+ runtimeSource,
+ failingSource,
+ partition2Rows,
+
context.getObjectIdentifier().getObjectName(),
+ nestedProjectionSupported,
+ null,
+ Collections.emptyList(),
+ filterableFieldsSet,
+ dynamicFilteringFieldsSet,
+ numElementToSkip,
+ Long.MAX_VALUE,
+ partitions,
+ readableMetadata,
+ null,
+ enableAggregatePushDown);
+
source.setEnableMetadataFilterPushDown(enableMetadataFilterPushDown);
+ return source;
} else {
- return new TestValuesScanTableSource(
- producedDataType,
- changelogMode,
- boundedness,
- terminating,
- runtimeSource,
- failingSource,
- partition2Rows,
- nestedProjectionSupported,
- null,
- Collections.emptyList(),
- filterableFieldsSet,
- dynamicFilteringFieldsSet,
- numElementToSkip,
- Long.MAX_VALUE,
- partitions,
- readableMetadata,
- null,
- enableAggregatePushDown);
+ TestValuesScanTableSource source =
+ new TestValuesScanTableSource(
+ producedDataType,
+ changelogMode,
+ boundedness,
+ terminating,
+ runtimeSource,
+ failingSource,
+ partition2Rows,
+ nestedProjectionSupported,
+ null,
+ Collections.emptyList(),
+ filterableFieldsSet,
+ dynamicFilteringFieldsSet,
+ numElementToSkip,
+ Long.MAX_VALUE,
+ partitions,
+ readableMetadata,
+ null,
+ enableAggregatePushDown);
+
source.setEnableMetadataFilterPushDown(enableMetadataFilterPushDown);
+ return source;
}
} else {
Collection<Row> consumedData =
@@ -904,6 +915,7 @@ public final class TestValuesTableFactory
SINK_CHANGELOG_MODE_ENFORCED,
WRITABLE_METADATA,
ENABLE_WATERMARK_PUSH_DOWN,
+ ENABLE_METADATA_FILTER_PUSH_DOWN,
SINK_DROP_LATE_EVENT,
SINK_BUCKET_COUNT_REQUIRED,
SINK_SUPPORTS_DELETE_BY_KEY,
@@ -1088,6 +1100,7 @@ public final class TestValuesTableFactory
protected final Map<String, DataType> readableMetadata;
protected @Nullable int[] projectedMetadataFields;
protected final boolean enableAggregatePushDown;
+ protected boolean enableMetadataFilterPushDown;
private @Nullable int[] groupingSet;
private List<AggregateExpression> aggregateExpressions;
@@ -1619,6 +1632,22 @@ public final class TestValuesTableFactory
remainingMetadataKeys.stream().mapToInt(allMetadataKeys::indexOf).toArray();
}
+ @Override
+ public boolean supportsMetadataFilterPushDown() {
+ return enableMetadataFilterPushDown;
+ }
+
+ @Override
+ public SupportsReadingMetadata.MetadataFilterResult
applyMetadataFilters(
+ List<ResolvedExpression> metadataFilters) {
+ return SupportsReadingMetadata.MetadataFilterResult.of(
+ metadataFilters, Collections.emptyList());
+ }
+
+ void setEnableMetadataFilterPushDown(boolean enable) {
+ this.enableMetadataFilterPushDown = enable;
+ }
+
@Override
public List<String> listAcceptedFilterFields() {
return new ArrayList<>(dynamicFilteringFields);
@@ -1678,25 +1707,28 @@ public final class TestValuesTableFactory
@Override
public DynamicTableSource copy() {
- return new TestValuesScanTableSource(
- producedDataType,
- changelogMode,
- boundedness,
- terminating,
- runtimeSource,
- failingSource,
- data,
- nestedProjectionSupported,
- projectedPhysicalFields,
- filterPredicates,
- filterableFields,
- dynamicFilteringFields,
- numElementToSkip,
- limit,
- allPartitions,
- readableMetadata,
- projectedMetadataFields,
- enableAggregatePushDown);
+ TestValuesScanTableSource copy =
+ new TestValuesScanTableSource(
+ producedDataType,
+ changelogMode,
+ boundedness,
+ terminating,
+ runtimeSource,
+ failingSource,
+ data,
+ nestedProjectionSupported,
+ projectedPhysicalFields,
+ filterPredicates,
+ filterableFields,
+ dynamicFilteringFields,
+ numElementToSkip,
+ limit,
+ allPartitions,
+ readableMetadata,
+ projectedMetadataFields,
+ enableAggregatePushDown);
+ copy.setEnableMetadataFilterPushDown(enableMetadataFilterPushDown);
+ return copy;
}
@Override
diff --git
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/DynamicTableSourceSpecSerdeTest.java
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/DynamicTableSourceSpecSerdeTest.java
index 6fd5942f19e..478c91eedcd 100644
---
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/DynamicTableSourceSpecSerdeTest.java
+++
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/DynamicTableSourceSpecSerdeTest.java
@@ -42,6 +42,7 @@ import org.apache.flink.table.planner.calcite.FlinkTypeSystem;
import org.apache.flink.table.planner.factories.TestValuesTableFactory;
import org.apache.flink.table.planner.plan.abilities.source.FilterPushDownSpec;
import org.apache.flink.table.planner.plan.abilities.source.LimitPushDownSpec;
+import
org.apache.flink.table.planner.plan.abilities.source.MetadataFilterPushDownSpec;
import
org.apache.flink.table.planner.plan.abilities.source.PartitionPushDownSpec;
import
org.apache.flink.table.planner.plan.abilities.source.ProjectPushDownSpec;
import
org.apache.flink.table.planner.plan.abilities.source.ReadingMetadataSpec;
@@ -64,6 +65,7 @@ import org.apache.calcite.sql.SqlIntervalQualifier;
import org.apache.calcite.sql.fun.SqlStdOperatorTable;
import org.apache.calcite.sql.parser.SqlParserPos;
import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.calcite.util.TimestampString;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.parallel.Execution;
import org.junit.jupiter.params.ParameterizedTest;
@@ -274,7 +276,71 @@ public class DynamicTableSourceSpecSerdeTest {
put("p", "B");
}
}))));
- return Stream.of(spec1, spec2);
+ Map<String, String> options3 = new HashMap<>();
+ options3.put("connector", TestValuesTableFactory.IDENTIFIER);
+ options3.put("disable-lookup", "true");
+ options3.put("enable-metadata-filter-push-down", "true");
+ options3.put("bounded", "false");
+ options3.put("readable-metadata", "timestamp:TIMESTAMP(3),
offset:BIGINT");
+
+ final ResolvedSchema resolvedSchema3 =
+ new ResolvedSchema(
+ Arrays.asList(
+ Column.physical("id", DataTypes.INT()),
+ Column.metadata(
+ "rowtime", DataTypes.TIMESTAMP(3),
"timestamp", false),
+ Column.metadata("offset", DataTypes.BIGINT(),
null, false)),
+ Collections.emptyList(),
+ null,
+ Collections.emptyList(),
+ null);
+
+ final CatalogTable catalogTable3 =
+ CatalogTable.newBuilder()
+
.schema(Schema.newBuilder().fromResolvedSchema(resolvedSchema3).build())
+ .options(options3)
+ .build();
+
+ // predicateRowType uses metadata key names (already translated from SQL aliases).
+ RowType predicateRowType3 =
+ RowType.of(
+ new LogicalType[] {new TimestampType(3), new
BigIntType()},
+ new String[] {"timestamp", "offset"});
+
+ DynamicTableSourceSpec spec3 =
+ new DynamicTableSourceSpec(
+ ContextResolvedTable.temporary(
+ ObjectIdentifier.of(
+
TableConfigOptions.TABLE_CATALOG_NAME.defaultValue(),
+
TableConfigOptions.TABLE_DATABASE_NAME.defaultValue(),
+ "MyTableMetadata"),
+ new ResolvedCatalogTable(catalogTable3,
resolvedSchema3)),
+ Collections.singletonList(
+ new MetadataFilterPushDownSpec(
+ Arrays.asList(
+ // timestamp > '2024-01-01'
+ rexBuilder.makeCall(
+
SqlStdOperatorTable.GREATER_THAN,
+
rexBuilder.makeInputRef(
+
factory.createSqlType(
+
SqlTypeName.TIMESTAMP, 3),
+ 0),
+
rexBuilder.makeTimestampLiteral(
+ new
TimestampString(
+
"2024-01-01 00:00:00"),
+ 3)),
+ // offset >= 10
+ rexBuilder.makeCall(
+
SqlStdOperatorTable.GREATER_THAN_OR_EQUAL,
+
rexBuilder.makeInputRef(
+
factory.createSqlType(
+
SqlTypeName.BIGINT),
+ 1),
+
rexBuilder.makeExactLiteral(
+ new
BigDecimal(10)))),
+ predicateRowType3)));
+
+ return Stream.of(spec1, spec2, spec3);
}
@ParameterizedTest
diff --git
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/MetadataFilterInReadingMetadataTest.java
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/MetadataFilterInReadingMetadataTest.java
new file mode 100644
index 00000000000..ffd6aa0ea89
--- /dev/null
+++
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/MetadataFilterInReadingMetadataTest.java
@@ -0,0 +1,452 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.logical;
+
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.TableDescriptor;
+import
org.apache.flink.table.connector.source.abilities.SupportsFilterPushDown;
+import
org.apache.flink.table.connector.source.abilities.SupportsReadingMetadata;
+import
org.apache.flink.table.connector.source.abilities.SupportsReadingMetadata.MetadataFilterResult;
+import org.apache.flink.table.expressions.ResolvedExpression;
+import org.apache.flink.table.planner.calcite.CalciteConfig;
+import org.apache.flink.table.planner.factories.TableFactoryHarness;
+import
org.apache.flink.table.planner.plan.optimize.program.BatchOptimizeContext;
+import org.apache.flink.table.planner.plan.optimize.program.FlinkBatchProgram;
+import
org.apache.flink.table.planner.plan.optimize.program.FlinkHepRuleSetProgramBuilder;
+import
org.apache.flink.table.planner.plan.optimize.program.HEP_RULES_EXECUTION_TYPE;
+import org.apache.flink.table.planner.utils.BatchTableTestUtil;
+import org.apache.flink.table.planner.utils.TableConfigUtils;
+import org.apache.flink.table.planner.utils.TableTestBase;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.testutils.junit.SharedObjectsExtension;
+import org.apache.flink.testutils.junit.SharedReference;
+
+import org.apache.calcite.plan.hep.HepMatchOrder;
+import org.apache.calcite.rel.rules.CoreRules;
+import org.apache.calcite.tools.RuleSets;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.flink.table.api.DataTypes.INT;
+import static org.apache.flink.table.api.DataTypes.STRING;
+import static org.apache.flink.table.api.DataTypes.TIMESTAMP;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for metadata filter push-down through {@link SupportsReadingMetadata}. */
+class MetadataFilterInReadingMetadataTest extends TableTestBase {
+
+ @RegisterExtension
+ private final SharedObjectsExtension sharedObjects =
SharedObjectsExtension.create();
+
+ private BatchTableTestUtil util;
+
+ @BeforeEach
+ void setup() {
+ util = batchTestUtil(TableConfig.getDefault());
+ util.buildBatchProgram(FlinkBatchProgram.DEFAULT_REWRITE());
+ CalciteConfig calciteConfig =
+ TableConfigUtils.getCalciteConfig(util.tableEnv().getConfig());
+ calciteConfig
+ .getBatchProgram()
+ .get()
+ .addLast(
+ "rules",
+
FlinkHepRuleSetProgramBuilder.<BatchOptimizeContext>newBuilder()
+ .setHepRulesExecutionType(
+
HEP_RULES_EXECUTION_TYPE.RULE_COLLECTION())
+ .setHepMatchOrder(HepMatchOrder.BOTTOM_UP)
+ .add(
+ RuleSets.ofList(
+
PushFilterIntoTableSourceScanRule.INSTANCE,
+
CoreRules.FILTER_PROJECT_TRANSPOSE))
+ .build());
+ }
+
+ @Test
+ void testMetadataFilterPushDown() {
+ SharedReference<List<ResolvedExpression>> receivedFilters =
+ sharedObjects.add(new ArrayList<>());
+ TableDescriptor descriptor =
+ TableFactoryHarness.newBuilder()
+ .schema(MetadataFilterSource.SCHEMA)
+ .source(new MetadataFilterSource(true,
receivedFilters))
+ .build();
+ util.tableEnv().createTable("T1", descriptor);
+
+ util.verifyRelPlan("SELECT id FROM T1 WHERE event_time > TIMESTAMP
'2024-01-01 00:00:00'");
+
+ assertThat(receivedFilters.get().toString())
+ .isEqualTo("[greaterThan(event_time, 2024-01-01T00:00)]");
+ }
+
+ @Test
+ void testMetadataFilterNotPushedWhenNotSupported() {
+ SharedReference<List<ResolvedExpression>> receivedFilters =
+ sharedObjects.add(new ArrayList<>());
+ TableDescriptor descriptor =
+ TableFactoryHarness.newBuilder()
+ .schema(MetadataFilterSource.SCHEMA)
+ .source(new MetadataFilterSource(false,
receivedFilters))
+ .build();
+ util.tableEnv().createTable("T2", descriptor);
+
+ util.verifyRelPlan("SELECT id FROM T2 WHERE event_time > TIMESTAMP
'2024-01-01 00:00:00'");
+
+ // No metadata filters should have been pushed
+ assertThat(receivedFilters.get()).isEmpty();
+ }
+
+ @Test
+ void testAliasedMetadataColumnFilter() {
+ SharedReference<List<ResolvedExpression>> receivedFilters =
+ sharedObjects.add(new ArrayList<>());
+ TableDescriptor descriptor =
+ TableFactoryHarness.newBuilder()
+ .schema(RenamedMetadataFilterSource.SCHEMA)
+ .source(new
RenamedMetadataFilterSource(receivedFilters))
+ .build();
+ util.tableEnv().createTable("T3", descriptor);
+
+ // 'event_ts' is the SQL alias for metadata key 'timestamp'
+ util.verifyRelPlan("SELECT id FROM T3 WHERE event_ts > TIMESTAMP
'2024-01-01 00:00:00'");
+
+ // The source should receive the filter with metadata key 'timestamp', not 'event_ts'.
+ assertThat(receivedFilters.get().toString())
+ .isEqualTo("[greaterThan(timestamp, 2024-01-01T00:00)]");
+ }
+
+ @Test
+ void testMixedPhysicalAndMetadataFilters() {
+ SharedReference<List<ResolvedExpression>> metadataFilters =
+ sharedObjects.add(new ArrayList<>());
+ SharedReference<List<ResolvedExpression>> physicalFilters =
+ sharedObjects.add(new ArrayList<>());
+ TableDescriptor descriptor =
+ TableFactoryHarness.newBuilder()
+ .schema(MixedFilterSource.SCHEMA)
+ .source(new MixedFilterSource(metadataFilters,
physicalFilters))
+ .build();
+ util.tableEnv().createTable("T4", descriptor);
+
+ util.verifyRelPlan(
+ "SELECT id FROM T4 WHERE id > 10 AND event_time > TIMESTAMP
'2024-01-01 00:00:00'");
+
+ // Verify routing: id > 10 → physical path, event_time > ... → metadata path.
+
assertThat(physicalFilters.get().toString()).isEqualTo("[greaterThan(id, 10)]");
+ assertThat(metadataFilters.get().toString())
+ .isEqualTo("[greaterThan(event_time, 2024-01-01T00:00)]");
+ }
+
+ @Test
+ void testPartialMetadataFilterAcceptance() {
+ SharedReference<List<ResolvedExpression>> receivedFilters =
+ sharedObjects.add(new ArrayList<>());
+ TableDescriptor descriptor =
+ TableFactoryHarness.newBuilder()
+ .schema(PartialMetadataFilterSource.SCHEMA)
+ .source(new
PartialMetadataFilterSource(receivedFilters))
+ .build();
+ util.tableEnv().createTable("T6", descriptor);
+
+ // Two metadata filters: the source accepts only the first one
+ util.verifyRelPlan(
+ "SELECT id FROM T6 WHERE event_time > TIMESTAMP '2024-01-01
00:00:00'"
+ + " AND priority > 5");
+
+ // Source receives both filters; the XML reference verifies only the first is accepted
+ // (the second remains as a LogicalFilter above the scan).
+ assertThat(receivedFilters.get().toString())
+ .isEqualTo("[greaterThan(event_time, 2024-01-01T00:00),
greaterThan(priority, 5)]");
+ }
+
+ @Test
+ void testPhysicalAndMetadataNameCollision() {
+ // Physical column 'offset' shares a name with the metadata key 'offset'
+ // (aliased in SQL as 'msg_offset'). The predicate on the metadata column
+ // must be pushed down using the metadata key, not confused with the
+ // physical column of the same name.
+ SharedReference<List<ResolvedExpression>> receivedFilters =
+ sharedObjects.add(new ArrayList<>());
+ TableDescriptor descriptor =
+ TableFactoryHarness.newBuilder()
+ .schema(CollidingNameSource.SCHEMA)
+ .source(new CollidingNameSource(receivedFilters))
+ .build();
+ util.tableEnv().createTable("T7", descriptor);
+
+ util.verifyRelPlan("SELECT id FROM T7 WHERE msg_offset > 5");
+
+ // Must reference the metadata key 'offset', NOT the SQL alias 'msg_offset'.
+
assertThat(receivedFilters.get().toString()).isEqualTo("[greaterThan(offset,
5)]");
+ }
+
+ @Test
+ void testMetadataFilterWithProjection() {
+ SharedReference<List<ResolvedExpression>> receivedFilters =
+ sharedObjects.add(new ArrayList<>());
+ TableDescriptor descriptor =
+ TableFactoryHarness.newBuilder()
+ .schema(MetadataFilterSource.SCHEMA)
+ .source(new MetadataFilterSource(true,
receivedFilters))
+ .build();
+ util.tableEnv().createTable("T5", descriptor);
+
+ util.verifyRelPlan(
+ "SELECT id, name FROM T5 WHERE event_time > TIMESTAMP
'2024-01-01 00:00:00'");
+
+ // Projection push-down must not perturb the metadata filter.
+ assertThat(receivedFilters.get().toString())
+ .isEqualTo("[greaterThan(event_time, 2024-01-01T00:00)]");
+ }
+
+ // -----------------------------------------------------------------------------------------
+ // Test sources
+ // -----------------------------------------------------------------------------------------
+
+ /** Supports metadata filter push-down. */
+ private static class MetadataFilterSource extends
TableFactoryHarness.ScanSourceBase
+ implements SupportsReadingMetadata {
+
+ public static final Schema SCHEMA =
+ Schema.newBuilder()
+ .column("id", INT())
+ .column("name", STRING())
+ .columnByMetadata("event_time", TIMESTAMP(3))
+ .build();
+
+ private final boolean supportsMetadataFilter;
+ private final SharedReference<List<ResolvedExpression>>
receivedMetadataFilters;
+
+ MetadataFilterSource(
+ boolean supportsMetadataFilter,
+ SharedReference<List<ResolvedExpression>>
receivedMetadataFilters) {
+ this.supportsMetadataFilter = supportsMetadataFilter;
+ this.receivedMetadataFilters = receivedMetadataFilters;
+ }
+
+ @Override
+ public Map<String, DataType> listReadableMetadata() {
+ Map<String, DataType> metadata = new HashMap<>();
+ metadata.put("event_time",
org.apache.flink.table.api.DataTypes.TIMESTAMP(3));
+ return metadata;
+ }
+
+ @Override
+ public void applyReadableMetadata(List<String> metadataKeys, DataType
producedDataType) {}
+
+ @Override
+ public boolean supportsMetadataFilterPushDown() {
+ return supportsMetadataFilter;
+ }
+
+ @Override
+ public MetadataFilterResult
applyMetadataFilters(List<ResolvedExpression> metadataFilters) {
+ receivedMetadataFilters.get().addAll(metadataFilters);
+ return MetadataFilterResult.of(metadataFilters,
Collections.emptyList());
+ }
+ }
+
+ /** Tests key translation when SQL alias differs from metadata key. */
+ private static class RenamedMetadataFilterSource extends
TableFactoryHarness.ScanSourceBase
+ implements SupportsReadingMetadata {
+
+ public static final Schema SCHEMA =
+ Schema.newBuilder()
+ .column("id", INT())
+ .columnByMetadata("event_ts", TIMESTAMP(3),
"timestamp")
+ .build();
+
+ private final SharedReference<List<ResolvedExpression>>
receivedMetadataFilters;
+
+ RenamedMetadataFilterSource(
+ SharedReference<List<ResolvedExpression>>
receivedMetadataFilters) {
+ this.receivedMetadataFilters = receivedMetadataFilters;
+ }
+
+ @Override
+ public Map<String, DataType> listReadableMetadata() {
+ Map<String, DataType> metadata = new HashMap<>();
+ metadata.put("timestamp",
org.apache.flink.table.api.DataTypes.TIMESTAMP(3));
+ return metadata;
+ }
+
+ @Override
+ public void applyReadableMetadata(List<String> metadataKeys, DataType
producedDataType) {}
+
+ @Override
+ public boolean supportsMetadataFilterPushDown() {
+ return true;
+ }
+
+ @Override
+ public MetadataFilterResult
applyMetadataFilters(List<ResolvedExpression> metadataFilters) {
+ receivedMetadataFilters.get().addAll(metadataFilters);
+ return MetadataFilterResult.of(metadataFilters,
Collections.emptyList());
+ }
+ }
+
+ /** Accepts only the first metadata filter; rejected filters remain in plan. */
+ private static class PartialMetadataFilterSource extends
TableFactoryHarness.ScanSourceBase
+ implements SupportsReadingMetadata {
+
+ public static final Schema SCHEMA =
+ Schema.newBuilder()
+ .column("id", INT())
+ .columnByMetadata("event_time", TIMESTAMP(3))
+ .columnByMetadata("priority", INT())
+ .build();
+
+ private final SharedReference<List<ResolvedExpression>>
receivedMetadataFilters;
+
+ PartialMetadataFilterSource(
+ SharedReference<List<ResolvedExpression>>
receivedMetadataFilters) {
+ this.receivedMetadataFilters = receivedMetadataFilters;
+ }
+
+ @Override
+ public Map<String, DataType> listReadableMetadata() {
+ Map<String, DataType> metadata = new HashMap<>();
+ metadata.put("event_time", TIMESTAMP(3));
+ metadata.put("priority", INT());
+ return metadata;
+ }
+
+ @Override
+ public void applyReadableMetadata(List<String> metadataKeys, DataType
producedDataType) {}
+
+ @Override
+ public boolean supportsMetadataFilterPushDown() {
+ return true;
+ }
+
+ @Override
+ public MetadataFilterResult
applyMetadataFilters(List<ResolvedExpression> metadataFilters) {
+ receivedMetadataFilters.get().addAll(metadataFilters);
+ // Accept only the first filter
+ List<ResolvedExpression> accepted =
+ metadataFilters.isEmpty()
+ ? Collections.emptyList()
+ :
Collections.singletonList(metadataFilters.get(0));
+ List<ResolvedExpression> remaining =
+ metadataFilters.size() > 1
+ ? metadataFilters.subList(1,
metadataFilters.size())
+ : Collections.emptyList();
+ return MetadataFilterResult.of(accepted, remaining);
+ }
+ }
+
+ /** Tests mixed physical and metadata filter push-down. */
+ private static class MixedFilterSource extends
TableFactoryHarness.ScanSourceBase
+ implements SupportsReadingMetadata, SupportsFilterPushDown {
+
+ public static final Schema SCHEMA =
+ Schema.newBuilder()
+ .column("id", INT())
+ .column("name", STRING())
+ .columnByMetadata("event_time", TIMESTAMP(3))
+ .build();
+
+ private final SharedReference<List<ResolvedExpression>>
receivedMetadataFilters;
+ private final SharedReference<List<ResolvedExpression>>
receivedPhysicalFilters;
+
+ MixedFilterSource(
+ SharedReference<List<ResolvedExpression>>
receivedMetadataFilters,
+ SharedReference<List<ResolvedExpression>>
receivedPhysicalFilters) {
+ this.receivedMetadataFilters = receivedMetadataFilters;
+ this.receivedPhysicalFilters = receivedPhysicalFilters;
+ }
+
+ @Override
+ public Map<String, DataType> listReadableMetadata() {
+ Map<String, DataType> metadata = new HashMap<>();
+ metadata.put("event_time",
org.apache.flink.table.api.DataTypes.TIMESTAMP(3));
+ return metadata;
+ }
+
+ @Override
+ public void applyReadableMetadata(List<String> metadataKeys, DataType
producedDataType) {}
+
+ @Override
+ public boolean supportsMetadataFilterPushDown() {
+ return true;
+ }
+
+ @Override
+ public MetadataFilterResult
applyMetadataFilters(List<ResolvedExpression> metadataFilters) {
+ receivedMetadataFilters.get().addAll(metadataFilters);
+ return MetadataFilterResult.of(metadataFilters,
Collections.emptyList());
+ }
+
+ @Override
+ public Result applyFilters(List<ResolvedExpression> filters) {
+ receivedPhysicalFilters.get().addAll(filters);
+ return Result.of(filters, Collections.emptyList());
+ }
+ }
+
+ /**
+ * Physical column {@code offset} shares a name with the metadata key {@code offset} (SQL alias
+ * {@code msg_offset}). Exercises the physical-vs-metadata name collision case.
+ */
+ private static class CollidingNameSource extends
TableFactoryHarness.ScanSourceBase
+ implements SupportsReadingMetadata {
+
+ public static final Schema SCHEMA =
+ Schema.newBuilder()
+ .column("id", INT())
+ .column("offset", INT())
+ .columnByMetadata("msg_offset", INT(), "offset")
+ .build();
+
+ private final SharedReference<List<ResolvedExpression>>
receivedMetadataFilters;
+
+ CollidingNameSource(SharedReference<List<ResolvedExpression>>
receivedMetadataFilters) {
+ this.receivedMetadataFilters = receivedMetadataFilters;
+ }
+
+ @Override
+ public Map<String, DataType> listReadableMetadata() {
+ Map<String, DataType> metadata = new HashMap<>();
+ metadata.put("offset", INT());
+ return metadata;
+ }
+
+ @Override
+ public void applyReadableMetadata(List<String> metadataKeys, DataType
producedDataType) {}
+
+ @Override
+ public boolean supportsMetadataFilterPushDown() {
+ return true;
+ }
+
+ @Override
+ public MetadataFilterResult
applyMetadataFilters(List<ResolvedExpression> metadataFilters) {
+ receivedMetadataFilters.get().addAll(metadataFilters);
+ return MetadataFilterResult.of(metadataFilters,
Collections.emptyList());
+ }
+ }
+}
diff --git
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/MetadataFilterInReadingMetadataTest.xml
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/MetadataFilterInReadingMetadataTest.xml
new file mode 100644
index 00000000000..cef00b21d69
--- /dev/null
+++
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/MetadataFilterInReadingMetadataTest.xml
@@ -0,0 +1,161 @@
+<?xml version="1.0" ?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to you under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+<Root>
+ <TestCase name="testAliasedMetadataColumnFilter">
+ <Resource name="sql">
+ <![CDATA[SELECT id FROM T3 WHERE event_ts > TIMESTAMP '2024-01-01
00:00:00']]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(id=[$0])
++- LogicalFilter(condition=[>($1, 2024-01-01 00:00:00)])
+ +- LogicalProject(id=[$0], event_ts=[$1])
+ +- LogicalTableScan(table=[[default_catalog, default_database, T3,
metadata=[timestamp]]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+LogicalProject(id=[$0])
++- LogicalProject(id=[$0], event_ts=[$1])
+ +- LogicalTableScan(table=[[default_catalog, default_database, T3,
metadata=[timestamp], metadataFilter=[>(timestamp, 2024-01-01 00:00:00)]]])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testMetadataFilterNotPushedWhenNotSupported">
+ <Resource name="sql">
+ <![CDATA[SELECT id FROM T2 WHERE event_time > TIMESTAMP '2024-01-01
00:00:00']]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(id=[$0])
++- LogicalFilter(condition=[>($2, 2024-01-01 00:00:00)])
+ +- LogicalProject(id=[$0], name=[$1], event_time=[$2])
+ +- LogicalTableScan(table=[[default_catalog, default_database, T2,
metadata=[event_time]]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+LogicalProject(id=[$0])
++- LogicalProject(id=[$0], name=[$1], event_time=[$2])
+ +- LogicalFilter(condition=[>($2, 2024-01-01 00:00:00)])
+ +- LogicalTableScan(table=[[default_catalog, default_database, T2,
metadata=[event_time]]])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testMetadataFilterPushDown">
+ <Resource name="sql">
+ <![CDATA[SELECT id FROM T1 WHERE event_time > TIMESTAMP '2024-01-01
00:00:00']]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(id=[$0])
++- LogicalFilter(condition=[>($2, 2024-01-01 00:00:00)])
+ +- LogicalProject(id=[$0], name=[$1], event_time=[$2])
+ +- LogicalTableScan(table=[[default_catalog, default_database, T1,
metadata=[event_time]]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+LogicalProject(id=[$0])
++- LogicalProject(id=[$0], name=[$1], event_time=[$2])
+ +- LogicalTableScan(table=[[default_catalog, default_database, T1,
metadata=[event_time], metadataFilter=[>(event_time, 2024-01-01 00:00:00)]]])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testMetadataFilterWithProjection">
+ <Resource name="sql">
+ <![CDATA[SELECT id, name FROM T5 WHERE event_time > TIMESTAMP
'2024-01-01 00:00:00']]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(id=[$0], name=[$1])
++- LogicalFilter(condition=[>($2, 2024-01-01 00:00:00)])
+ +- LogicalProject(id=[$0], name=[$1], event_time=[$2])
+ +- LogicalTableScan(table=[[default_catalog, default_database, T5,
metadata=[event_time]]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+LogicalProject(id=[$0], name=[$1])
++- LogicalProject(id=[$0], name=[$1], event_time=[$2])
+ +- LogicalTableScan(table=[[default_catalog, default_database, T5,
metadata=[event_time], metadataFilter=[>(event_time, 2024-01-01 00:00:00)]]])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testMixedPhysicalAndMetadataFilters">
+ <Resource name="sql">
+ <![CDATA[SELECT id FROM T4 WHERE id > 10 AND event_time > TIMESTAMP
'2024-01-01 00:00:00']]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(id=[$0])
++- LogicalFilter(condition=[AND(>($0, 10), >($2, 2024-01-01 00:00:00))])
+ +- LogicalProject(id=[$0], name=[$1], event_time=[$2])
+ +- LogicalTableScan(table=[[default_catalog, default_database, T4,
metadata=[event_time]]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+LogicalProject(id=[$0])
++- LogicalProject(id=[$0], name=[$1], event_time=[$2])
+ +- LogicalTableScan(table=[[default_catalog, default_database, T4,
metadata=[event_time], filter=[>(id, 10)], metadataFilter=[>(event_time,
2024-01-01 00:00:00)]]])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testPartialMetadataFilterAcceptance">
+ <Resource name="sql">
+ <![CDATA[SELECT id FROM T6 WHERE event_time > TIMESTAMP '2024-01-01
00:00:00' AND priority > 5]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(id=[$0])
++- LogicalFilter(condition=[AND(>($1, 2024-01-01 00:00:00), >($2, 5))])
+ +- LogicalProject(id=[$0], event_time=[$2], priority=[$1])
+ +- LogicalTableScan(table=[[default_catalog, default_database, T6,
metadata=[priority, event_time]]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+LogicalProject(id=[$0])
++- LogicalProject(id=[$0], event_time=[$2], priority=[$1])
+ +- LogicalFilter(condition=[>($1, 5)])
+ +- LogicalTableScan(table=[[default_catalog, default_database, T6,
metadata=[priority, event_time], metadataFilter=[>(event_time, 2024-01-01
00:00:00)]]])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testPhysicalAndMetadataNameCollision">
+ <Resource name="sql">
+ <![CDATA[SELECT id FROM T7 WHERE msg_offset > 5]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(id=[$0])
++- LogicalFilter(condition=[>($2, 5)])
+ +- LogicalProject(id=[$0], offset=[$1], msg_offset=[$2])
+ +- LogicalTableScan(table=[[default_catalog, default_database, T7,
metadata=[offset]]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+LogicalProject(id=[$0])
++- LogicalProject(id=[$0], offset=[$1], msg_offset=[$2])
+ +- LogicalTableScan(table=[[default_catalog, default_database, T7,
metadata=[offset], metadataFilter=[>(offset, 5)]]])
+]]>
+ </Resource>
+ </TestCase>
+</Root>