This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 92e67cfc79f [FLINK-39421][table] Metadata filter push-down for table sources
92e67cfc79f is described below

commit 92e67cfc79f79b76e5fa67dc7596554175b57e7e
Author: Jim Hughes <[email protected]>
AuthorDate: Fri Apr 24 09:47:39 2026 -0400

    [FLINK-39421][table] Metadata filter push-down for table sources
    
    This closes #27913.
---
 .../source/abilities/SupportsReadingMetadata.java  |  69 ++++
 .../plan/abilities/source/FilterPushDownSpec.java  | 100 +++--
 .../source/MetadataFilterPushDownSpec.java         | 159 ++++++++
 .../plan/abilities/source/SourceAbilitySpec.java   |   1 +
 .../logical/PushFilterIntoSourceScanRuleBase.java  | 189 ++++++++-
 .../logical/PushFilterIntoTableSourceScanRule.java |  92 ++++-
 .../planner/factories/TestValuesTableFactory.java  | 146 ++++---
 .../serde/DynamicTableSourceSpecSerdeTest.java     |  68 +++-
 .../MetadataFilterInReadingMetadataTest.java       | 452 +++++++++++++++++++++
 .../MetadataFilterInReadingMetadataTest.xml        | 161 ++++++++
 10 files changed, 1300 insertions(+), 137 deletions(-)

diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/abilities/SupportsReadingMetadata.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/abilities/SupportsReadingMetadata.java
index 60b6932b617..0d1c65f0f2f 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/abilities/SupportsReadingMetadata.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/abilities/SupportsReadingMetadata.java
@@ -25,11 +25,13 @@ import 
org.apache.flink.table.connector.format.DecodingFormat;
 import org.apache.flink.table.connector.source.ScanTableSource;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.utils.JoinedRowData;
+import org.apache.flink.table.expressions.ResolvedExpression;
 import org.apache.flink.table.factories.Factory;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.utils.LogicalTypeCasts;
 
+import java.util.Collections;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
@@ -154,4 +156,71 @@ public interface SupportsReadingMetadata {
     default boolean supportsMetadataProjection() {
         return true;
     }
+
+    /**
+     * Whether this source supports filtering on metadata columns.
+     *
+     * <p>When this method returns {@code true}, the planner may call {@link
+     * #applyMetadataFilters(List)} during optimization with predicates expressed in metadata key
+     * names (from {@link #listReadableMetadata()}), not SQL column aliases. Sources that do not
+     * override this method will not receive metadata filter predicates.
+     *
+     * <p>This is independent of {@link SupportsFilterPushDown}, which handles physical column
+     * predicates. A source can implement both to accept filters on physical and metadata columns.
+     *
+     * @return {@code true} if the planner may push metadata predicates into this source; the
+     *     default implementation returns {@code false}
+     */
+    default boolean supportsMetadataFilterPushDown() {
+        return false;
+    }
+
+    /**
+     * Provides a list of metadata filters in conjunctive form. A source can 
pick filters and return
+     * the accepted and remaining filters. Same contract as {@link
+     * SupportsFilterPushDown#applyFilters(List)}, but for metadata columns.
+     *
+     * <p>The provided filters reference metadata key names (from {@link 
#listReadableMetadata()}),
+     * not SQL column aliases. For example, a column declared as {@code 
msg_offset BIGINT METADATA
+     * FROM 'offset'} will have its predicate expressed as {@code offset >= 
1000}, not {@code
+     * msg_offset >= 1000}. The planner handles the alias-to-key translation 
before calling this
+     * method.
+     */
+    default MetadataFilterResult applyMetadataFilters(List<ResolvedExpression> 
metadataFilters) {
+        return MetadataFilterResult.of(Collections.emptyList(), 
metadataFilters);
+    }
+
+    /**
+     * Outcome of a metadata filter push-down attempt. Carries the source's answer back to the
+     * planner during optimization.
+     *
+     * <p>Both lists are kept by reference; no defensive copy is made.
+     */
+    @PublicEvolving
+    final class MetadataFilterResult {
+        private final List<ResolvedExpression> accepted;
+        private final List<ResolvedExpression> remaining;
+
+        private MetadataFilterResult(
+                List<ResolvedExpression> accepted, List<ResolvedExpression> remaining) {
+            this.accepted = accepted;
+            this.remaining = remaining;
+        }
+
+        /**
+         * Creates a metadata filter push-down result.
+         *
+         * @param acceptedFilters filters consumed by the source (best effort)
+         * @param remainingFilters filters that a subsequent operation must still apply at runtime
+         * @return a result wrapping the two lists
+         */
+        public static MetadataFilterResult of(
+                List<ResolvedExpression> acceptedFilters,
+                List<ResolvedExpression> remainingFilters) {
+            return new MetadataFilterResult(acceptedFilters, remainingFilters);
+        }
+
+        /** Returns the filters that were passed to {@link #of(List, List)} as accepted. */
+        public List<ResolvedExpression> getAcceptedFilters() {
+            return accepted;
+        }
+
+        /** Returns the filters that were passed to {@link #of(List, List)} as remaining. */
+        public List<ResolvedExpression> getRemainingFilters() {
+            return remaining;
+        }
+    }
 }
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/source/FilterPushDownSpec.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/source/FilterPushDownSpec.java
index d30fb1a4181..7a4e1481e04 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/source/FilterPushDownSpec.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/source/FilterPushDownSpec.java
@@ -99,50 +99,9 @@ public final class FilterPushDownSpec extends 
SourceAbilitySpecBase {
             DynamicTableSource tableSource,
             SourceAbilityContext context) {
         if (tableSource instanceof SupportsFilterPushDown) {
-            RexNodeToExpressionConverter converter =
-                    new RexNodeToExpressionConverter(
-                            new RexBuilder(context.getTypeFactory()),
-                            
context.getSourceRowType().getFieldNames().toArray(new String[0]),
-                            context.getFunctionCatalog(),
-                            context.getCatalogManager(),
-                            Option.apply(
-                                    context.getTypeFactory()
-                                            
.buildRelNodeRowType(context.getSourceRowType())));
-            List<Expression> filters =
-                    predicates.stream()
-                            .map(
-                                    p -> {
-                                        scala.Option<ResolvedExpression> expr 
= p.accept(converter);
-                                        if (expr.isDefined()) {
-                                            return expr.get();
-                                        } else {
-                                            throw new TableException(
-                                                    String.format(
-                                                            "%s can not be 
converted to Expression, please make sure %s can accept %s.",
-                                                            p.toString(),
-                                                            
tableSource.getClass().getSimpleName(),
-                                                            p.toString()));
-                                        }
-                                    })
-                            .collect(Collectors.toList());
-            ExpressionResolver resolver =
-                    ExpressionResolver.resolverFor(
-                                    context.getTableConfig(),
-                                    context.getClassLoader(),
-                                    name -> Optional.empty(),
-                                    context.getFunctionCatalog()
-                                            .asLookup(
-                                                    str -> {
-                                                        throw new 
TableException(
-                                                                "We should not 
need to lookup any expressions at this point");
-                                                    }),
-                                    
context.getCatalogManager().getDataTypeFactory(),
-                                    (sqlExpression, inputRowType, outputType) 
-> {
-                                        throw new TableException(
-                                                "SQL expression parsing is not 
supported at this location.");
-                                    })
-                            .build();
-            return ((SupportsFilterPushDown) 
tableSource).applyFilters(resolver.resolve(filters));
+            List<ResolvedExpression> resolved =
+                    resolvePredicates(predicates, context.getSourceRowType(), 
tableSource, context);
+            return ((SupportsFilterPushDown) 
tableSource).applyFilters(resolved);
         } else {
             throw new TableException(
                     String.format(
@@ -151,6 +110,59 @@ public final class FilterPushDownSpec extends 
SourceAbilitySpecBase {
         }
     }
 
+    /**
+     * Converts {@link RexNode} predicates to {@link ResolvedExpression}s 
using the given row type.
+     * Shared between physical and metadata filter push-down paths.
+     */
+    static List<ResolvedExpression> resolvePredicates(
+            List<RexNode> predicates,
+            RowType rowType,
+            DynamicTableSource tableSource,
+            SourceAbilityContext context) {
+        RexNodeToExpressionConverter converter =
+                new RexNodeToExpressionConverter(
+                        new RexBuilder(context.getTypeFactory()),
+                        rowType.getFieldNames().toArray(new String[0]),
+                        context.getFunctionCatalog(),
+                        context.getCatalogManager(),
+                        
Option.apply(context.getTypeFactory().buildRelNodeRowType(rowType)));
+        List<Expression> filters =
+                predicates.stream()
+                        .map(
+                                p -> {
+                                    scala.Option<ResolvedExpression> expr = 
p.accept(converter);
+                                    if (expr.isDefined()) {
+                                        return expr.get();
+                                    } else {
+                                        throw new TableException(
+                                                String.format(
+                                                        "%s can not be 
converted to Expression, please make sure %s can accept %s.",
+                                                        p.toString(),
+                                                        
tableSource.getClass().getSimpleName(),
+                                                        p.toString()));
+                                    }
+                                })
+                        .collect(Collectors.toList());
+        ExpressionResolver resolver =
+                ExpressionResolver.resolverFor(
+                                context.getTableConfig(),
+                                context.getClassLoader(),
+                                name -> Optional.empty(),
+                                context.getFunctionCatalog()
+                                        .asLookup(
+                                                str -> {
+                                                    throw new TableException(
+                                                            "We should not 
need to lookup any expressions at this point");
+                                                }),
+                                
context.getCatalogManager().getDataTypeFactory(),
+                                (sqlExpression, inputRowType, outputType) -> {
+                                    throw new TableException(
+                                            "SQL expression parsing is not 
supported at this location.");
+                                })
+                        .build();
+        return resolver.resolve(filters);
+    }
+
     @Override
     public boolean needAdjustFieldReferenceAfterProjection() {
         return true;
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/source/MetadataFilterPushDownSpec.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/source/MetadataFilterPushDownSpec.java
new file mode 100644
index 00000000000..090ab45fe27
--- /dev/null
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/source/MetadataFilterPushDownSpec.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.abilities.source;
+
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import 
org.apache.flink.table.connector.source.abilities.SupportsReadingMetadata;
+import 
org.apache.flink.table.connector.source.abilities.SupportsReadingMetadata.MetadataFilterResult;
+import org.apache.flink.table.expressions.ResolvedExpression;
+import org.apache.flink.table.planner.plan.utils.FlinkRexUtil;
+import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
+import org.apache.flink.table.types.logical.RowType;
+
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeName;
+
+import org.apache.calcite.rex.RexNode;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Ability spec that records the metadata filter predicates pushed into a source and re-applies
+ * them when a compiled plan is restored.
+ *
+ * <p>Predicates are stored with a {@code predicateRowType} that already uses metadata key names
+ * (not SQL aliases). The alias-to-key translation happens once at optimization time, so no
+ * column-to-key mapping needs to be persisted.
+ */
+@JsonIgnoreProperties(ignoreUnknown = true)
+@JsonTypeName("MetadataFilterPushDown")
+public final class MetadataFilterPushDownSpec extends SourceAbilitySpecBase {
+
+    public static final String FIELD_NAME_PREDICATES = "predicates";
+    public static final String FIELD_NAME_PREDICATE_ROW_TYPE = "predicateRowType";
+
+    /** Predicates accepted by the source; input references index into {@link #predicateRowType}. */
+    @JsonProperty(FIELD_NAME_PREDICATES)
+    private final List<RexNode> predicates;
+
+    /**
+     * Row type snapshot using metadata key names. Stored because ProjectPushDownSpec may narrow the
+     * context's row type during restore.
+     */
+    @JsonProperty(FIELD_NAME_PREDICATE_ROW_TYPE)
+    private final RowType predicateRowType;
+
+    /**
+     * Creates the spec. The predicate list is copied; both arguments must be non-null.
+     *
+     * @param predicates predicates expressed against {@code predicateRowType}
+     * @param predicateRowType row type whose field names are metadata keys
+     */
+    @JsonCreator
+    public MetadataFilterPushDownSpec(
+            @JsonProperty(FIELD_NAME_PREDICATES) List<RexNode> predicates,
+            @JsonProperty(FIELD_NAME_PREDICATE_ROW_TYPE) RowType predicateRowType) {
+        this.predicates = new ArrayList<>(checkNotNull(predicates));
+        this.predicateRowType = checkNotNull(predicateRowType);
+    }
+
+    /** Returns the stored predicate list (the internal list itself, not a copy). */
+    public List<RexNode> getPredicates() {
+        return predicates;
+    }
+
+    /**
+     * Re-applies the stored predicates to the source and insists that all of them are accepted.
+     *
+     * @throws TableException if the number of accepted filters differs from the number of stored
+     *     predicates
+     */
+    @Override
+    public void apply(DynamicTableSource tableSource, SourceAbilityContext context) {
+        // The stored predicateRowType is used on purpose: the context's row type may have been
+        // narrowed by ProjectPushDownSpec.
+        final int acceptedCount =
+                applyMetadataFilters(predicates, predicateRowType, tableSource, context)
+                        .getAcceptedFilters()
+                        .size();
+        if (acceptedCount != predicates.size()) {
+            throw new TableException("All metadata predicates should be accepted here.");
+        }
+    }
+
+    /**
+     * Converts RexNode predicates to ResolvedExpressions using the given row type and hands them
+     * to {@link SupportsReadingMetadata#applyMetadataFilters(List)}. The row type must already use
+     * metadata key names.
+     *
+     * @param predicates predicates expressed against {@code metadataKeyRowType}
+     * @param metadataKeyRowType row type whose field names are metadata keys
+     * @param tableSource source that receives the filters
+     * @param context ability context used to resolve the predicates
+     * @return the source's answer
+     * @throws TableException if the source does not implement {@link SupportsReadingMetadata} or
+     *     reports that it does not support metadata filter push-down
+     */
+    public static MetadataFilterResult applyMetadataFilters(
+            List<RexNode> predicates,
+            RowType metadataKeyRowType,
+            DynamicTableSource tableSource,
+            SourceAbilityContext context) {
+        final boolean readsMetadata = tableSource instanceof SupportsReadingMetadata;
+        if (!readsMetadata) {
+            throw new TableException(
+                    String.format(
+                            "%s does not support SupportsReadingMetadata.",
+                            tableSource.getClass().getName()));
+        }
+
+        final SupportsReadingMetadata metadataSource = (SupportsReadingMetadata) tableSource;
+        if (!metadataSource.supportsMetadataFilterPushDown()) {
+            throw new TableException(
+                    String.format(
+                            "%s no longer supports metadata filter push-down.",
+                            tableSource.getClass().getName()));
+        }
+
+        return metadataSource.applyMetadataFilters(
+                FilterPushDownSpec.resolvePredicates(
+                        predicates, metadataKeyRowType, tableSource, context));
+    }
+
+    @Override
+    public boolean needAdjustFieldReferenceAfterProjection() {
+        return true;
+    }
+
+    /**
+     * Renders the predicates as {@code metadataFilter=[...]}; multiple predicates are nested
+     * left-to-right as {@code and(l, r)}, and an empty list renders as {@code metadataFilter=[]}.
+     */
+    @Override
+    public String getDigests(SourceAbilityContext context) {
+        String conjunction = "";
+        boolean first = true;
+        for (RexNode predicate : predicates) {
+            final String rendered =
+                    FlinkRexUtil.getExpressionString(
+                            predicate,
+                            JavaScalaConversionUtil.toScala(predicateRowType.getFieldNames()));
+            conjunction = first ? rendered : String.format("and(%s, %s)", conjunction, rendered);
+            first = false;
+        }
+        return String.format("metadataFilter=[%s]", conjunction);
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass() || !super.equals(o)) {
+            return false;
+        }
+        final MetadataFilterPushDownSpec other = (MetadataFilterPushDownSpec) o;
+        return Objects.equals(predicates, other.predicates)
+                && Objects.equals(predicateRowType, other.predicateRowType);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(super.hashCode(), predicates, predicateRowType);
+    }
+}
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/source/SourceAbilitySpec.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/source/SourceAbilitySpec.java
index e51328d5e9f..20e594304a8 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/source/SourceAbilitySpec.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/source/SourceAbilitySpec.java
@@ -38,6 +38,7 @@ import java.util.Optional;
 @JsonSubTypes({
     @JsonSubTypes.Type(value = FilterPushDownSpec.class),
     @JsonSubTypes.Type(value = LimitPushDownSpec.class),
+    @JsonSubTypes.Type(value = MetadataFilterPushDownSpec.class),
     @JsonSubTypes.Type(value = PartitionPushDownSpec.class),
     @JsonSubTypes.Type(value = ProjectPushDownSpec.class),
     @JsonSubTypes.Type(value = ReadingMetadataSpec.class),
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoSourceScanRuleBase.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoSourceScanRuleBase.java
index 76e5a9b4a10..eb5521ef782 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoSourceScanRuleBase.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoSourceScanRuleBase.java
@@ -17,23 +17,36 @@
 
 package org.apache.flink.table.planner.plan.rules.logical;
 
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.ResolvedSchema;
 import org.apache.flink.table.connector.source.DynamicTableSource;
 import 
org.apache.flink.table.connector.source.abilities.SupportsFilterPushDown;
+import 
org.apache.flink.table.connector.source.abilities.SupportsReadingMetadata;
+import 
org.apache.flink.table.connector.source.abilities.SupportsReadingMetadata.MetadataFilterResult;
 import org.apache.flink.table.expressions.ResolvedExpression;
 import 
org.apache.flink.table.planner.expressions.converter.ExpressionConverter;
 import org.apache.flink.table.planner.plan.abilities.source.FilterPushDownSpec;
+import 
org.apache.flink.table.planner.plan.abilities.source.MetadataFilterPushDownSpec;
 import 
org.apache.flink.table.planner.plan.abilities.source.SourceAbilityContext;
 import org.apache.flink.table.planner.plan.abilities.source.SourceAbilitySpec;
 import org.apache.flink.table.planner.plan.schema.TableSourceTable;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.RowType.RowField;
 
 import org.apache.calcite.plan.RelOptRule;
 import org.apache.calcite.plan.RelOptRuleOperand;
 import org.apache.calcite.rel.core.TableScan;
+import org.apache.calcite.rex.RexInputRef;
 import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexShuttle;
+import org.apache.calcite.rex.RexVisitorImpl;
 import org.apache.calcite.tools.RelBuilder;
 
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.stream.Collectors;
 
 import scala.Tuple2;
@@ -60,15 +73,8 @@ public abstract class PushFilterIntoSourceScanRuleBase 
extends RelOptRule {
     }
 
     /**
-     * Resolves filters using the underlying sources {@link 
SupportsFilterPushDown} and creates a
-     * new {@link TableSourceTable} with the supplied predicates.
-     *
-     * @param convertiblePredicates Predicates to resolve
-     * @param oldTableSourceTable TableSourceTable to copy
-     * @param scan Underlying table scan to push to
-     * @param relBuilder Builder to push the scan to
-     * @return A tuple, constituting of the resolved filters and the newly 
created {@link
-     *     TableSourceTable}
+     * Resolves filters via {@link SupportsFilterPushDown} and creates a new 
{@link
+     * TableSourceTable}.
      */
     protected Tuple2<SupportsFilterPushDown.Result, TableSourceTable>
             resolveFiltersAndCreateTableSourceTable(
@@ -102,17 +108,168 @@ public abstract class PushFilterIntoSourceScanRuleBase 
extends RelOptRule {
         return new Tuple2<>(result, newTableSourceTable);
     }
 
-    /**
-     * Determines wether we can pushdown the filter into the source. we can 
not push filter twice,
-     * make sure FilterPushDownSpec has not been assigned as a capability.
-     *
-     * @param tableSourceTable Table scan to attempt to push into
-     * @return Whether we can push or not
-     */
+    /** Whether filter push-down is possible and not already assigned. */
     protected boolean canPushdownFilter(TableSourceTable tableSourceTable) {
         return tableSourceTable != null
                 && tableSourceTable.tableSource() instanceof 
SupportsFilterPushDown
                 && Arrays.stream(tableSourceTable.abilitySpecs())
                         .noneMatch(spec -> spec instanceof FilterPushDownSpec);
     }
+
+    /** Whether metadata filter push-down is possible and not already 
assigned. */
+    protected boolean canPushdownMetadataFilter(TableSourceTable 
tableSourceTable) {
+        if (tableSourceTable == null) {
+            return false;
+        }
+        DynamicTableSource source = tableSourceTable.tableSource();
+        if (!(source instanceof SupportsReadingMetadata)) {
+            return false;
+        }
+        if (!((SupportsReadingMetadata) 
source).supportsMetadataFilterPushDown()) {
+            return false;
+        }
+        return Arrays.stream(tableSourceTable.abilitySpecs())
+                .noneMatch(spec -> spec instanceof MetadataFilterPushDownSpec);
+    }
+
+    /**
+     * True if predicate references metadata columns exclusively (no physical 
columns).
+     *
+     * <p>A predicate like {@code OR(physical_pred, metadata_pred)} returns 
false because it
+     * references both physical and metadata columns. Mixed predicates remain 
as runtime filters.
+     */
+    protected boolean referencesOnlyMetadataColumns(RexNode predicate, int 
physicalColumnCount) {
+        boolean[] saw = new boolean[2]; // [0] = sawPhysical, [1] = sawMetadata
+        predicate.accept(
+                new RexVisitorImpl<Void>(true) {
+                    @Override
+                    public Void visitInputRef(RexInputRef inputRef) {
+                        if (inputRef.getIndex() >= physicalColumnCount) {
+                            saw[1] = true;
+                        } else {
+                            saw[0] = true;
+                        }
+                        return null;
+                    }
+                });
+        return saw[1] && !saw[0];
+    }
+
+    /**
+     * Number of physical columns in the scan's schema.
+     *
+     * @param tableSourceTable table whose resolved schema is inspected
+     * @return how many columns of the resolved schema report {@code isPhysical()}
+     */
+    protected int getPhysicalColumnCount(TableSourceTable tableSourceTable) {
+        int physicalColumns = 0;
+        for (Column column :
+                tableSourceTable.contextResolvedTable().getResolvedSchema().getColumns()) {
+            if (column.isPhysical()) {
+                physicalColumns++;
+            }
+        }
+        return physicalColumns;
+    }
+
+    /** Maps SQL column names to metadata keys for metadata columns. */
+    protected Map<String, String> buildColumnToMetadataKeyMap(TableSourceTable 
tableSourceTable) {
+        ResolvedSchema schema = 
tableSourceTable.contextResolvedTable().getResolvedSchema();
+        Map<String, String> mapping = new HashMap<>();
+        for (Column col : schema.getColumns()) {
+            if (col instanceof Column.MetadataColumn) {
+                Column.MetadataColumn metaCol = (Column.MetadataColumn) col;
+                String sqlName = metaCol.getName();
+                String metadataKey = metaCol.getMetadataKey().orElse(sqlName);
+                mapping.put(sqlName, metadataKey);
+            }
+        }
+        return mapping;
+    }
+
+    /** Resolves metadata filters and creates a new {@link TableSourceTable}. 
*/
+    protected Tuple2<MetadataFilterResult, TableSourceTable>
+            resolveMetadataFiltersAndCreateTableSourceTable(
+                    RexNode[] metadataPredicates,
+                    TableSourceTable oldTableSourceTable,
+                    TableScan scan,
+                    RelBuilder relBuilder) {
+        DynamicTableSource newTableSource = 
oldTableSourceTable.tableSource().copy();
+        SourceAbilityContext abilityContext = SourceAbilityContext.from(scan);
+
+        // Build a metadata-only row type (field names are metadata keys, not 
SQL aliases) and
+        // an old->new index mapping. Storing only metadata columns avoids 
name collisions with
+        // physical columns (e.g. `offset INT, msg_offset INT METADATA FROM 
'offset'`).
+        MetadataRowInfo metadataRowInfo =
+                buildMetadataRowInfo(oldTableSourceTable, 
abilityContext.getSourceRowType());
+        RexNode[] remappedPredicates =
+                remapPredicates(metadataPredicates, 
metadataRowInfo.oldIndexToNewIndex);
+
+        MetadataFilterResult result =
+                MetadataFilterPushDownSpec.applyMetadataFilters(
+                        Arrays.asList(remappedPredicates),
+                        metadataRowInfo.metadataRowType,
+                        newTableSource,
+                        abilityContext);
+
+        int acceptedCount = result.getAcceptedFilters().size();
+        List<RexNode> acceptedRemappedPredicates = new ArrayList<>();
+        for (int i = 0; i < acceptedCount; i++) {
+            acceptedRemappedPredicates.add(remappedPredicates[i]);
+        }
+        MetadataFilterPushDownSpec metadataSpec =
+                new MetadataFilterPushDownSpec(
+                        acceptedRemappedPredicates, 
metadataRowInfo.metadataRowType);
+
+        TableSourceTable newTableSourceTable =
+                oldTableSourceTable.copy(
+                        newTableSource,
+                        oldTableSourceTable.getStatistic(),
+                        new SourceAbilitySpec[] {metadataSpec});
+
+        return new Tuple2<>(result, newTableSourceTable);
+    }
+
+    /**
+     * Builds a {@link RowType} containing only metadata columns (named by 
metadata key) together
+     * with an old-index-to-new-index mapping from the scan's source row type. 
Non-metadata
+     * positions are absent from the mapping.
+     */
+    private MetadataRowInfo buildMetadataRowInfo(
+            TableSourceTable tableSourceTable, RowType sourceRowType) {
+        Map<String, String> columnToMetadataKey = 
buildColumnToMetadataKeyMap(tableSourceTable);
+        List<RowField> metadataFields = new ArrayList<>();
+        Map<Integer, Integer> oldToNewIndex = new HashMap<>();
+        for (int i = 0; i < sourceRowType.getFieldCount(); i++) {
+            String sqlName = sourceRowType.getFieldNames().get(i);
+            String metadataKey = columnToMetadataKey.get(sqlName);
+            if (metadataKey != null) {
+                oldToNewIndex.put(i, metadataFields.size());
+                metadataFields.add(new RowField(metadataKey, 
sourceRowType.getTypeAt(i)));
+            }
+        }
+        return new MetadataRowInfo(new RowType(false, metadataFields), 
oldToNewIndex);
+    }
+
+    /**
+     * Rewrites {@link RexInputRef}s in each predicate using the supplied 
old->new index map. Throws
+     * if a predicate references an index not in the map (would indicate a 
non-metadata reference
+     * slipped past {@link #referencesOnlyMetadataColumns}).
+     */
+    private RexNode[] remapPredicates(
+            RexNode[] predicates, Map<Integer, Integer> oldIndexToNewIndex) {
+        RexShuttle shuttle =
+                new RexShuttle() {
+                    @Override
+                    public RexNode visitInputRef(RexInputRef inputRef) {
+                        Integer newIdx = 
oldIndexToNewIndex.get(inputRef.getIndex());
+                        if (newIdx == null) {
+                            throw new IllegalStateException(
+                                    "Metadata predicate references 
non-metadata column index "
+                                            + inputRef.getIndex());
+                        }
+                        return new RexInputRef(newIdx, inputRef.getType());
+                    }
+                };
+        return Arrays.stream(predicates).map(p -> 
p.accept(shuttle)).toArray(RexNode[]::new);
+    }
+
+    /**
+     * Pairs a metadata-only row type with the mapping from positions in the scan's source row type
+     * to positions in that metadata-only row type.
+     */
+    private static final class MetadataRowInfo {
+        final RowType metadataRowType;
+        final Map<Integer, Integer> oldIndexToNewIndex;
+
+        MetadataRowInfo(RowType rowType, Map<Integer, Integer> indexMapping) {
+            this.metadataRowType = rowType;
+            this.oldIndexToNewIndex = indexMapping;
+        }
+    }
 }
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoTableSourceScanRule.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoTableSourceScanRule.java
index eaa6999cdd7..57d80b1b2f9 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoTableSourceScanRule.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoTableSourceScanRule.java
@@ -19,7 +19,8 @@
 package org.apache.flink.table.planner.plan.rules.logical;
 
 import 
org.apache.flink.table.connector.source.abilities.SupportsFilterPushDown;
-import org.apache.flink.table.planner.plan.schema.FlinkPreparingTableBase;
+import 
org.apache.flink.table.connector.source.abilities.SupportsReadingMetadata;
+import 
org.apache.flink.table.connector.source.abilities.SupportsReadingMetadata.MetadataFilterResult;
 import org.apache.flink.table.planner.plan.schema.TableSourceTable;
 import org.apache.flink.table.planner.plan.utils.FlinkRexUtil;
 
@@ -29,11 +30,15 @@ import org.apache.calcite.rel.logical.LogicalTableScan;
 import org.apache.calcite.rex.RexNode;
 import org.apache.calcite.tools.RelBuilder;
 
+import java.util.ArrayList;
+import java.util.List;
+
 import scala.Tuple2;
 
 /**
- * Planner rule that tries to push a filter into a {@link LogicalTableScan}, which table is a {@link
- * TableSourceTable}. And the table source in the table is a {@link SupportsFilterPushDown}.
+ * Pushes filters from a {@link Filter} into a {@link LogicalTableScan}. Physical filters use {@link
+ * SupportsFilterPushDown}; metadata filters use {@link
+ * SupportsReadingMetadata#applyMetadataFilters}.
  */
 public class PushFilterIntoTableSourceScanRule extends 
PushFilterIntoSourceScanRuleBase {
     public static final PushFilterIntoTableSourceScanRule INSTANCE =
@@ -59,7 +64,7 @@ public class PushFilterIntoTableSourceScanRule extends PushFilterIntoSourceScanR
         LogicalTableScan scan = call.rel(1);
         TableSourceTable tableSourceTable = 
scan.getTable().unwrap(TableSourceTable.class);
 
-        return canPushdownFilter(tableSourceTable);
+        return canPushdownFilter(tableSourceTable) || 
canPushdownMetadataFilter(tableSourceTable);
     }
 
     @Override
@@ -74,7 +79,7 @@ public class PushFilterIntoTableSourceScanRule extends PushFilterIntoSourceScanR
             RelOptRuleCall call,
             Filter filter,
             LogicalTableScan scan,
-            FlinkPreparingTableBase relOptTable) {
+            TableSourceTable tableSourceTable) {
 
         RelBuilder relBuilder = call.builder();
         Tuple2<RexNode[], RexNode[]> extractedPredicates =
@@ -87,28 +92,77 @@ public class PushFilterIntoTableSourceScanRule extends PushFilterIntoSourceScanR
         RexNode[] convertiblePredicates = extractedPredicates._1;
         RexNode[] unconvertedPredicates = extractedPredicates._2;
         if (convertiblePredicates.length == 0) {
-            // no condition can be translated to expression
             return;
         }
 
-        Tuple2<SupportsFilterPushDown.Result, TableSourceTable> 
scanAfterPushdownWithResult =
-                resolveFiltersAndCreateTableSourceTable(
-                        convertiblePredicates,
-                        relOptTable.unwrap(TableSourceTable.class),
-                        scan,
-                        relBuilder);
+        boolean supportsPhysicalFilter = canPushdownFilter(tableSourceTable);
+        boolean supportsMetadataFilter = 
canPushdownMetadataFilter(tableSourceTable);
+        int physicalColumnCount = getPhysicalColumnCount(tableSourceTable);
+
+        // Classify predicates: only separate metadata predicates when the source
+        // actually supports metadata filter push-down. Otherwise, all predicates
+        // go through the physical path to preserve the FilterPushDownSpec guard
+        // that prevents rule re-firing and maintains scan reuse invariants.
+        List<RexNode> physicalPredicates = new ArrayList<>();
+        List<RexNode> metadataPredicates = new ArrayList<>();
+        for (RexNode predicate : convertiblePredicates) {
+            if (supportsMetadataFilter
+                    && referencesOnlyMetadataColumns(predicate, 
physicalColumnCount)) {
+                metadataPredicates.add(predicate);
+            } else {
+                physicalPredicates.add(predicate);
+            }
+        }
+
+        List<RexNode> allRemainingRexNodes = new ArrayList<>();
+        TableSourceTable currentTable = tableSourceTable;
+
+        if (!physicalPredicates.isEmpty() && supportsPhysicalFilter) {
+            Tuple2<SupportsFilterPushDown.Result, TableSourceTable> 
physicalResult =
+                    resolveFiltersAndCreateTableSourceTable(
+                            physicalPredicates.toArray(new RexNode[0]),
+                            currentTable,
+                            scan,
+                            relBuilder);
+            currentTable = physicalResult._2;
+            List<RexNode> physicalRemaining =
+                    
convertExpressionToRexNode(physicalResult._1.getRemainingFilters(), relBuilder);
+            allRemainingRexNodes.addAll(physicalRemaining);
+        } else {
+            allRemainingRexNodes.addAll(physicalPredicates);
+        }
 
-        SupportsFilterPushDown.Result result = scanAfterPushdownWithResult._1;
-        TableSourceTable tableSourceTable = scanAfterPushdownWithResult._2;
+        if (!metadataPredicates.isEmpty()) {
+            Tuple2<MetadataFilterResult, TableSourceTable> metadataResult =
+                    resolveMetadataFiltersAndCreateTableSourceTable(
+                            metadataPredicates.toArray(new RexNode[0]),
+                            currentTable,
+                            scan,
+                            relBuilder);
+            currentTable = metadataResult._2;
+            // Remaining (rejected) metadata predicates stay as a LogicalFilter above
+            // the scan so they are still evaluated at runtime. We use the original
+            // RexNodes (suffix) because the remaining ResolvedExpressions use metadata
+            // key names, not SQL aliases needed by the Filter's row type. The
+            // validation in resolveMetadataFiltersAndCreateTableSourceTable ensures
+            // the partition invariant (accepted prefix + remaining suffix = input).
+            int acceptedCount = metadataResult._1.getAcceptedFilters().size();
+            for (int i = acceptedCount; i < metadataPredicates.size(); i++) {
+                allRemainingRexNodes.add(metadataPredicates.get(i));
+            }
+        }
+
+        for (RexNode unconverted : unconvertedPredicates) {
+            allRemainingRexNodes.add(unconverted);
+        }
 
         LogicalTableScan newScan =
-                LogicalTableScan.create(scan.getCluster(), tableSourceTable, 
scan.getHints());
-        if (result.getRemainingFilters().isEmpty() && 
unconvertedPredicates.length == 0) {
+                LogicalTableScan.create(scan.getCluster(), currentTable, 
scan.getHints());
+
+        if (allRemainingRexNodes.isEmpty()) {
             call.transformTo(newScan);
         } else {
-            RexNode remainingCondition =
-                    createRemainingCondition(
-                            relBuilder, result.getRemainingFilters(), 
unconvertedPredicates);
+            RexNode remainingCondition = relBuilder.and(allRemainingRexNodes);
             RexNode simplifiedRemainingCondition =
                     FlinkRexUtil.simplify(
                             relBuilder.getRexBuilder(),
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java
index 1e6b540d4b1..d011687d82e 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java
@@ -440,6 +440,9 @@ public final class TestValuesTableFactory
     private static final ConfigOption<Boolean> ENABLE_WATERMARK_PUSH_DOWN =
             
ConfigOptions.key("enable-watermark-push-down").booleanType().defaultValue(false);
 
+    private static final ConfigOption<Boolean> 
ENABLE_METADATA_FILTER_PUSH_DOWN =
+            
ConfigOptions.key("enable-metadata-filter-push-down").booleanType().defaultValue(false);
+
     private static final ConfigOption<Boolean> ENABLE_CUSTOM_SHUFFLE =
             
ConfigOptions.key("enable-custom-shuffle").booleanType().defaultValue(false);
 
@@ -574,6 +577,8 @@ public final class TestValuesTableFactory
         boolean enableAggregatePushDown = 
helper.getOptions().get(ENABLE_AGGREGATE_PUSH_DOWN);
         boolean nestedProjectionSupported = 
helper.getOptions().get(NESTED_PROJECTION_SUPPORTED);
         boolean enableWatermarkPushDown = 
helper.getOptions().get(ENABLE_WATERMARK_PUSH_DOWN);
+        boolean enableMetadataFilterPushDown =
+                helper.getOptions().get(ENABLE_METADATA_FILTER_PUSH_DOWN);
         boolean failingSource = helper.getOptions().get(FAILING_SOURCE);
         int numElementToSkip = 
helper.getOptions().get(SOURCE_NUM_ELEMENT_TO_SKIP);
         boolean internalData = helper.getOptions().get(INTERNAL_DATA);
@@ -692,45 +697,51 @@ public final class TestValuesTableFactory
 
             if (disableLookup) {
                 if (enableWatermarkPushDown) {
-                    return new TestValuesScanTableSourceWithWatermarkPushDown(
-                            producedDataType,
-                            changelogMode,
-                            terminating,
-                            runtimeSource,
-                            failingSource,
-                            partition2Rows,
-                            context.getObjectIdentifier().getObjectName(),
-                            nestedProjectionSupported,
-                            null,
-                            Collections.emptyList(),
-                            filterableFieldsSet,
-                            dynamicFilteringFieldsSet,
-                            numElementToSkip,
-                            Long.MAX_VALUE,
-                            partitions,
-                            readableMetadata,
-                            null,
-                            enableAggregatePushDown);
+                    TestValuesScanTableSourceWithWatermarkPushDown source =
+                            new TestValuesScanTableSourceWithWatermarkPushDown(
+                                    producedDataType,
+                                    changelogMode,
+                                    terminating,
+                                    runtimeSource,
+                                    failingSource,
+                                    partition2Rows,
+                                    
context.getObjectIdentifier().getObjectName(),
+                                    nestedProjectionSupported,
+                                    null,
+                                    Collections.emptyList(),
+                                    filterableFieldsSet,
+                                    dynamicFilteringFieldsSet,
+                                    numElementToSkip,
+                                    Long.MAX_VALUE,
+                                    partitions,
+                                    readableMetadata,
+                                    null,
+                                    enableAggregatePushDown);
+                    
source.setEnableMetadataFilterPushDown(enableMetadataFilterPushDown);
+                    return source;
                 } else {
-                    return new TestValuesScanTableSource(
-                            producedDataType,
-                            changelogMode,
-                            boundedness,
-                            terminating,
-                            runtimeSource,
-                            failingSource,
-                            partition2Rows,
-                            nestedProjectionSupported,
-                            null,
-                            Collections.emptyList(),
-                            filterableFieldsSet,
-                            dynamicFilteringFieldsSet,
-                            numElementToSkip,
-                            Long.MAX_VALUE,
-                            partitions,
-                            readableMetadata,
-                            null,
-                            enableAggregatePushDown);
+                    TestValuesScanTableSource source =
+                            new TestValuesScanTableSource(
+                                    producedDataType,
+                                    changelogMode,
+                                    boundedness,
+                                    terminating,
+                                    runtimeSource,
+                                    failingSource,
+                                    partition2Rows,
+                                    nestedProjectionSupported,
+                                    null,
+                                    Collections.emptyList(),
+                                    filterableFieldsSet,
+                                    dynamicFilteringFieldsSet,
+                                    numElementToSkip,
+                                    Long.MAX_VALUE,
+                                    partitions,
+                                    readableMetadata,
+                                    null,
+                                    enableAggregatePushDown);
+                    
source.setEnableMetadataFilterPushDown(enableMetadataFilterPushDown);
+                    return source;
                 }
             } else {
                 Collection<Row> consumedData =
@@ -904,6 +915,7 @@ public final class TestValuesTableFactory
                         SINK_CHANGELOG_MODE_ENFORCED,
                         WRITABLE_METADATA,
                         ENABLE_WATERMARK_PUSH_DOWN,
+                        ENABLE_METADATA_FILTER_PUSH_DOWN,
                         SINK_DROP_LATE_EVENT,
                         SINK_BUCKET_COUNT_REQUIRED,
                         SINK_SUPPORTS_DELETE_BY_KEY,
@@ -1088,6 +1100,7 @@ public final class TestValuesTableFactory
         protected final Map<String, DataType> readableMetadata;
         protected @Nullable int[] projectedMetadataFields;
         protected final boolean enableAggregatePushDown;
+        protected boolean enableMetadataFilterPushDown;
 
         private @Nullable int[] groupingSet;
         private List<AggregateExpression> aggregateExpressions;
@@ -1619,6 +1632,22 @@ public final class TestValuesTableFactory
                     
remainingMetadataKeys.stream().mapToInt(allMetadataKeys::indexOf).toArray();
         }
 
+        @Override
+        public boolean supportsMetadataFilterPushDown() {
+            return enableMetadataFilterPushDown;
+        }
+
+        @Override
+        public SupportsReadingMetadata.MetadataFilterResult 
applyMetadataFilters(
+                List<ResolvedExpression> metadataFilters) {
+            return SupportsReadingMetadata.MetadataFilterResult.of(
+                    metadataFilters, Collections.emptyList());
+        }
+
+        void setEnableMetadataFilterPushDown(boolean enable) {
+            this.enableMetadataFilterPushDown = enable;
+        }
+
         @Override
         public List<String> listAcceptedFilterFields() {
             return new ArrayList<>(dynamicFilteringFields);
@@ -1678,25 +1707,28 @@ public final class TestValuesTableFactory
 
         @Override
         public DynamicTableSource copy() {
-            return new TestValuesScanTableSource(
-                    producedDataType,
-                    changelogMode,
-                    boundedness,
-                    terminating,
-                    runtimeSource,
-                    failingSource,
-                    data,
-                    nestedProjectionSupported,
-                    projectedPhysicalFields,
-                    filterPredicates,
-                    filterableFields,
-                    dynamicFilteringFields,
-                    numElementToSkip,
-                    limit,
-                    allPartitions,
-                    readableMetadata,
-                    projectedMetadataFields,
-                    enableAggregatePushDown);
+            TestValuesScanTableSource copy =
+                    new TestValuesScanTableSource(
+                            producedDataType,
+                            changelogMode,
+                            boundedness,
+                            terminating,
+                            runtimeSource,
+                            failingSource,
+                            data,
+                            nestedProjectionSupported,
+                            projectedPhysicalFields,
+                            filterPredicates,
+                            filterableFields,
+                            dynamicFilteringFields,
+                            numElementToSkip,
+                            limit,
+                            allPartitions,
+                            readableMetadata,
+                            projectedMetadataFields,
+                            enableAggregatePushDown);
+            copy.setEnableMetadataFilterPushDown(enableMetadataFilterPushDown);
+            return copy;
         }
 
         @Override
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/DynamicTableSourceSpecSerdeTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/DynamicTableSourceSpecSerdeTest.java
index 6fd5942f19e..478c91eedcd 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/DynamicTableSourceSpecSerdeTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/DynamicTableSourceSpecSerdeTest.java
@@ -42,6 +42,7 @@ import org.apache.flink.table.planner.calcite.FlinkTypeSystem;
 import org.apache.flink.table.planner.factories.TestValuesTableFactory;
 import org.apache.flink.table.planner.plan.abilities.source.FilterPushDownSpec;
 import org.apache.flink.table.planner.plan.abilities.source.LimitPushDownSpec;
+import 
org.apache.flink.table.planner.plan.abilities.source.MetadataFilterPushDownSpec;
 import 
org.apache.flink.table.planner.plan.abilities.source.PartitionPushDownSpec;
 import 
org.apache.flink.table.planner.plan.abilities.source.ProjectPushDownSpec;
 import 
org.apache.flink.table.planner.plan.abilities.source.ReadingMetadataSpec;
@@ -64,6 +65,7 @@ import org.apache.calcite.sql.SqlIntervalQualifier;
 import org.apache.calcite.sql.fun.SqlStdOperatorTable;
 import org.apache.calcite.sql.parser.SqlParserPos;
 import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.calcite.util.TimestampString;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.parallel.Execution;
 import org.junit.jupiter.params.ParameterizedTest;
@@ -274,7 +276,71 @@ public class DynamicTableSourceSpecSerdeTest {
                                                         put("p", "B");
                                                     }
                                                 }))));
-        return Stream.of(spec1, spec2);
+        Map<String, String> options3 = new HashMap<>();
+        options3.put("connector", TestValuesTableFactory.IDENTIFIER);
+        options3.put("disable-lookup", "true");
+        options3.put("enable-metadata-filter-push-down", "true");
+        options3.put("bounded", "false");
+        options3.put("readable-metadata", "timestamp:TIMESTAMP(3), offset:BIGINT");
+
+        final ResolvedSchema resolvedSchema3 =
+                new ResolvedSchema(
+                        Arrays.asList(
+                                Column.physical("id", DataTypes.INT()),
+                                Column.metadata(
+                                        "rowtime", DataTypes.TIMESTAMP(3), 
"timestamp", false),
+                                Column.metadata("offset", DataTypes.BIGINT(), 
null, false)),
+                        Collections.emptyList(),
+                        null,
+                        Collections.emptyList(),
+                        null);
+
+        final CatalogTable catalogTable3 =
+                CatalogTable.newBuilder()
+                        
.schema(Schema.newBuilder().fromResolvedSchema(resolvedSchema3).build())
+                        .options(options3)
+                        .build();
+
+        // predicateRowType uses metadata key names (already translated from SQL aliases).
+        RowType predicateRowType3 =
+                RowType.of(
+                        new LogicalType[] {new TimestampType(3), new 
BigIntType()},
+                        new String[] {"timestamp", "offset"});
+
+        DynamicTableSourceSpec spec3 =
+                new DynamicTableSourceSpec(
+                        ContextResolvedTable.temporary(
+                                ObjectIdentifier.of(
+                                        
TableConfigOptions.TABLE_CATALOG_NAME.defaultValue(),
+                                        
TableConfigOptions.TABLE_DATABASE_NAME.defaultValue(),
+                                        "MyTableMetadata"),
+                                new ResolvedCatalogTable(catalogTable3, 
resolvedSchema3)),
+                        Collections.singletonList(
+                                new MetadataFilterPushDownSpec(
+                                        Arrays.asList(
+                                                // timestamp > '2024-01-01'
+                                                rexBuilder.makeCall(
+                                                        
SqlStdOperatorTable.GREATER_THAN,
+                                                        
rexBuilder.makeInputRef(
+                                                                
factory.createSqlType(
+                                                                        
SqlTypeName.TIMESTAMP, 3),
+                                                                0),
+                                                        
rexBuilder.makeTimestampLiteral(
+                                                                new 
TimestampString(
+                                                                        
"2024-01-01 00:00:00"),
+                                                                3)),
+                                                // offset >= 10
+                                                rexBuilder.makeCall(
+                                                        
SqlStdOperatorTable.GREATER_THAN_OR_EQUAL,
+                                                        
rexBuilder.makeInputRef(
+                                                                
factory.createSqlType(
+                                                                        
SqlTypeName.BIGINT),
+                                                                1),
+                                                        
rexBuilder.makeExactLiteral(
+                                                                new 
BigDecimal(10)))),
+                                        predicateRowType3)));
+
+        return Stream.of(spec1, spec2, spec3);
     }
 
     @ParameterizedTest
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/MetadataFilterInReadingMetadataTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/MetadataFilterInReadingMetadataTest.java
new file mode 100644
index 00000000000..ffd6aa0ea89
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/MetadataFilterInReadingMetadataTest.java
@@ -0,0 +1,452 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.logical;
+
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.TableDescriptor;
+import 
org.apache.flink.table.connector.source.abilities.SupportsFilterPushDown;
+import 
org.apache.flink.table.connector.source.abilities.SupportsReadingMetadata;
+import 
org.apache.flink.table.connector.source.abilities.SupportsReadingMetadata.MetadataFilterResult;
+import org.apache.flink.table.expressions.ResolvedExpression;
+import org.apache.flink.table.planner.calcite.CalciteConfig;
+import org.apache.flink.table.planner.factories.TableFactoryHarness;
+import 
org.apache.flink.table.planner.plan.optimize.program.BatchOptimizeContext;
+import org.apache.flink.table.planner.plan.optimize.program.FlinkBatchProgram;
+import 
org.apache.flink.table.planner.plan.optimize.program.FlinkHepRuleSetProgramBuilder;
+import 
org.apache.flink.table.planner.plan.optimize.program.HEP_RULES_EXECUTION_TYPE;
+import org.apache.flink.table.planner.utils.BatchTableTestUtil;
+import org.apache.flink.table.planner.utils.TableConfigUtils;
+import org.apache.flink.table.planner.utils.TableTestBase;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.testutils.junit.SharedObjectsExtension;
+import org.apache.flink.testutils.junit.SharedReference;
+
+import org.apache.calcite.plan.hep.HepMatchOrder;
+import org.apache.calcite.rel.rules.CoreRules;
+import org.apache.calcite.tools.RuleSets;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.flink.table.api.DataTypes.INT;
+import static org.apache.flink.table.api.DataTypes.STRING;
+import static org.apache.flink.table.api.DataTypes.TIMESTAMP;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for metadata filter push-down through {@link SupportsReadingMetadata}. */
+class MetadataFilterInReadingMetadataTest extends TableTestBase {
+
+    @RegisterExtension
+    private final SharedObjectsExtension sharedObjects = 
SharedObjectsExtension.create();
+
+    private BatchTableTestUtil util;
+
+    @BeforeEach
+    void setup() {
+        util = batchTestUtil(TableConfig.getDefault());
+        util.buildBatchProgram(FlinkBatchProgram.DEFAULT_REWRITE());
+        CalciteConfig calciteConfig =
+                TableConfigUtils.getCalciteConfig(util.tableEnv().getConfig());
+        calciteConfig
+                .getBatchProgram()
+                .get()
+                .addLast(
+                        "rules",
+                        
FlinkHepRuleSetProgramBuilder.<BatchOptimizeContext>newBuilder()
+                                .setHepRulesExecutionType(
+                                        
HEP_RULES_EXECUTION_TYPE.RULE_COLLECTION())
+                                .setHepMatchOrder(HepMatchOrder.BOTTOM_UP)
+                                .add(
+                                        RuleSets.ofList(
+                                                
PushFilterIntoTableSourceScanRule.INSTANCE,
+                                                
CoreRules.FILTER_PROJECT_TRANSPOSE))
+                                .build());
+    }
+
+    @Test
+    void testMetadataFilterPushDown() {
+        SharedReference<List<ResolvedExpression>> receivedFilters =
+                sharedObjects.add(new ArrayList<>());
+        TableDescriptor descriptor =
+                TableFactoryHarness.newBuilder()
+                        .schema(MetadataFilterSource.SCHEMA)
+                        .source(new MetadataFilterSource(true, 
receivedFilters))
+                        .build();
+        util.tableEnv().createTable("T1", descriptor);
+
+        util.verifyRelPlan("SELECT id FROM T1 WHERE event_time > TIMESTAMP '2024-01-01 00:00:00'");
+
+        assertThat(receivedFilters.get().toString())
+                .isEqualTo("[greaterThan(event_time, 2024-01-01T00:00)]");
+    }
+
+    @Test
+    void testMetadataFilterNotPushedWhenNotSupported() {
+        SharedReference<List<ResolvedExpression>> receivedFilters =
+                sharedObjects.add(new ArrayList<>());
+        TableDescriptor descriptor =
+                TableFactoryHarness.newBuilder()
+                        .schema(MetadataFilterSource.SCHEMA)
+                        .source(new MetadataFilterSource(false, 
receivedFilters))
+                        .build();
+        util.tableEnv().createTable("T2", descriptor);
+
+        util.verifyRelPlan("SELECT id FROM T2 WHERE event_time > TIMESTAMP '2024-01-01 00:00:00'");
+
+        // No metadata filters should have been pushed
+        assertThat(receivedFilters.get()).isEmpty();
+    }
+
+    @Test
+    void testAliasedMetadataColumnFilter() {
+        SharedReference<List<ResolvedExpression>> receivedFilters =
+                sharedObjects.add(new ArrayList<>());
+        TableDescriptor descriptor =
+                TableFactoryHarness.newBuilder()
+                        .schema(RenamedMetadataFilterSource.SCHEMA)
+                        .source(new 
RenamedMetadataFilterSource(receivedFilters))
+                        .build();
+        util.tableEnv().createTable("T3", descriptor);
+
+        // 'event_ts' is the SQL alias for metadata key 'timestamp'
+        util.verifyRelPlan("SELECT id FROM T3 WHERE event_ts > TIMESTAMP '2024-01-01 00:00:00'");
+
+        // The source should receive the filter with metadata key 'timestamp', not 'event_ts'.
+        assertThat(receivedFilters.get().toString())
+                .isEqualTo("[greaterThan(timestamp, 2024-01-01T00:00)]");
+    }
+
+    @Test
+    void testMixedPhysicalAndMetadataFilters() {
+        SharedReference<List<ResolvedExpression>> metadataFilters =
+                sharedObjects.add(new ArrayList<>());
+        SharedReference<List<ResolvedExpression>> physicalFilters =
+                sharedObjects.add(new ArrayList<>());
+        TableDescriptor descriptor =
+                TableFactoryHarness.newBuilder()
+                        .schema(MixedFilterSource.SCHEMA)
+                        .source(new MixedFilterSource(metadataFilters, 
physicalFilters))
+                        .build();
+        util.tableEnv().createTable("T4", descriptor);
+
+        util.verifyRelPlan(
+                "SELECT id FROM T4 WHERE id > 10 AND event_time > TIMESTAMP '2024-01-01 00:00:00'");
+
+        // Verify routing: id > 10 → physical path, event_time > ... → metadata path.
+        assertThat(physicalFilters.get().toString()).isEqualTo("[greaterThan(id, 10)]");
+        assertThat(metadataFilters.get().toString())
+                .isEqualTo("[greaterThan(event_time, 2024-01-01T00:00)]");
+    }
+
+    @Test
+    void testPartialMetadataFilterAcceptance() {
+        SharedReference<List<ResolvedExpression>> receivedFilters =
+                sharedObjects.add(new ArrayList<>());
+        TableDescriptor descriptor =
+                TableFactoryHarness.newBuilder()
+                        .schema(PartialMetadataFilterSource.SCHEMA)
+                        .source(new 
PartialMetadataFilterSource(receivedFilters))
+                        .build();
+        util.tableEnv().createTable("T6", descriptor);
+
+        // Two metadata filters: the source accepts only the first one
+        util.verifyRelPlan(
+                "SELECT id FROM T6 WHERE event_time > TIMESTAMP '2024-01-01 
00:00:00'"
+                        + " AND priority > 5");
+
+        // Source receives both filters; the XML reference verifies only the 
first is accepted
+        // (the second remains as a LogicalFilter above the scan).
+        assertThat(receivedFilters.get().toString())
+                .isEqualTo("[greaterThan(event_time, 2024-01-01T00:00), 
greaterThan(priority, 5)]");
+    }
+
+    @Test
+    void testPhysicalAndMetadataNameCollision() {
+        // Physical column 'offset' shares a name with the metadata key 
'offset'
+        // (aliased in SQL as 'msg_offset'). The predicate on the metadata 
column
+        // must be pushed down using the metadata key, not confused with the
+        // physical column of the same name.
+        SharedReference<List<ResolvedExpression>> receivedFilters =
+                sharedObjects.add(new ArrayList<>());
+        TableDescriptor descriptor =
+                TableFactoryHarness.newBuilder()
+                        .schema(CollidingNameSource.SCHEMA)
+                        .source(new CollidingNameSource(receivedFilters))
+                        .build();
+        util.tableEnv().createTable("T7", descriptor);
+
+        util.verifyRelPlan("SELECT id FROM T7 WHERE msg_offset > 5");
+
+        // Must reference the metadata key 'offset', NOT the SQL alias 
'msg_offset'.
+        
assertThat(receivedFilters.get().toString()).isEqualTo("[greaterThan(offset, 
5)]");
+    }
+
+    @Test
+    void testMetadataFilterWithProjection() {
+        SharedReference<List<ResolvedExpression>> receivedFilters =
+                sharedObjects.add(new ArrayList<>());
+        TableDescriptor descriptor =
+                TableFactoryHarness.newBuilder()
+                        .schema(MetadataFilterSource.SCHEMA)
+                        .source(new MetadataFilterSource(true, 
receivedFilters))
+                        .build();
+        util.tableEnv().createTable("T5", descriptor);
+
+        util.verifyRelPlan(
+                "SELECT id, name FROM T5 WHERE event_time > TIMESTAMP 
'2024-01-01 00:00:00'");
+
+        // Projection push-down must not perturb the metadata filter.
+        assertThat(receivedFilters.get().toString())
+                .isEqualTo("[greaterThan(event_time, 2024-01-01T00:00)]");
+    }
+
+    // 
-----------------------------------------------------------------------------------------
+    // Test sources
+    // 
-----------------------------------------------------------------------------------------
+
+    /**
+     * Scan source with a single readable metadata key, {@code event_time}, whose support for
+     * metadata filter push-down is switched on or off by a constructor flag.
+     *
+     * <p>Every filter passed to {@link #applyMetadataFilters(List)} is appended to the shared list
+     * supplied at construction time and is reported back as accepted, with no remaining filters.
+     */
+    private static class MetadataFilterSource extends TableFactoryHarness.ScanSourceBase
+            implements SupportsReadingMetadata {
+
+        /** Two physical columns plus the metadata column {@code event_time}. */
+        public static final Schema SCHEMA =
+                Schema.newBuilder()
+                        .column("id", INT())
+                        .column("name", STRING())
+                        .columnByMetadata("event_time", TIMESTAMP(3))
+                        .build();
+
+        /** Value returned by {@link #supportsMetadataFilterPushDown()}. */
+        private final boolean supportsMetadataFilter;
+
+        /** Collects every filter handed to {@link #applyMetadataFilters(List)}. */
+        private final SharedReference<List<ResolvedExpression>> receivedMetadataFilters;
+
+        /**
+         * @param supportsMetadataFilter whether the source reports metadata filter push-down
+         *     support
+         * @param receivedMetadataFilters shared list that records the filters offered to the
+         *     source
+         */
+        MetadataFilterSource(
+                boolean supportsMetadataFilter,
+                SharedReference<List<ResolvedExpression>> receivedMetadataFilters) {
+            this.supportsMetadataFilter = supportsMetadataFilter;
+            this.receivedMetadataFilters = receivedMetadataFilters;
+        }
+
+        /** Returns a fresh map holding the single metadata key {@code event_time}. */
+        @Override
+        public Map<String, DataType> listReadableMetadata() {
+            Map<String, DataType> metadata = new HashMap<>();
+            // Key and type mirror the metadata column declared in SCHEMA.
+            metadata.put("event_time", TIMESTAMP(3));
+            return metadata;
+        }
+
+        /** No-op: this test source does nothing with the requested keys or produced type. */
+        @Override
+        public void applyReadableMetadata(List<String> metadataKeys, DataType producedDataType) {}
+
+        /** Returns the flag given to the constructor. */
+        @Override
+        public boolean supportsMetadataFilterPushDown() {
+            return supportsMetadataFilter;
+        }
+
+        /**
+         * Records the offered filters and reports all of them as accepted.
+         *
+         * @param metadataFilters filters offered to the source
+         * @return a result with {@code metadataFilters} as accepted and nothing remaining
+         */
+        @Override
+        public MetadataFilterResult applyMetadataFilters(List<ResolvedExpression> metadataFilters) {
+            receivedMetadataFilters.get().addAll(metadataFilters);
+            return MetadataFilterResult.of(metadataFilters, Collections.emptyList());
+        }
+    }
+
+    /**
+     * Scan source whose metadata column is named differently in SQL than its metadata key: the
+     * schema declares column {@code event_ts} backed by the metadata key {@code timestamp}.
+     *
+     * <p>Every filter passed to {@link #applyMetadataFilters(List)} is appended to the shared list
+     * supplied at construction time and is reported back as accepted, with no remaining filters.
+     */
+    private static class RenamedMetadataFilterSource extends TableFactoryHarness.ScanSourceBase
+            implements SupportsReadingMetadata {
+
+        /** One physical column plus {@code event_ts}, read from metadata key {@code timestamp}. */
+        public static final Schema SCHEMA =
+                Schema.newBuilder()
+                        .column("id", INT())
+                        .columnByMetadata("event_ts", TIMESTAMP(3), "timestamp")
+                        .build();
+
+        /** Collects every filter handed to {@link #applyMetadataFilters(List)}. */
+        private final SharedReference<List<ResolvedExpression>> receivedMetadataFilters;
+
+        /**
+         * @param receivedMetadataFilters shared list that records the filters offered to the
+         *     source
+         */
+        RenamedMetadataFilterSource(
+                SharedReference<List<ResolvedExpression>> receivedMetadataFilters) {
+            this.receivedMetadataFilters = receivedMetadataFilters;
+        }
+
+        /** Returns a fresh map holding the single metadata key {@code timestamp}. */
+        @Override
+        public Map<String, DataType> listReadableMetadata() {
+            Map<String, DataType> metadata = new HashMap<>();
+            // Keyed by the metadata key, not by the SQL column name 'event_ts'.
+            metadata.put("timestamp", TIMESTAMP(3));
+            return metadata;
+        }
+
+        /** No-op: this test source does nothing with the requested keys or produced type. */
+        @Override
+        public void applyReadableMetadata(List<String> metadataKeys, DataType producedDataType) {}
+
+        /** Always reports support for metadata filter push-down. */
+        @Override
+        public boolean supportsMetadataFilterPushDown() {
+            return true;
+        }
+
+        /**
+         * Records the offered filters and reports all of them as accepted.
+         *
+         * @param metadataFilters filters offered to the source
+         * @return a result with {@code metadataFilters} as accepted and nothing remaining
+         */
+        @Override
+        public MetadataFilterResult applyMetadataFilters(List<ResolvedExpression> metadataFilters) {
+            receivedMetadataFilters.get().addAll(metadataFilters);
+            return MetadataFilterResult.of(metadataFilters, Collections.emptyList());
+        }
+    }
+
+    /**
+     * Scan source with two readable metadata keys that records every metadata filter it is
+     * offered but accepts only the first one, handing the rest back as remaining.
+     */
+    private static class PartialMetadataFilterSource extends TableFactoryHarness.ScanSourceBase
+            implements SupportsReadingMetadata {
+
+        public static final Schema SCHEMA =
+                Schema.newBuilder()
+                        .column("id", INT())
+                        .columnByMetadata("event_time", TIMESTAMP(3))
+                        .columnByMetadata("priority", INT())
+                        .build();
+
+        /** Shared list collecting all filters offered to this source. */
+        private final SharedReference<List<ResolvedExpression>> offeredFilters;
+
+        PartialMetadataFilterSource(SharedReference<List<ResolvedExpression>> offeredFilters) {
+            this.offeredFilters = offeredFilters;
+        }
+
+        @Override
+        public Map<String, DataType> listReadableMetadata() {
+            final Map<String, DataType> readable = new HashMap<>();
+            readable.put("event_time", TIMESTAMP(3));
+            readable.put("priority", INT());
+            return readable;
+        }
+
+        @Override
+        public void applyReadableMetadata(List<String> metadataKeys, DataType producedDataType) {
+            // Nothing to do for this test source.
+        }
+
+        @Override
+        public boolean supportsMetadataFilterPushDown() {
+            return true;
+        }
+
+        @Override
+        public MetadataFilterResult applyMetadataFilters(List<ResolvedExpression> metadataFilters) {
+            // Record everything that was offered, independent of what gets accepted below.
+            offeredFilters.get().addAll(metadataFilters);
+
+            // Accepted: the head of the list, if there is one.
+            final List<ResolvedExpression> head;
+            if (metadataFilters.isEmpty()) {
+                head = Collections.emptyList();
+            } else {
+                head = Collections.singletonList(metadataFilters.get(0));
+            }
+
+            // Remaining: everything after the head.
+            final List<ResolvedExpression> tail;
+            if (metadataFilters.size() > 1) {
+                tail = metadataFilters.subList(1, metadataFilters.size());
+            } else {
+                tail = Collections.emptyList();
+            }
+
+            return MetadataFilterResult.of(head, tail);
+        }
+    }
+
+    /**
+     * Scan source that implements both {@link SupportsReadingMetadata} and {@link
+     * SupportsFilterPushDown}, recording the filters received by each interface in its own shared
+     * list.
+     *
+     * <p>Both {@link #applyMetadataFilters(List)} and {@link #applyFilters(List)} report every
+     * offered filter as accepted, with no remaining filters.
+     */
+    private static class MixedFilterSource extends TableFactoryHarness.ScanSourceBase
+            implements SupportsReadingMetadata, SupportsFilterPushDown {
+
+        /** Two physical columns plus the metadata column {@code event_time}. */
+        public static final Schema SCHEMA =
+                Schema.newBuilder()
+                        .column("id", INT())
+                        .column("name", STRING())
+                        .columnByMetadata("event_time", TIMESTAMP(3))
+                        .build();
+
+        /** Collects every filter handed to {@link #applyMetadataFilters(List)}. */
+        private final SharedReference<List<ResolvedExpression>> receivedMetadataFilters;
+
+        /** Collects every filter handed to {@link #applyFilters(List)}. */
+        private final SharedReference<List<ResolvedExpression>> receivedPhysicalFilters;
+
+        /**
+         * @param receivedMetadataFilters shared list recording filters offered through the
+         *     metadata path
+         * @param receivedPhysicalFilters shared list recording filters offered through the
+         *     regular filter push-down path
+         */
+        MixedFilterSource(
+                SharedReference<List<ResolvedExpression>> receivedMetadataFilters,
+                SharedReference<List<ResolvedExpression>> receivedPhysicalFilters) {
+            this.receivedMetadataFilters = receivedMetadataFilters;
+            this.receivedPhysicalFilters = receivedPhysicalFilters;
+        }
+
+        /** Returns a fresh map holding the single metadata key {@code event_time}. */
+        @Override
+        public Map<String, DataType> listReadableMetadata() {
+            Map<String, DataType> metadata = new HashMap<>();
+            // Key and type mirror the metadata column declared in SCHEMA.
+            metadata.put("event_time", TIMESTAMP(3));
+            return metadata;
+        }
+
+        /** No-op: this test source does nothing with the requested keys or produced type. */
+        @Override
+        public void applyReadableMetadata(List<String> metadataKeys, DataType producedDataType) {}
+
+        /** Always reports support for metadata filter push-down. */
+        @Override
+        public boolean supportsMetadataFilterPushDown() {
+            return true;
+        }
+
+        /**
+         * Records the offered metadata filters and reports all of them as accepted.
+         *
+         * @param metadataFilters filters offered to the source
+         * @return a result with {@code metadataFilters} as accepted and nothing remaining
+         */
+        @Override
+        public MetadataFilterResult applyMetadataFilters(List<ResolvedExpression> metadataFilters) {
+            receivedMetadataFilters.get().addAll(metadataFilters);
+            return MetadataFilterResult.of(metadataFilters, Collections.emptyList());
+        }
+
+        /**
+         * Records the offered filters and reports all of them as accepted.
+         *
+         * @param filters filters offered to the source
+         * @return a result with {@code filters} as accepted and nothing remaining
+         */
+        @Override
+        public Result applyFilters(List<ResolvedExpression> filters) {
+            receivedPhysicalFilters.get().addAll(filters);
+            return Result.of(filters, Collections.emptyList());
+        }
+    }
+
+    /**
+     * Scan source for the case where a physical column and a metadata key have the same name:
+     * the schema has a physical column {@code offset} and a metadata column {@code msg_offset}
+     * that is read from the metadata key {@code offset}.
+     */
+    private static class CollidingNameSource extends TableFactoryHarness.ScanSourceBase
+            implements SupportsReadingMetadata {
+
+        public static final Schema SCHEMA =
+                Schema.newBuilder()
+                        .column("id", INT())
+                        .column("offset", INT())
+                        .columnByMetadata("msg_offset", INT(), "offset")
+                        .build();
+
+        /** Shared list collecting all metadata filters offered to this source. */
+        private final SharedReference<List<ResolvedExpression>> offeredFilters;
+
+        CollidingNameSource(SharedReference<List<ResolvedExpression>> offeredFilters) {
+            this.offeredFilters = offeredFilters;
+        }
+
+        @Override
+        public Map<String, DataType> listReadableMetadata() {
+            final Map<String, DataType> readable = new HashMap<>();
+            readable.put("offset", INT());
+            return readable;
+        }
+
+        @Override
+        public void applyReadableMetadata(List<String> metadataKeys, DataType producedDataType) {
+            // Nothing to do for this test source.
+        }
+
+        @Override
+        public boolean supportsMetadataFilterPushDown() {
+            return true;
+        }
+
+        @Override
+        public MetadataFilterResult applyMetadataFilters(List<ResolvedExpression> metadataFilters) {
+            // Record what was offered, then accept all of it and leave nothing remaining.
+            offeredFilters.get().addAll(metadataFilters);
+            final List<ResolvedExpression> nothingRemaining = Collections.emptyList();
+            return MetadataFilterResult.of(metadataFilters, nothingRemaining);
+        }
+    }
+}
diff --git 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/MetadataFilterInReadingMetadataTest.xml
 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/MetadataFilterInReadingMetadataTest.xml
new file mode 100644
index 00000000000..cef00b21d69
--- /dev/null
+++ 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/MetadataFilterInReadingMetadataTest.xml
@@ -0,0 +1,161 @@
+<?xml version="1.0" ?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to you under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+<Root>
+  <TestCase name="testAliasedMetadataColumnFilter">
+    <Resource name="sql">
+      <![CDATA[SELECT id FROM T3 WHERE event_ts > TIMESTAMP '2024-01-01 
00:00:00']]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(id=[$0])
++- LogicalFilter(condition=[>($1, 2024-01-01 00:00:00)])
+   +- LogicalProject(id=[$0], event_ts=[$1])
+      +- LogicalTableScan(table=[[default_catalog, default_database, T3, 
metadata=[timestamp]]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+LogicalProject(id=[$0])
++- LogicalProject(id=[$0], event_ts=[$1])
+   +- LogicalTableScan(table=[[default_catalog, default_database, T3, 
metadata=[timestamp], metadataFilter=[>(timestamp, 2024-01-01 00:00:00)]]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testMetadataFilterNotPushedWhenNotSupported">
+    <Resource name="sql">
+      <![CDATA[SELECT id FROM T2 WHERE event_time > TIMESTAMP '2024-01-01 
00:00:00']]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(id=[$0])
++- LogicalFilter(condition=[>($2, 2024-01-01 00:00:00)])
+   +- LogicalProject(id=[$0], name=[$1], event_time=[$2])
+      +- LogicalTableScan(table=[[default_catalog, default_database, T2, 
metadata=[event_time]]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+LogicalProject(id=[$0])
++- LogicalProject(id=[$0], name=[$1], event_time=[$2])
+   +- LogicalFilter(condition=[>($2, 2024-01-01 00:00:00)])
+      +- LogicalTableScan(table=[[default_catalog, default_database, T2, 
metadata=[event_time]]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testMetadataFilterPushDown">
+    <Resource name="sql">
+      <![CDATA[SELECT id FROM T1 WHERE event_time > TIMESTAMP '2024-01-01 
00:00:00']]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(id=[$0])
++- LogicalFilter(condition=[>($2, 2024-01-01 00:00:00)])
+   +- LogicalProject(id=[$0], name=[$1], event_time=[$2])
+      +- LogicalTableScan(table=[[default_catalog, default_database, T1, 
metadata=[event_time]]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+LogicalProject(id=[$0])
++- LogicalProject(id=[$0], name=[$1], event_time=[$2])
+   +- LogicalTableScan(table=[[default_catalog, default_database, T1, 
metadata=[event_time], metadataFilter=[>(event_time, 2024-01-01 00:00:00)]]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testMetadataFilterWithProjection">
+    <Resource name="sql">
+      <![CDATA[SELECT id, name FROM T5 WHERE event_time > TIMESTAMP 
'2024-01-01 00:00:00']]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(id=[$0], name=[$1])
++- LogicalFilter(condition=[>($2, 2024-01-01 00:00:00)])
+   +- LogicalProject(id=[$0], name=[$1], event_time=[$2])
+      +- LogicalTableScan(table=[[default_catalog, default_database, T5, 
metadata=[event_time]]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+LogicalProject(id=[$0], name=[$1])
++- LogicalProject(id=[$0], name=[$1], event_time=[$2])
+   +- LogicalTableScan(table=[[default_catalog, default_database, T5, 
metadata=[event_time], metadataFilter=[>(event_time, 2024-01-01 00:00:00)]]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testMixedPhysicalAndMetadataFilters">
+    <Resource name="sql">
+      <![CDATA[SELECT id FROM T4 WHERE id > 10 AND event_time > TIMESTAMP 
'2024-01-01 00:00:00']]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(id=[$0])
++- LogicalFilter(condition=[AND(>($0, 10), >($2, 2024-01-01 00:00:00))])
+   +- LogicalProject(id=[$0], name=[$1], event_time=[$2])
+      +- LogicalTableScan(table=[[default_catalog, default_database, T4, 
metadata=[event_time]]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+LogicalProject(id=[$0])
++- LogicalProject(id=[$0], name=[$1], event_time=[$2])
+   +- LogicalTableScan(table=[[default_catalog, default_database, T4, 
metadata=[event_time], filter=[>(id, 10)], metadataFilter=[>(event_time, 
2024-01-01 00:00:00)]]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testPartialMetadataFilterAcceptance">
+    <Resource name="sql">
+      <![CDATA[SELECT id FROM T6 WHERE event_time > TIMESTAMP '2024-01-01 
00:00:00' AND priority > 5]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(id=[$0])
++- LogicalFilter(condition=[AND(>($1, 2024-01-01 00:00:00), >($2, 5))])
+   +- LogicalProject(id=[$0], event_time=[$2], priority=[$1])
+      +- LogicalTableScan(table=[[default_catalog, default_database, T6, 
metadata=[priority, event_time]]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+LogicalProject(id=[$0])
++- LogicalProject(id=[$0], event_time=[$2], priority=[$1])
+   +- LogicalFilter(condition=[>($1, 5)])
+      +- LogicalTableScan(table=[[default_catalog, default_database, T6, 
metadata=[priority, event_time], metadataFilter=[>(event_time, 2024-01-01 
00:00:00)]]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testPhysicalAndMetadataNameCollision">
+    <Resource name="sql">
+      <![CDATA[SELECT id FROM T7 WHERE msg_offset > 5]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(id=[$0])
++- LogicalFilter(condition=[>($2, 5)])
+   +- LogicalProject(id=[$0], offset=[$1], msg_offset=[$2])
+      +- LogicalTableScan(table=[[default_catalog, default_database, T7, 
metadata=[offset]]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+LogicalProject(id=[$0])
++- LogicalProject(id=[$0], offset=[$1], msg_offset=[$2])
+   +- LogicalTableScan(table=[[default_catalog, default_database, T7, 
metadata=[offset], metadataFilter=[>(offset, 5)]]])
+]]>
+    </Resource>
+  </TestCase>
+</Root>

Reply via email to