This is an automated email from the ASF dual-hosted git repository.

blue pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new 2f88ff66d0 API, Core: Update inclusive metrics evaluator for extract 
and transforms (#12311)
2f88ff66d0 is described below

commit 2f88ff66d05269b04e3621fe067ccdab668f3191
Author: Ryan Blue <[email protected]>
AuthorDate: Tue Feb 25 16:48:12 2025 -0800

    API, Core: Update inclusive metrics evaluator for extract and transforms 
(#12311)
---
 .../expressions/InclusiveMetricsEvaluator.java     | 473 ++++++++------
 .../iceberg/expressions/VariantExpressionUtil.java | 118 ++++
 .../TestInclusiveMetricsEvaluatorWithExtract.java  | 678 +++++++++++++++++++++
 ...estInclusiveMetricsEvaluatorWithTransforms.java | 661 ++++++++++++++++++++
 .../apache/iceberg/spark/source/TestSparkScan.java |  23 +-
 .../apache/iceberg/spark/source/TestSparkScan.java |  23 +-
 6 files changed, 1777 insertions(+), 199 deletions(-)

diff --git 
a/api/src/main/java/org/apache/iceberg/expressions/InclusiveMetricsEvaluator.java
 
b/api/src/main/java/org/apache/iceberg/expressions/InclusiveMetricsEvaluator.java
index 172b6a727d..e667b096c2 100644
--- 
a/api/src/main/java/org/apache/iceberg/expressions/InclusiveMetricsEvaluator.java
+++ 
b/api/src/main/java/org/apache/iceberg/expressions/InclusiveMetricsEvaluator.java
@@ -29,12 +29,13 @@ import java.util.stream.Collectors;
 import org.apache.iceberg.ContentFile;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.Schema;
-import 
org.apache.iceberg.expressions.ExpressionVisitors.BoundExpressionVisitor;
+import org.apache.iceberg.transforms.Transform;
 import org.apache.iceberg.types.Comparators;
 import org.apache.iceberg.types.Conversions;
 import org.apache.iceberg.types.Types.StructType;
-import org.apache.iceberg.util.BinaryUtil;
 import org.apache.iceberg.util.NaNUtil;
+import org.apache.iceberg.variants.Variant;
+import org.apache.iceberg.variants.VariantObject;
 
 /**
  * Evaluates an {@link Expression} on a {@link DataFile} to test whether rows 
in the file may match.
@@ -79,7 +80,7 @@ public class InclusiveMetricsEvaluator {
   private static final boolean ROWS_MIGHT_MATCH = true;
   private static final boolean ROWS_CANNOT_MATCH = false;
 
-  private class MetricsEvalVisitor extends BoundExpressionVisitor<Boolean> {
+  private class MetricsEvalVisitor extends 
ExpressionVisitors.BoundVisitor<Boolean> {
     private Map<Integer, Long> valueCounts = null;
     private Map<Integer, Long> nullCounts = null;
     private Map<Integer, Long> nanCounts = null;
@@ -107,19 +108,6 @@ public class InclusiveMetricsEvaluator {
       return ExpressionVisitors.visitEvaluator(expr, this);
     }
 
-    @Override
-    public <T> Boolean handleNonReference(Bound<T> term) {
-      // If the term in any expression is not a direct reference, assume that 
rows may match. This
-      // happens when
-      // transforms or other expressions are passed to this evaluator. For 
example, bucket16(x) = 0
-      // can't be determined
-      // because this visitor operates on data metrics and not partition 
values. It may be possible
-      // to un-transform
-      // expressions for order preserving transforms in the future, but this 
is not currently
-      // supported.
-      return ROWS_MIGHT_MATCH;
-    }
-
     @Override
     public Boolean alwaysTrue() {
       return ROWS_MIGHT_MATCH; // all rows match
@@ -146,24 +134,27 @@ public class InclusiveMetricsEvaluator {
     }
 
     @Override
-    public <T> Boolean isNull(BoundReference<T> ref) {
+    public <T> Boolean isNull(Bound<T> term) {
       // no need to check whether the field is required because binding 
evaluates that case
       // if the column has no null values, the expression cannot match
-      Integer id = ref.fieldId();
-
-      if (nullCounts != null && nullCounts.containsKey(id) && 
nullCounts.get(id) == 0) {
-        return ROWS_CANNOT_MATCH;
+      if (isNonNullPreserving(term)) {
+        // number of non-nulls is the same as for the ref
+        Integer id = term.ref().fieldId();
+        if (!mayContainNull(id)) {
+          return ROWS_CANNOT_MATCH;
+        }
       }
 
       return ROWS_MIGHT_MATCH;
     }
 
     @Override
-    public <T> Boolean notNull(BoundReference<T> ref) {
+    public <T> Boolean notNull(Bound<T> term) {
       // no need to check whether the field is required because binding 
evaluates that case
       // if the column has no non-null values, the expression cannot match
-      Integer id = ref.fieldId();
 
+      // all terms are null preserving. see #isNullPreserving(Bound)
+      Integer id = term.ref().fieldId();
       if (containsNullsOnly(id)) {
         return ROWS_CANNOT_MATCH;
       }
@@ -172,25 +163,33 @@ public class InclusiveMetricsEvaluator {
     }
 
     @Override
-    public <T> Boolean isNaN(BoundReference<T> ref) {
-      Integer id = ref.fieldId();
-
-      if (nanCounts != null && nanCounts.containsKey(id) && nanCounts.get(id) 
== 0) {
-        return ROWS_CANNOT_MATCH;
-      }
-
+    public <T> Boolean isNaN(Bound<T> term) {
       // when there's no nanCounts information, but we already know the column 
only contains null,
       // it's guaranteed that there's no NaN value
+      Integer id = term.ref().fieldId();
       if (containsNullsOnly(id)) {
         return ROWS_CANNOT_MATCH;
       }
 
+      if (!(term instanceof BoundReference)) {
+        return ROWS_MIGHT_MATCH;
+      }
+
+      if (nanCounts != null && nanCounts.containsKey(id) && nanCounts.get(id) 
== 0) {
+        return ROWS_CANNOT_MATCH;
+      }
+
       return ROWS_MIGHT_MATCH;
     }
 
     @Override
-    public <T> Boolean notNaN(BoundReference<T> ref) {
-      Integer id = ref.fieldId();
+    public <T> Boolean notNaN(Bound<T> term) {
+      if (!(term instanceof BoundReference)) {
+        // identity transforms are already removed by this time
+        return ROWS_MIGHT_MATCH;
+      }
+
+      Integer id = term.ref().fieldId();
 
       if (containsNaNsOnly(id)) {
         return ROWS_CANNOT_MATCH;
@@ -200,140 +199,141 @@ public class InclusiveMetricsEvaluator {
     }
 
     @Override
-    public <T> Boolean lt(BoundReference<T> ref, Literal<T> lit) {
-      Integer id = ref.fieldId();
-
+    public <T> Boolean lt(Bound<T> term, Literal<T> lit) {
+      // all terms are null preserving. see #isNullPreserving(Bound)
+      Integer id = term.ref().fieldId();
       if (containsNullsOnly(id) || containsNaNsOnly(id)) {
         return ROWS_CANNOT_MATCH;
       }
 
-      if (lowerBounds != null && lowerBounds.containsKey(id)) {
-        T lower = Conversions.fromByteBuffer(ref.type(), lowerBounds.get(id));
-
-        if (NaNUtil.isNaN(lower)) {
-          // NaN indicates unreliable bounds. See the 
InclusiveMetricsEvaluator docs for more.
-          return ROWS_MIGHT_MATCH;
-        }
+      T lower = lowerBound(term);
+      if (null == lower || NaNUtil.isNaN(lower)) {
+        // NaN indicates unreliable bounds. See the InclusiveMetricsEvaluator 
docs for more.
+        return ROWS_MIGHT_MATCH;
+      }
 
-        int cmp = lit.comparator().compare(lower, lit.value());
-        if (cmp >= 0) {
-          return ROWS_CANNOT_MATCH;
-        }
+      // this also works for transforms that are order preserving:
+      // if a transform f is order preserving, a < b means that f(a) <= f(b).
+      // because lower <= a for all values of a in the file, f(lower) <= f(a).
+      // when f(lower) >= X then f(a) >= f(lower) >= X, so there is no a such 
that f(a) < X
+      // f(lower) >= X means rows cannot match
+      int cmp = lit.comparator().compare(lower, lit.value());
+      if (cmp >= 0) {
+        return ROWS_CANNOT_MATCH;
       }
 
       return ROWS_MIGHT_MATCH;
     }
 
     @Override
-    public <T> Boolean ltEq(BoundReference<T> ref, Literal<T> lit) {
-      Integer id = ref.fieldId();
-
+    public <T> Boolean ltEq(Bound<T> term, Literal<T> lit) {
+      // all terms are null preserving. see #isNullPreserving(Bound)
+      Integer id = term.ref().fieldId();
       if (containsNullsOnly(id) || containsNaNsOnly(id)) {
         return ROWS_CANNOT_MATCH;
       }
 
-      if (lowerBounds != null && lowerBounds.containsKey(id)) {
-        T lower = Conversions.fromByteBuffer(ref.type(), lowerBounds.get(id));
-
-        if (NaNUtil.isNaN(lower)) {
-          // NaN indicates unreliable bounds. See the 
InclusiveMetricsEvaluator docs for more.
-          return ROWS_MIGHT_MATCH;
-        }
+      T lower = lowerBound(term);
+      if (null == lower || NaNUtil.isNaN(lower)) {
+        // NaN indicates unreliable bounds. See the InclusiveMetricsEvaluator 
docs for more.
+        return ROWS_MIGHT_MATCH;
+      }
 
-        int cmp = lit.comparator().compare(lower, lit.value());
-        if (cmp > 0) {
-          return ROWS_CANNOT_MATCH;
-        }
+      // this also works for transforms that are order preserving:
+      // if a transform f is order preserving, a < b means that f(a) <= f(b).
+      // because lower <= a for all values of a in the file, f(lower) <= f(a).
+      // when f(lower) > X then f(a) >= f(lower) > X, so there is no a such 
that f(a) <= X
+      // f(lower) > X means rows cannot match
+      int cmp = lit.comparator().compare(lower, lit.value());
+      if (cmp > 0) {
+        return ROWS_CANNOT_MATCH;
       }
 
       return ROWS_MIGHT_MATCH;
     }
 
     @Override
-    public <T> Boolean gt(BoundReference<T> ref, Literal<T> lit) {
-      Integer id = ref.fieldId();
-
+    public <T> Boolean gt(Bound<T> term, Literal<T> lit) {
+      // all terms are null preserving. see #isNullPreserving(Bound)
+      Integer id = term.ref().fieldId();
       if (containsNullsOnly(id) || containsNaNsOnly(id)) {
         return ROWS_CANNOT_MATCH;
       }
 
-      if (upperBounds != null && upperBounds.containsKey(id)) {
-        T upper = Conversions.fromByteBuffer(ref.type(), upperBounds.get(id));
+      T upper = upperBound(term);
+      if (null == upper) {
+        return ROWS_MIGHT_MATCH;
+      }
 
-        int cmp = lit.comparator().compare(upper, lit.value());
-        if (cmp <= 0) {
-          return ROWS_CANNOT_MATCH;
-        }
+      int cmp = lit.comparator().compare(upper, lit.value());
+      if (cmp <= 0) {
+        return ROWS_CANNOT_MATCH;
       }
 
       return ROWS_MIGHT_MATCH;
     }
 
     @Override
-    public <T> Boolean gtEq(BoundReference<T> ref, Literal<T> lit) {
-      Integer id = ref.fieldId();
-
+    public <T> Boolean gtEq(Bound<T> term, Literal<T> lit) {
+      // all terms are null preserving. see #isNullPreserving(Bound)
+      Integer id = term.ref().fieldId();
       if (containsNullsOnly(id) || containsNaNsOnly(id)) {
         return ROWS_CANNOT_MATCH;
       }
 
-      if (upperBounds != null && upperBounds.containsKey(id)) {
-        T upper = Conversions.fromByteBuffer(ref.type(), upperBounds.get(id));
+      T upper = upperBound(term);
+      if (null == upper) {
+        return ROWS_MIGHT_MATCH;
+      }
 
-        int cmp = lit.comparator().compare(upper, lit.value());
-        if (cmp < 0) {
-          return ROWS_CANNOT_MATCH;
-        }
+      int cmp = lit.comparator().compare(upper, lit.value());
+      if (cmp < 0) {
+        return ROWS_CANNOT_MATCH;
       }
 
       return ROWS_MIGHT_MATCH;
     }
 
     @Override
-    public <T> Boolean eq(BoundReference<T> ref, Literal<T> lit) {
-      Integer id = ref.fieldId();
-
+    public <T> Boolean eq(Bound<T> term, Literal<T> lit) {
+      // all terms are null preserving. see #isNullPreserving(Bound)
+      Integer id = term.ref().fieldId();
       if (containsNullsOnly(id) || containsNaNsOnly(id)) {
         return ROWS_CANNOT_MATCH;
       }
 
-      if (lowerBounds != null && lowerBounds.containsKey(id)) {
-        T lower = Conversions.fromByteBuffer(ref.type(), lowerBounds.get(id));
-
-        if (NaNUtil.isNaN(lower)) {
-          // NaN indicates unreliable bounds. See the 
InclusiveMetricsEvaluator docs for more.
-          return ROWS_MIGHT_MATCH;
-        }
-
+      T lower = lowerBound(term);
+      if (lower != null && !NaNUtil.isNaN(lower)) {
         int cmp = lit.comparator().compare(lower, lit.value());
         if (cmp > 0) {
           return ROWS_CANNOT_MATCH;
         }
       }
 
-      if (upperBounds != null && upperBounds.containsKey(id)) {
-        T upper = Conversions.fromByteBuffer(ref.type(), upperBounds.get(id));
+      T upper = upperBound(term);
+      if (null == upper) {
+        return ROWS_MIGHT_MATCH;
+      }
 
-        int cmp = lit.comparator().compare(upper, lit.value());
-        if (cmp < 0) {
-          return ROWS_CANNOT_MATCH;
-        }
+      int cmp = lit.comparator().compare(upper, lit.value());
+      if (cmp < 0) {
+        return ROWS_CANNOT_MATCH;
       }
 
       return ROWS_MIGHT_MATCH;
     }
 
     @Override
-    public <T> Boolean notEq(BoundReference<T> ref, Literal<T> lit) {
+    public <T> Boolean notEq(Bound<T> term, Literal<T> lit) {
       // because the bounds are not necessarily a min or max value, this 
cannot be answered using
       // them. notEq(col, X) with (X, Y) doesn't guarantee that X is a value 
in col.
       return ROWS_MIGHT_MATCH;
     }
 
     @Override
-    public <T> Boolean in(BoundReference<T> ref, Set<T> literalSet) {
-      Integer id = ref.fieldId();
-
+    public <T> Boolean in(Bound<T> term, Set<T> literalSet) {
+      // all terms are null preserving. see #isNullPreserving(Bound)
+      Integer id = term.ref().fieldId();
       if (containsNullsOnly(id) || containsNaNsOnly(id)) {
         return ROWS_CANNOT_MATCH;
       }
@@ -345,125 +345,126 @@ public class InclusiveMetricsEvaluator {
         return ROWS_MIGHT_MATCH;
       }
 
-      if (lowerBounds != null && lowerBounds.containsKey(id)) {
-        T lower = Conversions.fromByteBuffer(ref.type(), lowerBounds.get(id));
+      T lower = lowerBound(term);
+      if (null == lower || NaNUtil.isNaN(lower)) {
+        // NaN indicates unreliable bounds. See the InclusiveMetricsEvaluator 
docs for more.
+        return ROWS_MIGHT_MATCH;
+      }
 
-        if (NaNUtil.isNaN(lower)) {
-          // NaN indicates unreliable bounds. See the 
InclusiveMetricsEvaluator docs for more.
-          return ROWS_MIGHT_MATCH;
-        }
+      literals =
+          literals.stream()
+              .filter(v -> ((BoundTerm<T>) term).comparator().compare(lower, 
v) <= 0)
+              .collect(Collectors.toList());
+      // if all values are less than lower bound, rows cannot match
+      if (literals.isEmpty()) {
+        return ROWS_CANNOT_MATCH;
+      }
 
-        literals =
-            literals.stream()
-                .filter(v -> ref.comparator().compare(lower, v) <= 0)
-                .collect(Collectors.toList());
-        if (literals.isEmpty()) { // if all values are less than lower bound, 
rows cannot match.
-          return ROWS_CANNOT_MATCH;
-        }
+      T upper = upperBound(term);
+      if (null == upper) {
+        return ROWS_MIGHT_MATCH;
       }
 
-      if (upperBounds != null && upperBounds.containsKey(id)) {
-        T upper = Conversions.fromByteBuffer(ref.type(), upperBounds.get(id));
-        literals =
-            literals.stream()
-                .filter(v -> ref.comparator().compare(upper, v) >= 0)
-                .collect(Collectors.toList());
-        if (literals
-            .isEmpty()) { // if all remaining values are greater than upper 
bound, rows cannot
-          // match.
-          return ROWS_CANNOT_MATCH;
-        }
+      literals =
+          literals.stream()
+              .filter(v -> ((BoundTerm<T>) term).comparator().compare(upper, 
v) >= 0)
+              .collect(Collectors.toList());
+      // if all remaining values are greater than the upper bound, rows cannot match
+      if (literals.isEmpty()) {
+        return ROWS_CANNOT_MATCH;
       }
 
       return ROWS_MIGHT_MATCH;
     }
 
     @Override
-    public <T> Boolean notIn(BoundReference<T> ref, Set<T> literalSet) {
+    public <T> Boolean notIn(Bound<T> term, Set<T> literalSet) {
       // because the bounds are not necessarily a min or max value, this 
cannot be answered using
       // them. notIn(col, {X, ...}) with (X, Y) doesn't guarantee that X is a 
value in col.
       return ROWS_MIGHT_MATCH;
     }
 
     @Override
-    public <T> Boolean startsWith(BoundReference<T> ref, Literal<T> lit) {
-      Integer id = ref.fieldId();
+    public <T> Boolean startsWith(Bound<T> term, Literal<T> lit) {
+      if (term instanceof BoundTransform
+          && !((BoundTransform<?, ?>) term).transform().isIdentity()) {
+        // truncate must be rewritten in binding. the result is either always 
or never compatible
+        return ROWS_MIGHT_MATCH;
+      }
 
+      Integer id = term.ref().fieldId();
       if (containsNullsOnly(id)) {
         return ROWS_CANNOT_MATCH;
       }
 
-      ByteBuffer prefixAsBytes = lit.toByteBuffer();
+      String prefix = (String) lit.value();
 
-      Comparator<ByteBuffer> comparator = Comparators.unsignedBytes();
+      Comparator<CharSequence> comparator = Comparators.charSequences();
 
-      if (lowerBounds != null && lowerBounds.containsKey(id)) {
-        ByteBuffer lower = lowerBounds.get(id);
-        // truncate lower bound so that its length in bytes is not greater 
than the length of prefix
-        int length = Math.min(prefixAsBytes.remaining(), lower.remaining());
-        int cmp = comparator.compare(BinaryUtil.truncateBinary(lower, length), 
prefixAsBytes);
-        if (cmp > 0) {
-          return ROWS_CANNOT_MATCH;
-        }
+      CharSequence lower = (CharSequence) lowerBound(term);
+      if (null == lower) {
+        return ROWS_MIGHT_MATCH;
       }
 
-      if (upperBounds != null && upperBounds.containsKey(id)) {
-        ByteBuffer upper = upperBounds.get(id);
-        // truncate upper bound so that its length in bytes is not greater 
than the length of prefix
-        int length = Math.min(prefixAsBytes.remaining(), upper.remaining());
-        int cmp = comparator.compare(BinaryUtil.truncateBinary(upper, length), 
prefixAsBytes);
-        if (cmp < 0) {
-          return ROWS_CANNOT_MATCH;
-        }
+      // truncate lower bound so that its length in characters is not greater than 
the length of prefix
+      int length = Math.min(prefix.length(), lower.length());
+      int cmp = comparator.compare(lower.subSequence(0, length), prefix);
+      if (cmp > 0) {
+        return ROWS_CANNOT_MATCH;
+      }
+
+      CharSequence upper = (CharSequence) upperBound(term);
+      if (null == upper) {
+        return ROWS_MIGHT_MATCH;
+      }
+
+      // truncate upper bound so that its length in characters is not greater than 
the length of prefix
+      length = Math.min(prefix.length(), upper.length());
+      cmp = comparator.compare(upper.subSequence(0, length), prefix);
+      if (cmp < 0) {
+        return ROWS_CANNOT_MATCH;
       }
 
       return ROWS_MIGHT_MATCH;
     }
 
     @Override
-    public <T> Boolean notStartsWith(BoundReference<T> ref, Literal<T> lit) {
-      Integer id = ref.fieldId();
-
+    public <T> Boolean notStartsWith(Bound<T> term, Literal<T> lit) {
+      // the only transforms that produce strings are truncate and identity, 
which work with this
+      Integer id = term.ref().fieldId();
       if (mayContainNull(id)) {
         return ROWS_MIGHT_MATCH;
       }
 
-      ByteBuffer prefixAsBytes = lit.toByteBuffer();
+      String prefix = (String) lit.value();
 
-      Comparator<ByteBuffer> comparator = Comparators.unsignedBytes();
+      Comparator<CharSequence> comparator = Comparators.charSequences();
 
       // notStartsWith will match unless all values must start with the 
prefix. This happens when
-      // the lower and upper
-      // bounds both start with the prefix.
-      if (lowerBounds != null
-          && upperBounds != null
-          && lowerBounds.containsKey(id)
-          && upperBounds.containsKey(id)) {
-        ByteBuffer lower = lowerBounds.get(id);
-        // if lower is shorter than the prefix then lower doesn't start with 
the prefix
-        if (lower.remaining() < prefixAsBytes.remaining()) {
+      // the lower and upper bounds both start with the prefix.
+      CharSequence lower = (CharSequence) lowerBound(term);
+      CharSequence upper = (CharSequence) upperBound(term);
+      if (null == lower || null == upper) {
+        return ROWS_MIGHT_MATCH;
+      }
+
+      // if lower is shorter than the prefix then lower doesn't start with the 
prefix
+      if (lower.length() < prefix.length()) {
+        return ROWS_MIGHT_MATCH;
+      }
+
+      int cmp = comparator.compare(lower.subSequence(0, prefix.length()), 
prefix);
+      if (cmp == 0) {
+        // if upper is shorter than the prefix then upper can't start with the 
prefix
+        if (upper.length() < prefix.length()) {
           return ROWS_MIGHT_MATCH;
         }
 
-        int cmp =
-            comparator.compare(
-                BinaryUtil.truncateBinary(lower, prefixAsBytes.remaining()), 
prefixAsBytes);
+        cmp = comparator.compare(upper.subSequence(0, prefix.length()), 
prefix);
         if (cmp == 0) {
-          ByteBuffer upper = upperBounds.get(id);
-          // if upper is shorter than the prefix then upper can't start with 
the prefix
-          if (upper.remaining() < prefixAsBytes.remaining()) {
-            return ROWS_MIGHT_MATCH;
-          }
-
-          cmp =
-              comparator.compare(
-                  BinaryUtil.truncateBinary(upper, prefixAsBytes.remaining()), 
prefixAsBytes);
-          if (cmp == 0) {
-            // both bounds match the prefix, so all rows must match the prefix 
and therefore do not
-            // satisfy
-            // the predicate
-            return ROWS_CANNOT_MATCH;
-          }
+          // both bounds match the prefix, so all rows must match the prefix 
and therefore do not
+          // satisfy the predicate
+          return ROWS_CANNOT_MATCH;
         }
       }
 
@@ -471,7 +472,7 @@ public class InclusiveMetricsEvaluator {
     }
 
     private boolean mayContainNull(Integer id) {
-      return nullCounts == null || (nullCounts.containsKey(id) && 
nullCounts.get(id) != 0);
+      return nullCounts == null || !nullCounts.containsKey(id) || 
nullCounts.get(id) != 0;
     }
 
     private boolean containsNullsOnly(Integer id) {
@@ -488,5 +489,123 @@ public class InclusiveMetricsEvaluator {
           && valueCounts != null
           && nanCounts.get(id).equals(valueCounts.get(id));
     }
+
+    private <T> T lowerBound(Bound<T> term) {
+      if (term instanceof BoundReference) {
+        return parseLowerBound((BoundReference<T>) term);
+      } else if (term instanceof BoundTransform) {
+        return transformLowerBound((BoundTransform<?, T>) term);
+      } else if (term instanceof BoundExtract) {
+        return extractLowerBound((BoundExtract<T>) term);
+      } else {
+        return null;
+      }
+    }
+
+    private <T> T upperBound(Bound<T> term) {
+      if (term instanceof BoundReference) {
+        return parseUpperBound((BoundReference<T>) term);
+      } else if (term instanceof BoundTransform) {
+        return transformUpperBound((BoundTransform<?, T>) term);
+      } else if (term instanceof BoundExtract) {
+        return extractUpperBound((BoundExtract<T>) term);
+      } else {
+        return null;
+      }
+    }
+
+    private <T> T parseLowerBound(BoundReference<T> ref) {
+      Integer id = ref.fieldId();
+      if (lowerBounds != null && lowerBounds.containsKey(id)) {
+        return Conversions.fromByteBuffer(ref.ref().type(), 
lowerBounds.get(id));
+      }
+
+      return null;
+    }
+
+    private <T> T parseUpperBound(BoundReference<T> ref) {
+      Integer id = ref.fieldId();
+      if (upperBounds != null && upperBounds.containsKey(id)) {
+        return Conversions.fromByteBuffer(ref.ref().type(), 
upperBounds.get(id));
+      }
+
+      return null;
+    }
+
+    private <S, T> T transformLowerBound(BoundTransform<S, T> boundTransform) {
+      Transform<S, T> transform = boundTransform.transform();
+      if (transform.preservesOrder()) {
+        S lower = parseLowerBound(boundTransform.ref());
+        return 
boundTransform.transform().bind(boundTransform.ref().type()).apply(lower);
+      }
+
+      return null;
+    }
+
+    private <S, T> T transformUpperBound(BoundTransform<S, T> boundTransform) {
+      Transform<S, T> transform = boundTransform.transform();
+      if (transform.preservesOrder()) {
+        S upper = parseUpperBound(boundTransform.ref());
+        return 
boundTransform.transform().bind(boundTransform.ref().type()).apply(upper);
+      }
+
+      return null;
+    }
+
+    private <T> T extractLowerBound(BoundExtract<T> bound) {
+      Integer id = bound.ref().fieldId();
+      if (lowerBounds != null && lowerBounds.containsKey(id)) {
+        VariantObject fieldLowerBounds = parseBounds(lowerBounds.get(id));
+        return VariantExpressionUtil.castTo(
+            fieldLowerBounds.get(bound.fullFieldName()), bound.type());
+      }
+
+      return null;
+    }
+
+    private <T> T extractUpperBound(BoundExtract<T> bound) {
+      Integer id = bound.ref().fieldId();
+      if (upperBounds != null && upperBounds.containsKey(id)) {
+        VariantObject fieldUpperBounds = parseBounds(upperBounds.get(id));
+        return VariantExpressionUtil.castTo(
+            fieldUpperBounds.get(bound.fullFieldName()), bound.type());
+      }
+
+      return null;
+    }
+
+    /** Returns true if the expression term produces a null value for a null 
input. */
+    //    private boolean isNullPreserving(Bound<?> term) {
+    //      if (term instanceof BoundReference) {
+    //        return true;
+    //      } else if (term instanceof BoundTransform<?, ?>) {
+    //        // transforms must map null to null
+    //        return true;
+    //      } else if (term instanceof BoundExtract) {
+    //        // a null variant contains no non-null values
+    //        return true;
+    //      }
+    //
+    //      // unknown cases are not null preserving
+    //      return false;
+    //    }
+
+    /** Returns true if the expression term produces a non-null value for 
non-null input. */
+    private boolean isNonNullPreserving(Bound<?> term) {
+      if (term instanceof BoundReference) {
+        return true;
+      } else if (term instanceof BoundTransform<?, ?>) {
+        // if the transform preserves order, then non-null values are mapped 
to non-null
+        return ((BoundTransform<?, ?>) term).transform().preservesOrder();
+      }
+
+      // a non-null variant does not necessarily contain a specific field
+      // and unknown bound terms are not non-null preserving
+      return false;
+    }
+  }
+
+  private static VariantObject parseBounds(ByteBuffer buffer) {
+    return Variant.from(buffer).value().asObject();
   }
 }
diff --git 
a/api/src/main/java/org/apache/iceberg/expressions/VariantExpressionUtil.java 
b/api/src/main/java/org/apache/iceberg/expressions/VariantExpressionUtil.java
new file mode 100644
index 0000000000..dca11d5e46
--- /dev/null
+++ 
b/api/src/main/java/org/apache/iceberg/expressions/VariantExpressionUtil.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.expressions;
+
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.util.Map;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.variants.PhysicalType;
+import org.apache.iceberg.variants.VariantValue;
+
+class VariantExpressionUtil {
+  // TODO: Implement PhysicalType.TIME
+  // TODO: Implement PhysicalType.TIMESTAMPNTZ_NANO and 
PhysicalType.TIMESTAMPTZ_NANO
+  // TODO: Implement PhysicalType.UUID
+  private static final Map<Type, PhysicalType> NO_CONVERSION_NEEDED =
+      ImmutableMap.<Type, PhysicalType>builder()
+          .put(Types.IntegerType.get(), PhysicalType.INT32)
+          .put(Types.LongType.get(), PhysicalType.INT64)
+          .put(Types.FloatType.get(), PhysicalType.FLOAT)
+          .put(Types.DoubleType.get(), PhysicalType.DOUBLE)
+          .put(Types.DateType.get(), PhysicalType.DATE)
+          .put(Types.TimestampType.withoutZone(), PhysicalType.TIMESTAMPNTZ)
+          .put(Types.TimestampType.withZone(), PhysicalType.TIMESTAMPTZ)
+          .put(Types.StringType.get(), PhysicalType.STRING)
+          .put(Types.BinaryType.get(), PhysicalType.BINARY)
+          .put(Types.UnknownType.get(), PhysicalType.NULL)
+          .build();
+
+  private VariantExpressionUtil() {}
+
+  @SuppressWarnings("unchecked")
+  static <T> T castTo(VariantValue value, Type type) {
+    if (value == null) {
+      return null;
+    } else if (NO_CONVERSION_NEEDED.get(type) == value.type()) {
+      return (T) value.asPrimitive().get();
+    }
+
+    switch (type.typeId()) {
+      case INTEGER:
+        switch (value.type()) {
+          case INT8:
+          case INT16:
+            return (T) (Integer) ((Number) 
value.asPrimitive().get()).intValue();
+        }
+
+        break;
+      case LONG:
+        switch (value.type()) {
+          case INT8:
+          case INT16:
+          case INT32:
+            return (T) (Long) ((Number) value.asPrimitive().get()).longValue();
+        }
+
+        break;
+      case DOUBLE:
+        if (value.type() == PhysicalType.FLOAT) {
+          return (T) (Double) ((Number) 
value.asPrimitive().get()).doubleValue();
+        }
+
+        break;
+      case FIXED:
+        Types.FixedType fixedType = (Types.FixedType) type;
+        if (value.type() == PhysicalType.BINARY) {
+          ByteBuffer buffer = (ByteBuffer) value.asPrimitive().get();
+          if (buffer.remaining() == fixedType.length()) {
+            return (T) buffer;
+          }
+        }
+
+        break;
+      case DECIMAL:
+        Types.DecimalType decimalType = (Types.DecimalType) type;
+        switch (value.type()) {
+          case DECIMAL4:
+          case DECIMAL8:
+          case DECIMAL16:
+            BigDecimal decimalValue = (BigDecimal) value.asPrimitive().get();
+            if (decimalValue.scale() == decimalType.scale()) {
+              return (T) decimalValue;
+            }
+        }
+
+        break;
+      case BOOLEAN:
+        switch (value.type()) {
+          case BOOLEAN_FALSE:
+            return (T) Boolean.FALSE;
+          case BOOLEAN_TRUE:
+            return (T) Boolean.TRUE;
+        }
+
+        break;
+    }
+
+    return null;
+  }
+}
diff --git 
a/core/src/test/java/org/apache/iceberg/expressions/TestInclusiveMetricsEvaluatorWithExtract.java
 
b/core/src/test/java/org/apache/iceberg/expressions/TestInclusiveMetricsEvaluatorWithExtract.java
new file mode 100644
index 0000000000..76a1296a6c
--- /dev/null
+++ 
b/core/src/test/java/org/apache/iceberg/expressions/TestInclusiveMetricsEvaluatorWithExtract.java
@@ -0,0 +1,678 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.expressions;
+
+import static org.apache.iceberg.expressions.Expressions.and;
+import static org.apache.iceberg.expressions.Expressions.equal;
+import static org.apache.iceberg.expressions.Expressions.extract;
+import static org.apache.iceberg.expressions.Expressions.greaterThan;
+import static org.apache.iceberg.expressions.Expressions.greaterThanOrEqual;
+import static org.apache.iceberg.expressions.Expressions.in;
+import static org.apache.iceberg.expressions.Expressions.isNaN;
+import static org.apache.iceberg.expressions.Expressions.isNull;
+import static org.apache.iceberg.expressions.Expressions.lessThan;
+import static org.apache.iceberg.expressions.Expressions.lessThanOrEqual;
+import static org.apache.iceberg.expressions.Expressions.not;
+import static org.apache.iceberg.expressions.Expressions.notEqual;
+import static org.apache.iceberg.expressions.Expressions.notIn;
+import static org.apache.iceberg.expressions.Expressions.notNaN;
+import static org.apache.iceberg.expressions.Expressions.notNull;
+import static org.apache.iceberg.expressions.Expressions.notStartsWith;
+import static org.apache.iceberg.expressions.Expressions.or;
+import static org.apache.iceberg.expressions.Expressions.startsWith;
+import static org.apache.iceberg.types.Types.NestedField.optional;
+import static org.apache.iceberg.types.Types.NestedField.required;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.TestHelpers.Row;
+import org.apache.iceberg.TestHelpers.TestDataFile;
+import org.apache.iceberg.exceptions.ValidationException;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.types.Types.IntegerType;
+import org.apache.iceberg.variants.VariantTestUtil;
+import org.apache.iceberg.variants.Variants;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.FieldSource;
+
+public class TestInclusiveMetricsEvaluatorWithExtract {
+  private static final Schema SCHEMA =
+      new Schema(
+          required(1, "id", IntegerType.get()),
+          required(2, "variant", Types.VariantType.get()),
+          optional(3, "all_nulls", Types.VariantType.get()));
+
+  private static final int INT_MIN_VALUE = 30;
+  private static final int INT_MAX_VALUE = 79;
+
+  private static final DataFile FILE =
+      new TestDataFile(
+          "file.avro",
+          Row.of(),
+          50,
+          // any value counts, including nulls
+          ImmutableMap.<Integer, Long>builder().put(1, 50L).put(2, 50L).put(3, 
50L).buildOrThrow(),
+          // null value counts
+          ImmutableMap.<Integer, Long>builder()
+              .put(1, 0L)
+              .put(2, 0L)
+              .put(3, 50L) // all_nulls
+              .buildOrThrow(),
+          // nan value counts
+          null,
+          // lower bounds
+          ImmutableMap.of(
+              2,
+              VariantTestUtil.variantBuffer(
+                  Map.of("event_id", Variants.of(INT_MIN_VALUE), "str", 
Variants.of("abc")))),
+          // upper bounds
+          ImmutableMap.of(
+              2,
+              VariantTestUtil.variantBuffer(
+                  Map.of("event_id", Variants.of(INT_MAX_VALUE), "str", 
Variants.of("abe")))));
+
+  private boolean shouldRead(Expression expr) {
+    return shouldRead(expr, FILE);
+  }
+
+  private boolean shouldRead(Expression expr, DataFile file) {
+    return shouldRead(expr, file, true);
+  }
+
+  private boolean shouldReadCaseInsensitive(Expression expr) {
+    return shouldRead(expr, FILE, false);
+  }
+
+  private boolean shouldRead(Expression expr, DataFile file, boolean 
caseSensitive) {
+    return new InclusiveMetricsEvaluator(SCHEMA, expr, 
caseSensitive).eval(file);
+  }
+
+  @Test
+  public void testAllNulls() {
+    assertThat(shouldRead(isNull(extract("all_nulls", "$.event_id", "long"))))
+        .as("Should read: null values exist in all null column")
+        .isTrue();
+
+    assertThat(shouldRead(notNull(extract("all_nulls", "$.event_id", "long"))))
+        .as("Should skip: no non-null value in all null column")
+        .isFalse();
+
+    assertThat(shouldRead(lessThan(extract("all_nulls", "$.event_id", "long"), 
30)))
+        .as("Should skip: lessThan on all null column")
+        .isFalse();
+
+    assertThat(shouldRead(lessThanOrEqual(extract("all_nulls", "$.event_id", 
"long"), 30)))
+        .as("Should skip: lessThanOrEqual on all null column")
+        .isFalse();
+
+    assertThat(shouldRead(greaterThan(extract("all_nulls", "$.event_id", 
"long"), 30)))
+        .as("Should skip: greaterThan on all null column")
+        .isFalse();
+
+    assertThat(shouldRead(greaterThanOrEqual(extract("all_nulls", 
"$.event_id", "long"), 30)))
+        .as("Should skip: greaterThanOrEqual on all null column")
+        .isFalse();
+
+    assertThat(shouldRead(equal(extract("all_nulls", "$.event_id", "long"), 
30)))
+        .as("Should skip: equal on all null column")
+        .isFalse();
+
+    assertThat(shouldRead(notEqual(extract("all_nulls", "$.event_id", "long"), 
30)))
+        .as("Should read: notEqual on all null column")
+        .isTrue();
+
+    assertThat(shouldRead(in(extract("all_nulls", "$.event_type", "string"), 
"abc", "def")))
+        .as("Should skip: in on all nulls column")
+        .isFalse();
+
+    assertThat(shouldRead(notIn(extract("all_nulls", "$.event_type", 
"string"), "abc", "def")))
+        .as("Should read: notIn on all nulls column")
+        .isTrue();
+
+    assertThat(shouldRead(startsWith(extract("all_nulls", "$.event_type", 
"string"), "a")))
+        .as("Should skip: startsWith on all null column")
+        .isFalse();
+
+    assertThat(shouldRead(notStartsWith(extract("all_nulls", "$.event_type", 
"string"), "a")))
+        .as("Should read: notStartsWith on all null column")
+        .isTrue();
+
+    assertThat(shouldRead(isNaN(extract("all_nulls", "$.measurement", 
"double"))))
+        .as("Should skip: no NaN value in all null column")
+        .isFalse();
+
+    assertThat(shouldRead(notNaN(extract("all_nulls", "$.measurement", 
"double"))))
+        .as("Should read: all null column has non-NaN values")
+        .isTrue();
+  }
+
+  @Test
+  public void testIsNaNAndNotNaN() {
+    assertThat(shouldRead(isNaN(extract("variant", "$.measurement", 
"double"))))
+        .as("Should read: variant may contain NaN")
+        .isTrue();
+
+    assertThat(shouldRead(notNaN(extract("variant", "$.measurement", 
"double"))))
+        .as("Should read: variant may contain non-NaN")
+        .isTrue();
+  }
+
+  @Test
+  public void testMissingColumn() {
+    assertThatThrownBy(() -> shouldRead(lessThan(extract("missing", "$", 
"int"), 5)))
+        .isInstanceOf(ValidationException.class)
+        .hasMessageContaining("Cannot find field 'missing'");
+  }
+
+  private static final Expression[] MISSING_STATS_EXPRESSIONS =
+      new Expression[] {
+        lessThan(extract("variant", "$.missing", "long"), 5),
+        lessThanOrEqual(extract("variant", "$.missing", "long"), 30),
+        equal(extract("variant", "$.missing", "long"), 70),
+        greaterThan(extract("variant", "$.missing", "long"), 78),
+        greaterThanOrEqual(extract("variant", "$.missing", "long"), 90),
+        notEqual(extract("variant", "$.missing", "long"), 101),
+        isNull(extract("variant", "$.missing", "string")),
+        notNull(extract("variant", "$.missing", "string")),
+        isNaN(extract("variant", "$.missing", "float")),
+        notNaN(extract("variant", "$.missing", "float"))
+      };
+
+  @ParameterizedTest
+  @FieldSource("MISSING_STATS_EXPRESSIONS")
+  public void testMissingStats(Expression expr) {
+    assertThat(shouldRead(expr)).as("Should read when missing stats for expr: 
" + expr).isTrue();
+  }
+
+  @ParameterizedTest
+  @FieldSource("MISSING_STATS_EXPRESSIONS")
+  public void testZeroRecordFile(Expression expr) {
+    DataFile empty = new TestDataFile("file.parquet", Row.of(), 0);
+    assertThat(shouldRead(expr, empty)).as("Should never read 0-record file: " 
+ expr).isFalse();
+  }
+
+  @Test
+  public void testNot() {
+    // this test case must use a real predicate, not alwaysTrue(), or binding 
will simplify it out
+    assertThat(
+            shouldRead(not(lessThan(extract("variant", "$.event_id", "long"), 
INT_MIN_VALUE - 25))))
+        .as("Should read: not(false)")
+        .isTrue();
+
+    assertThat(
+            shouldRead(
+                not(greaterThan(extract("variant", "$.event_id", "long"), 
INT_MIN_VALUE - 25))))
+        .as("Should skip: not(true)")
+        .isFalse();
+  }
+
+  @Test
+  public void testAnd() {
+    // this test case must use a real predicate, not alwaysTrue(), or binding 
will simplify it out
+    assertThat(
+            shouldRead(
+                and(
+                    lessThan(extract("variant", "$.event_id", "long"), 
INT_MIN_VALUE - 25),
+                    greaterThanOrEqual(
+                        extract("variant", "$.event_id", "long"), 
INT_MIN_VALUE - 30))))
+        .as("Should skip: and(false, true)")
+        .isFalse();
+
+    assertThat(
+            shouldRead(
+                and(
+                    lessThan(extract("variant", "$.event_id", "long"), 
INT_MIN_VALUE - 25),
+                    greaterThanOrEqual(
+                        extract("variant", "$.event_id", "long"), 
INT_MAX_VALUE + 1))))
+        .as("Should skip: and(false, false)")
+        .isFalse();
+
+    assertThat(
+            shouldRead(
+                and(
+                    greaterThan(extract("variant", "$.event_id", "long"), 
INT_MIN_VALUE - 25),
+                    lessThanOrEqual(extract("variant", "$.event_id", "long"), 
INT_MIN_VALUE))))
+        .as("Should read: and(true, true)")
+        .isTrue();
+  }
+
+  @Test
+  public void testOr() {
+    // this test case must use a real predicate, not alwaysTrue(), or binding 
will simplify it out
+    assertThat(
+            shouldRead(
+                or(
+                    lessThan(extract("variant", "$.event_id", "long"), 
INT_MIN_VALUE - 25),
+                    greaterThanOrEqual(
+                        extract("variant", "$.event_id", "long"), 
INT_MAX_VALUE + 1))))
+        .as("Should skip: or(false, false)")
+        .isFalse();
+
+    assertThat(
+            shouldRead(
+                or(
+                    lessThan(extract("variant", "$.event_id", "long"), 
INT_MIN_VALUE - 25),
+                    greaterThanOrEqual(
+                        extract("variant", "$.event_id", "long"), 
INT_MAX_VALUE - 19))))
+        .as("Should read: or(false, true)")
+        .isTrue();
+  }
+
+  @Test
+  public void testIntegerLt() {
+    assertThat(shouldRead(lessThan(extract("variant", "$.event_id", "long"), 
INT_MIN_VALUE - 25)))
+        .as("Should not read: id range below lower bound (5 < 30)")
+        .isFalse();
+
+    assertThat(shouldRead(lessThan(extract("variant", "$.event_id", "long"), 
INT_MIN_VALUE)))
+        .as("Should not read: id range below lower bound (30 is not < 30)")
+        .isFalse();
+
+    assertThat(shouldRead(lessThan(extract("variant", "$.event_id", "long"), 
INT_MIN_VALUE + 1)))
+        .as("Should read: one possible id")
+        .isTrue();
+
+    assertThat(shouldRead(lessThan(extract("variant", "$.event_id", "long"), 
INT_MAX_VALUE)))
+        .as("Should read: many possible ids")
+        .isTrue();
+  }
+
+  @Test
+  public void testIntegerLtEq() {
+    assertThat(
+            shouldRead(
+                lessThanOrEqual(extract("variant", "$.event_id", "long"), 
INT_MIN_VALUE - 25)))
+        .as("Should not read: id range below lower bound (5 < 30)")
+        .isFalse();
+
+    assertThat(
+            shouldRead(
+                lessThanOrEqual(extract("variant", "$.event_id", "long"), 
INT_MIN_VALUE - 1)))
+        .as("Should not read: id range below lower bound (29 < 30)")
+        .isFalse();
+
+    assertThat(shouldRead(lessThanOrEqual(extract("variant", "$.event_id", 
"long"), INT_MIN_VALUE)))
+        .as("Should read: one possible id")
+        .isTrue();
+
+    assertThat(shouldRead(lessThanOrEqual(extract("variant", "$.event_id", 
"long"), INT_MAX_VALUE)))
+        .as("Should read: many possible ids")
+        .isTrue();
+  }
+
+  @Test
+  public void testIntegerGt() {
+    assertThat(shouldRead(greaterThan(extract("variant", "$.event_id", 
"long"), INT_MAX_VALUE + 6)))
+        .as("Should not read: id range above upper bound (85 > 79)")
+        .isFalse();
+
+    assertThat(shouldRead(greaterThan(extract("variant", "$.event_id", 
"long"), INT_MAX_VALUE)))
+        .as("Should not read: id range above upper bound (79 is not > 79)")
+        .isFalse();
+
+    assertThat(shouldRead(greaterThan(extract("variant", "$.event_id", 
"long"), INT_MAX_VALUE - 1)))
+        .as("Should read: one possible id")
+        .isTrue();
+
+    assertThat(shouldRead(greaterThan(extract("variant", "$.event_id", 
"long"), INT_MAX_VALUE - 4)))
+        .as("Should read: many possible ids")
+        .isTrue();
+  }
+
+  @Test
+  public void testIntegerGtEq() {
+    assertThat(
+            shouldRead(
+                greaterThanOrEqual(extract("variant", "$.event_id", "long"), 
INT_MAX_VALUE + 6)))
+        .as("Should not read: id range above upper bound (85 > 79)")
+        .isFalse();
+
+    assertThat(
+            shouldRead(
+                greaterThanOrEqual(extract("variant", "$.event_id", "long"), 
INT_MAX_VALUE + 1)))
+        .as("Should not read: id range above upper bound (80 > 79)")
+        .isFalse();
+
+    assertThat(
+            shouldRead(greaterThanOrEqual(extract("variant", "$.event_id", 
"long"), INT_MAX_VALUE)))
+        .as("Should read: one possible id")
+        .isTrue();
+
+    assertThat(
+            shouldRead(
+                greaterThanOrEqual(extract("variant", "$.event_id", "long"), 
INT_MAX_VALUE - 4)))
+        .as("Should read: many possible ids")
+        .isTrue();
+  }
+
+  @Test
+  public void testIntegerEq() {
+    assertThat(shouldRead(equal(extract("variant", "$.event_id", "long"), 
INT_MIN_VALUE - 25)))
+        .as("Should not read: id below lower bound")
+        .isFalse();
+
+    assertThat(shouldRead(equal(extract("variant", "$.event_id", "long"), 
INT_MIN_VALUE - 1)))
+        .as("Should not read: id below lower bound")
+        .isFalse();
+
+    assertThat(shouldRead(equal(extract("variant", "$.event_id", "long"), 
INT_MIN_VALUE)))
+        .as("Should read: id equal to lower bound")
+        .isTrue();
+
+    assertThat(shouldRead(equal(extract("variant", "$.event_id", "long"), 
INT_MAX_VALUE - 4)))
+        .as("Should read: id between lower and upper bounds")
+        .isTrue();
+
+    assertThat(shouldRead(equal(extract("variant", "$.event_id", "long"), 
INT_MAX_VALUE)))
+        .as("Should read: id equal to upper bound")
+        .isTrue();
+
+    assertThat(shouldRead(equal(extract("variant", "$.event_id", "long"), 
INT_MAX_VALUE + 1)))
+        .as("Should not read: id above upper bound")
+        .isFalse();
+
+    assertThat(shouldRead(equal(extract("variant", "$.event_id", "long"), 
INT_MAX_VALUE + 6)))
+        .as("Should not read: id above upper bound")
+        .isFalse();
+  }
+
+  @Test
+  public void testIntegerNotEq() {
+    assertThat(shouldRead(notEqual(extract("variant", "$.event_id", "long"), 
INT_MIN_VALUE - 25)))
+        .as("Should read: id below lower bound")
+        .isTrue();
+
+    assertThat(shouldRead(notEqual(extract("variant", "$.event_id", "long"), 
INT_MIN_VALUE - 1)))
+        .as("Should read: id below lower bound")
+        .isTrue();
+
+    assertThat(shouldRead(notEqual(extract("variant", "$.event_id", "long"), 
INT_MIN_VALUE)))
+        .as("Should read: id equal to lower bound")
+        .isTrue();
+
+    assertThat(shouldRead(notEqual(extract("variant", "$.event_id", "long"), 
INT_MAX_VALUE - 4)))
+        .as("Should read: id between lower and upper bounds")
+        .isTrue();
+
+    assertThat(shouldRead(notEqual(extract("variant", "$.event_id", "long"), 
INT_MAX_VALUE)))
+        .as("Should read: id equal to upper bound")
+        .isTrue();
+
+    assertThat(shouldRead(notEqual(extract("variant", "$.event_id", "long"), 
INT_MAX_VALUE + 1)))
+        .as("Should read: id above upper bound")
+        .isTrue();
+
+    assertThat(shouldRead(notEqual(extract("variant", "$.event_id", "long"), 
INT_MAX_VALUE + 6)))
+        .as("Should read: id above upper bound")
+        .isTrue();
+  }
+
+  @Test
+  public void testIntegerNotEqRewritten() {
+    assertThat(shouldRead(not(equal(extract("variant", "$.event_id", "long"), 
INT_MIN_VALUE - 25))))
+        .as("Should read: id below lower bound")
+        .isTrue();
+
+    assertThat(shouldRead(not(equal(extract("variant", "$.event_id", "long"), 
INT_MIN_VALUE - 1))))
+        .as("Should read: id below lower bound")
+        .isTrue();
+
+    assertThat(shouldRead(not(equal(extract("variant", "$.event_id", "long"), 
INT_MIN_VALUE))))
+        .as("Should read: id equal to lower bound")
+        .isTrue();
+
+    assertThat(shouldRead(not(equal(extract("variant", "$.event_id", "long"), 
INT_MAX_VALUE - 4))))
+        .as("Should read: id between lower and upper bounds")
+        .isTrue();
+
+    assertThat(shouldRead(not(equal(extract("variant", "$.event_id", "long"), 
INT_MAX_VALUE))))
+        .as("Should read: id equal to upper bound")
+        .isTrue();
+
+    assertThat(shouldRead(not(equal(extract("variant", "$.event_id", "long"), 
INT_MAX_VALUE + 1))))
+        .as("Should read: id above upper bound")
+        .isTrue();
+
+    assertThat(shouldRead(not(equal(extract("variant", "$.event_id", "long"), 
INT_MAX_VALUE + 6))))
+        .as("Should read: id above upper bound")
+        .isTrue();
+  }
+
+  @Test
+  public void testCaseInsensitiveIntegerNotEqRewritten() {
+    assertThat(
+            shouldReadCaseInsensitive(
+                not(equal(extract("VARIANT", "$.event_id", "long"), 
INT_MIN_VALUE - 25))))
+        .as("Should read: id below lower bound")
+        .isTrue();
+
+    assertThat(
+            shouldReadCaseInsensitive(
+                not(equal(extract("VARIANT", "$.event_id", "long"), 
INT_MIN_VALUE - 1))))
+        .as("Should read: id below lower bound")
+        .isTrue();
+
+    assertThat(
+            shouldReadCaseInsensitive(
+                not(equal(extract("VARIANT", "$.event_id", "long"), 
INT_MIN_VALUE))))
+        .as("Should read: id equal to lower bound")
+        .isTrue();
+
+    assertThat(
+            shouldReadCaseInsensitive(
+                not(equal(extract("VARIANT", "$.event_id", "long"), 
INT_MAX_VALUE - 4))))
+        .as("Should read: id between lower and upper bounds")
+        .isTrue();
+
+    assertThat(
+            shouldReadCaseInsensitive(
+                not(equal(extract("VARIANT", "$.event_id", "long"), 
INT_MAX_VALUE))))
+        .as("Should read: id equal to upper bound")
+        .isTrue();
+
+    assertThat(
+            shouldReadCaseInsensitive(
+                not(equal(extract("VARIANT", "$.event_id", "long"), 
INT_MAX_VALUE + 1))))
+        .as("Should read: id above upper bound")
+        .isTrue();
+
+    assertThat(
+            shouldReadCaseInsensitive(
+                not(equal(extract("VARIANT", "$.event_id", "long"), 
INT_MAX_VALUE + 6))))
+        .as("Should read: id above upper bound")
+        .isTrue();
+  }
+
+  @Test
+  public void testCaseSensitiveIntegerNotEqRewritten() {
+    assertThatThrownBy(() -> shouldRead(not(equal(extract("VARIANT", 
"$.event_id", "long"), 5))))
+        .isInstanceOf(ValidationException.class)
+        .hasMessageContaining("Cannot find field 'VARIANT'");
+  }
+
+  @Test
+  public void testStringStartsWith() {
+    assertThat(shouldRead(startsWith(extract("variant", "$.str", "string"), 
"a")))
+        .as("Should read: prefix of lower and upper")
+        .isTrue();
+
+    assertThat(shouldRead(startsWith(extract("variant", "$.str", "string"), 
"ab")))
+        .as("Should read: prefix of lower and upper")
+        .isTrue();
+
+    assertThat(shouldRead(startsWith(extract("variant", "$.str", "string"), 
"abcd")))
+        .as("Should read: lower is prefix of value")
+        .isTrue();
+
+    assertThat(shouldRead(startsWith(extract("variant", "$.str", "string"), 
"abd")))
+        .as("Should read: value is between upper and lower")
+        .isTrue();
+
+    assertThat(shouldRead(startsWith(extract("variant", "$.str", "string"), 
"abex")))
+        .as("Should skip: upper is prefix of value")
+        .isFalse();
+  }
+
+  @Test
+  public void testStringNotStartsWith() {
+    assertThat(shouldRead(notStartsWith(extract("variant", "$.str", "string"), 
"a")))
+        .as("Should skip: prefix of lower and upper, all values must match")
+        .isFalse();
+
+    assertThat(shouldRead(notStartsWith(extract("variant", "$.str", "string"), 
"ab")))
+        .as("Should skip: prefix of lower and upper, all values must match")
+        .isFalse();
+
+    assertThat(shouldRead(notStartsWith(extract("variant", "$.str", "string"), 
"abcd")))
+        .as("Should read: lower is prefix of value, some values do not match")
+        .isTrue();
+
+    assertThat(shouldRead(notStartsWith(extract("variant", "$.str", "string"), 
"abd")))
+        .as("Should read: value is between upper and lower, some values do not 
match")
+        .isTrue();
+
+    assertThat(shouldRead(notStartsWith(extract("variant", "$.str", "string"), 
"abex")))
+        .as("Should read: upper is prefix of value, some values do not match")
+        .isTrue();
+  }
+
+  @Test
+  public void testIntegerIn() {
+    assertThat(
+            shouldRead(
+                in(
+                    extract("variant", "$.event_id", "long"),
+                    INT_MIN_VALUE - 25,
+                    INT_MIN_VALUE - 24)))
+        .as("Should not read: id below lower bound (5 < 30, 6 < 30)")
+        .isFalse();
+
+    assertThat(
+            shouldRead(
+                in(extract("variant", "$.event_id", "long"), INT_MIN_VALUE - 
2, INT_MIN_VALUE - 1)))
+        .as("Should not read: id below lower bound (28 < 30, 29 < 30)")
+        .isFalse();
+
+    assertThat(
+            shouldRead(
+                in(extract("variant", "$.event_id", "long"), INT_MIN_VALUE - 
1, INT_MIN_VALUE)))
+        .as("Should read: id equal to lower bound (30 == 30)")
+        .isTrue();
+
+    assertThat(
+            shouldRead(
+                in(extract("variant", "$.event_id", "long"), INT_MAX_VALUE - 
4, INT_MAX_VALUE - 3)))
+        .as("Should read: id between lower and upper bounds (30 < 75 < 79, 30 
< 76 < 79)")
+        .isTrue();
+
+    assertThat(
+            shouldRead(
+                in(extract("variant", "$.event_id", "long"), INT_MAX_VALUE, 
INT_MAX_VALUE + 1)))
+        .as("Should read: id equal to upper bound (79 == 79)")
+        .isTrue();
+
+    assertThat(
+            shouldRead(
+                in(extract("variant", "$.event_id", "long"), INT_MAX_VALUE + 
1, INT_MAX_VALUE + 2)))
+        .as("Should not read: id above upper bound (80 > 79, 81 > 79)")
+        .isFalse();
+
+    assertThat(
+            shouldRead(
+                in(extract("variant", "$.event_id", "long"), INT_MAX_VALUE + 
6, INT_MAX_VALUE + 7)))
+        .as("Should not read: id above upper bound (85 > 79, 86 > 79)")
+        .isFalse();
+
+    // should read because the number of elements in the "in" expression is too large
+    List<Integer> ids = Lists.newArrayListWithExpectedSize(400);
+    for (int id = -400; id <= 0; id++) {
+      ids.add(id);
+    }
+    assertThat(shouldRead(in(extract("variant", "$.event_id", "long"), ids)))
+        .as("Should read: large in expression")
+        .isTrue();
+  }
+
+  @Test
+  public void testIntegerNotIn() {
+    assertThat(
+            shouldRead(
+                notIn(
+                    extract("variant", "$.event_id", "long"),
+                    INT_MIN_VALUE - 25,
+                    INT_MIN_VALUE - 24)))
+        .as("Should read: id below lower bound (5 < 30, 6 < 30)")
+        .isTrue();
+
+    assertThat(
+            shouldRead(
+                notIn(
+                    extract("variant", "$.event_id", "long"),
+                    INT_MIN_VALUE - 2,
+                    INT_MIN_VALUE - 1)))
+        .as("Should read: id below lower bound (28 < 30, 29 < 30)")
+        .isTrue();
+
+    assertThat(
+            shouldRead(
+                notIn(extract("variant", "$.event_id", "long"), INT_MIN_VALUE 
- 1, INT_MIN_VALUE)))
+        .as("Should read: id equal to lower bound (30 == 30)")
+        .isTrue();
+
+    assertThat(
+            shouldRead(
+                notIn(
+                    extract("variant", "$.event_id", "long"),
+                    INT_MAX_VALUE - 4,
+                    INT_MAX_VALUE - 3)))
+        .as("Should read: id between lower and upper bounds (30 < 75 < 79, 30 
< 76 < 79)")
+        .isTrue();
+
+    assertThat(
+            shouldRead(
+                notIn(extract("variant", "$.event_id", "long"), INT_MAX_VALUE, 
INT_MAX_VALUE + 1)))
+        .as("Should read: id equal to upper bound (79 == 79)")
+        .isTrue();
+
+    assertThat(
+            shouldRead(
+                notIn(
+                    extract("variant", "$.event_id", "long"),
+                    INT_MAX_VALUE + 1,
+                    INT_MAX_VALUE + 2)))
+        .as("Should read: id above upper bound (80 > 79, 81 > 79)")
+        .isTrue();
+
+    assertThat(
+            shouldRead(
+                notIn(
+                    extract("variant", "$.event_id", "long"),
+                    INT_MAX_VALUE + 6,
+                    INT_MAX_VALUE + 7)))
+        .as("Should read: id above upper bound (85 > 79, 86 > 79)")
+        .isTrue();
+  }
+}
diff --git 
a/core/src/test/java/org/apache/iceberg/expressions/TestInclusiveMetricsEvaluatorWithTransforms.java
 
b/core/src/test/java/org/apache/iceberg/expressions/TestInclusiveMetricsEvaluatorWithTransforms.java
new file mode 100644
index 0000000000..5632830064
--- /dev/null
+++ 
b/core/src/test/java/org/apache/iceberg/expressions/TestInclusiveMetricsEvaluatorWithTransforms.java
@@ -0,0 +1,661 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.expressions;
+
+import static org.apache.iceberg.expressions.Expressions.and;
+import static org.apache.iceberg.expressions.Expressions.bucket;
+import static org.apache.iceberg.expressions.Expressions.day;
+import static org.apache.iceberg.expressions.Expressions.equal;
+import static org.apache.iceberg.expressions.Expressions.greaterThan;
+import static org.apache.iceberg.expressions.Expressions.greaterThanOrEqual;
+import static org.apache.iceberg.expressions.Expressions.in;
+import static org.apache.iceberg.expressions.Expressions.isNull;
+import static org.apache.iceberg.expressions.Expressions.lessThan;
+import static org.apache.iceberg.expressions.Expressions.lessThanOrEqual;
+import static org.apache.iceberg.expressions.Expressions.not;
+import static org.apache.iceberg.expressions.Expressions.notEqual;
+import static org.apache.iceberg.expressions.Expressions.notIn;
+import static org.apache.iceberg.expressions.Expressions.notNull;
+import static org.apache.iceberg.expressions.Expressions.notStartsWith;
+import static org.apache.iceberg.expressions.Expressions.or;
+import static org.apache.iceberg.expressions.Expressions.startsWith;
+import static org.apache.iceberg.expressions.Expressions.truncate;
+import static org.apache.iceberg.types.Types.NestedField.optional;
+import static org.apache.iceberg.types.Types.NestedField.required;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import java.time.ZoneOffset;
+import java.util.List;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.TestHelpers.Row;
+import org.apache.iceberg.TestHelpers.TestDataFile;
+import org.apache.iceberg.exceptions.ValidationException;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.types.Conversions;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.types.Types.IntegerType;
+import org.apache.iceberg.util.DateTimeUtil;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.FieldSource;
+
+public class TestInclusiveMetricsEvaluatorWithTransforms {
+  // Schema shared by every test in this class: required id (1) and ts (2), plus optional
+  // all_nulls (3), all_nulls_str (4), no_stats (5), and str (6).
+  private static final Schema SCHEMA =
+      new Schema(
+          required(1, "id", IntegerType.get()),
+          required(2, "ts", Types.TimestampType.withZone()),
+          optional(3, "all_nulls", Types.IntegerType.get()),
+          optional(4, "all_nulls_str", Types.StringType.get()),
+          optional(5, "no_stats", IntegerType.get()),
+          optional(6, "str", Types.StringType.get()));
+
+  // Integer literals used to build predicates; 30 and 79 are also the day numbers passed to
+  // DateTimeUtil.dateFromDays when deriving TS_MIN_VALUE and TS_MAX_VALUE.
+  private static final int INT_MIN_VALUE = 30;
+  private static final int INT_MAX_VALUE = 79;
+
+  // Lower and upper bounds stored for ts: DateTimeUtil.microsFromTimestamptz applied to the
+  // start of day, at UTC offset, of DateTimeUtil.dateFromDays(30) and dateFromDays(79).
+  private static final long TS_MIN_VALUE =
+      DateTimeUtil.microsFromTimestamptz(
+          DateTimeUtil.dateFromDays(30).atStartOfDay().atOffset(ZoneOffset.UTC));
+  private static final long TS_MAX_VALUE =
+      DateTimeUtil.microsFromTimestamptz(
+          DateTimeUtil.dateFromDays(79).atStartOfDay().atOffset(ZoneOffset.UTC));
+
+  // Data file fixture built with a count of 50. Value and null counts are supplied for fields
+  // 1-4 only, and lower/upper bounds only for ts (2) and str (6); no_stats (5) has no metrics.
+  private static final DataFile FILE =
+      new TestDataFile(
+          "file.avro",
+          Row.of(),
+          50,
+          // any value counts, including nulls
+          ImmutableMap.<Integer, Long>builder()
+              .put(1, 50L)
+              .put(2, 50L)
+              .put(3, 50L)
+              .put(4, 50L)
+              .buildOrThrow(),
+          // null value counts
+          ImmutableMap.<Integer, Long>builder()
+              .put(1, 0L)
+              .put(2, 0L)
+              .put(3, 50L) // all_nulls
+              .put(4, 50L) // all_nulls_str
+              .buildOrThrow(),
+          // nan value counts
+          null,
+          // lower bounds
+          ImmutableMap.of(
+              2, Conversions.toByteBuffer(Types.TimestampType.withZone(), TS_MIN_VALUE),
+              6, Conversions.toByteBuffer(Types.StringType.get(), "abc")),
+          // upper bounds
+          ImmutableMap.of(
+              2, Conversions.toByteBuffer(Types.TimestampType.withZone(), TS_MAX_VALUE),
+              6, Conversions.toByteBuffer(Types.StringType.get(), "abe")));
+
+  /** Evaluates {@code expr} against the shared {@code FILE} with {@code caseSensitive = true}. */
+  private boolean shouldRead(Expression expr) {
+    DataFile sharedFile = FILE;
+    return shouldRead(expr, sharedFile);
+  }
+
+  /** Evaluates {@code expr} against {@code file} with {@code caseSensitive = true}. */
+  private boolean shouldRead(Expression expr, DataFile file) {
+    boolean caseSensitive = true;
+    return shouldRead(expr, file, caseSensitive);
+  }
+
+  /** Evaluates {@code expr} against the shared {@code FILE} with {@code caseSensitive = false}. */
+  private boolean shouldReadCaseInsensitive(Expression expr) {
+    boolean caseSensitive = false;
+    return shouldRead(expr, FILE, caseSensitive);
+  }
+
+  /**
+   * Builds an {@link InclusiveMetricsEvaluator} for {@code expr} over {@code SCHEMA} and returns
+   * the result of evaluating it against {@code file}.
+   */
+  private boolean shouldRead(Expression expr, DataFile file, boolean caseSensitive) {
+    return new InclusiveMetricsEvaluator(SCHEMA, expr, caseSensitive).eval(file);
+  }
+
+  /**
+   * Predicates on {@code bucket("all_nulls", 100)}: only isNull, notEqual, and notIn are expected
+   * to read the file; every other predicate is expected to skip it.
+   */
+  @Test
+  public void testAllNullsWithNonOrderPreserving() {
+    assertThat(shouldRead(isNull(bucket("all_nulls", 100))))
+        .as("Should read: null values exist in all null column")
+        .isTrue();
+
+    assertThat(shouldRead(notNull(bucket("all_nulls", 100))))
+        .as("Should skip: no non-null value in all null column")
+        .isFalse();
+
+    assertThat(shouldRead(lessThan(bucket("all_nulls", 100), 30)))
+        .as("Should skip: lessThan on all null column")
+        .isFalse();
+
+    assertThat(shouldRead(lessThanOrEqual(bucket("all_nulls", 100), 30)))
+        .as("Should skip: lessThanOrEqual on all null column")
+        .isFalse();
+
+    assertThat(shouldRead(greaterThan(bucket("all_nulls", 100), 30)))
+        .as("Should skip: greaterThan on all null column")
+        .isFalse();
+
+    assertThat(shouldRead(greaterThanOrEqual(bucket("all_nulls", 100), 30)))
+        .as("Should skip: greaterThanOrEqual on all null column")
+        .isFalse();
+
+    assertThat(shouldRead(equal(bucket("all_nulls", 100), 30)))
+        .as("Should skip: equal on all null column")
+        .isFalse();
+
+    assertThat(shouldRead(notEqual(bucket("all_nulls", 100), 30)))
+        .as("Should read: notEqual on all null column")
+        .isTrue();
+
+    // the description must agree with the isFalse() expectation below
+    assertThat(shouldRead(in(bucket("all_nulls", 100), 1, 2)))
+        .as("Should skip: in on all nulls column")
+        .isFalse();
+
+    assertThat(shouldRead(notIn(bucket("all_nulls", 100), 1, 2)))
+        .as("Should read: notIn on all nulls column")
+        .isTrue();
+  }
+
+  /** Every predicate on {@code bucket("ts", 100)} is expected to read the shared file. */
+  @Test
+  public void testRequiredWithNonOrderPreserving() {
+    assertThat(shouldRead(isNull(bucket("ts", 100))))
+        .as("Should read: non-order-preserving transform")
+        .isTrue();
+
+    assertThat(shouldRead(notNull(bucket("ts", 100))))
+        .as("Should read: non-order-preserving transform")
+        .isTrue();
+
+    assertThat(shouldRead(lessThan(bucket("ts", 100), 30)))
+        .as("Should read: non-order-preserving transform")
+        .isTrue();
+
+    assertThat(shouldRead(lessThanOrEqual(bucket("ts", 100), 30)))
+        .as("Should read: non-order-preserving transform")
+        .isTrue();
+
+    assertThat(shouldRead(greaterThan(bucket("ts", 100), 30)))
+        .as("Should read: non-order-preserving transform")
+        .isTrue();
+
+    assertThat(shouldRead(greaterThanOrEqual(bucket("ts", 100), 30)))
+        .as("Should read: non-order-preserving transform")
+        .isTrue();
+
+    assertThat(shouldRead(equal(bucket("ts", 100), 30)))
+        .as("Should read: non-order-preserving transform")
+        .isTrue();
+
+    assertThat(shouldRead(notEqual(bucket("ts", 100), 30)))
+        .as("Should read: non-order-preserving transform")
+        .isTrue();
+
+    assertThat(shouldRead(in(bucket("ts", 100), 1, 2)))
+        .as("Should read: non-order-preserving transform")
+        .isTrue();
+
+    assertThat(shouldRead(notIn(bucket("ts", 100), 1, 2)))
+        .as("Should read: non-order-preserving transform")
+        .isTrue();
+  }
+
+  /**
+   * Predicates on {@code truncate} of the all-null columns: isNull, notEqual, notIn, startsWith,
+   * and notStartsWith are expected to read the file; the remaining predicates are expected to
+   * skip it.
+   */
+  @Test
+  public void testAllNulls() {
+    assertThat(shouldRead(isNull(truncate("all_nulls", 10))))
+        .as("Should read: null values exist in all null column")
+        .isTrue();
+
+    assertThat(shouldRead(notNull(truncate("all_nulls", 10))))
+        .as("Should skip: no non-null value in all null column")
+        .isFalse();
+
+    assertThat(shouldRead(lessThan(truncate("all_nulls", 10), 30)))
+        .as("Should skip: lessThan on all null column")
+        .isFalse();
+
+    assertThat(shouldRead(lessThanOrEqual(truncate("all_nulls", 10), 30)))
+        .as("Should skip: lessThanOrEqual on all null column")
+        .isFalse();
+
+    assertThat(shouldRead(greaterThan(truncate("all_nulls", 10), 30)))
+        .as("Should skip: greaterThan on all null column")
+        .isFalse();
+
+    assertThat(shouldRead(greaterThanOrEqual(truncate("all_nulls", 10), 30)))
+        .as("Should skip: greaterThanOrEqual on all null column")
+        .isFalse();
+
+    assertThat(shouldRead(equal(truncate("all_nulls", 10), 30)))
+        .as("Should skip: equal on all null column")
+        .isFalse();
+
+    assertThat(shouldRead(notEqual(truncate("all_nulls", 10), 30)))
+        .as("Should read: notEqual on all null column")
+        .isTrue();
+
+    assertThat(shouldRead(in(truncate("all_nulls", 10), 10, 20)))
+        .as("Should skip: in on all nulls column")
+        .isFalse();
+
+    assertThat(shouldRead(notIn(truncate("all_nulls", 10), 10, 20)))
+        .as("Should read: notIn on all nulls column")
+        .isTrue();
+
+    // the description must agree with the isTrue() expectation below
+    assertThat(shouldRead(startsWith(truncate("all_nulls_str", 10), "a")))
+        .as("Should read: startsWith on all null column is not rewritten")
+        .isTrue(); // depends on rewriting the expression, not evaluation
+
+    assertThat(shouldRead(notStartsWith(truncate("all_nulls_str", 10), "a")))
+        .as("Should read: notStartsWith on all null column")
+        .isTrue();
+  }
+
+  /** shouldRead must throw a ValidationException for a column name that is not in SCHEMA. */
+  @Test
+  public void testMissingColumn() {
+    assertThatThrownBy(() -> shouldRead(lessThan(truncate("missing", 10), 20)))
+        .isInstanceOf(ValidationException.class)
+        .hasMessageContaining("Cannot find field 'missing'");
+  }
+
+  // Predicates over truncate("no_stats", 10); FILE supplies no counts or bounds for no_stats
+  // (field 5). Used as the @FieldSource for the parameterized tests in this class.
+  private static final Expression[] MISSING_STATS_EXPRESSIONS =
+      new Expression[] {
+        lessThan(truncate("no_stats", 10), 5),
+        lessThanOrEqual(truncate("no_stats", 10), 30),
+        equal(truncate("no_stats", 10), 70),
+        greaterThan(truncate("no_stats", 10), 78),
+        greaterThanOrEqual(truncate("no_stats", 10), 90),
+        notEqual(truncate("no_stats", 10), 101),
+        isNull(truncate("no_stats", 10)),
+        notNull(truncate("no_stats", 10))
+      };
+
+  @ParameterizedTest
+  @FieldSource("MISSING_STATS_EXPRESSIONS")
+  public void testMissingStats(Expression expr) {
+    assertThat(shouldRead(expr)).as("Should read when missing stats for expr: 
" + expr).isTrue();
+  }
+
+  /** Each expression in MISSING_STATS_EXPRESSIONS is expected to skip a file built with 0. */
+  @ParameterizedTest
+  @FieldSource("MISSING_STATS_EXPRESSIONS")
+  public void testZeroRecordFile(Expression expr) {
+    DataFile empty = new TestDataFile("file.parquet", Row.of(), 0);
+    assertThat(shouldRead(expr, empty)).as("Should never read 0-record file: " + expr).isFalse();
+  }
+
+  /** Checks {@code not(...)} wrapped around day("ts") comparisons against the literal 5. */
+  @Test
+  public void testNot() {
+    // this test case must use a real predicate, not alwaysTrue(), or binding will simplify it out
+    assertThat(shouldRead(not(lessThan(day("ts"), INT_MIN_VALUE - 25))))
+        .as("Should read: not(false)")
+        .isTrue();
+
+    assertThat(shouldRead(not(greaterThan(day("ts"), INT_MIN_VALUE - 25))))
+        .as("Should skip: not(true)")
+        .isFalse();
+  }
+
+  /** Checks {@code and(...)} over pairs of day("ts") comparisons; only (true, true) reads. */
+  @Test
+  public void testAnd() {
+    // this test case must use a real predicate, not alwaysTrue(), or binding will simplify it out
+    assertThat(
+            shouldRead(
+                and(
+                    lessThan(day("ts"), INT_MIN_VALUE - 25),
+                    greaterThanOrEqual(day("ts"), INT_MIN_VALUE - 30))))
+        .as("Should skip: and(false, true)")
+        .isFalse();
+
+    assertThat(
+            shouldRead(
+                and(
+                    lessThan(day("ts"), INT_MIN_VALUE - 25),
+                    greaterThanOrEqual(day("ts"), INT_MAX_VALUE + 1))))
+        .as("Should skip: and(false, false)")
+        .isFalse();
+
+    assertThat(
+            shouldRead(
+                and(
+                    greaterThan(day("ts"), INT_MIN_VALUE - 25),
+                    lessThanOrEqual(day("ts"), INT_MIN_VALUE))))
+        .as("Should read: and(true, true)")
+        .isTrue();
+  }
+
+  /** Checks {@code or(...)} over pairs of day("ts") comparisons; only (false, false) skips. */
+  @Test
+  public void testOr() {
+    // this test case must use a real predicate, not alwaysTrue(), or binding will simplify it out
+    assertThat(
+            shouldRead(
+                or(
+                    lessThan(day("ts"), INT_MIN_VALUE - 25),
+                    greaterThanOrEqual(day("ts"), INT_MAX_VALUE + 1))))
+        .as("Should skip: or(false, false)")
+        .isFalse();
+
+    assertThat(
+            shouldRead(
+                or(
+                    lessThan(day("ts"), INT_MIN_VALUE - 25),
+                    greaterThanOrEqual(day("ts"), INT_MAX_VALUE - 19))))
+        .as("Should read: or(false, true)")
+        .isTrue();
+  }
+
+  /** {@code day("ts") < literal} is expected to read only when the literal is above 30. */
+  @Test
+  public void testIntegerLt() {
+    assertThat(shouldRead(lessThan(day("ts"), INT_MIN_VALUE - 25)))
+        .as("Should not read: id range below lower bound (5 < 30)")
+        .isFalse();
+
+    assertThat(shouldRead(lessThan(day("ts"), INT_MIN_VALUE)))
+        .as("Should not read: id range below lower bound (30 is not < 30)")
+        .isFalse();
+
+    assertThat(shouldRead(lessThan(day("ts"), INT_MIN_VALUE + 1)))
+        .as("Should read: one possible id")
+        .isTrue();
+
+    assertThat(shouldRead(lessThan(day("ts"), INT_MAX_VALUE)))
+        .as("Should read: many possible ids")
+        .isTrue();
+  }
+
+  /** {@code day("ts") <= literal} is expected to read only when the literal is at least 30. */
+  @Test
+  public void testIntegerLtEq() {
+    assertThat(shouldRead(lessThanOrEqual(day("ts"), INT_MIN_VALUE - 25)))
+        .as("Should not read: id range below lower bound (5 < 30)")
+        .isFalse();
+
+    assertThat(shouldRead(lessThanOrEqual(day("ts"), INT_MIN_VALUE - 1)))
+        .as("Should not read: id range below lower bound (29 < 30)")
+        .isFalse();
+
+    assertThat(shouldRead(lessThanOrEqual(day("ts"), INT_MIN_VALUE)))
+        .as("Should read: one possible id")
+        .isTrue();
+
+    assertThat(shouldRead(lessThanOrEqual(day("ts"), INT_MAX_VALUE)))
+        .as("Should read: many possible ids")
+        .isTrue();
+  }
+
+  /** {@code day("ts") > literal} is expected to read only when the literal is below 79. */
+  @Test
+  public void testIntegerGt() {
+    assertThat(shouldRead(greaterThan(day("ts"), INT_MAX_VALUE + 6)))
+        .as("Should not read: id range above upper bound (85 > 79)")
+        .isFalse();
+
+    assertThat(shouldRead(greaterThan(day("ts"), INT_MAX_VALUE)))
+        .as("Should not read: id range above upper bound (79 is not > 79)")
+        .isFalse();
+
+    assertThat(shouldRead(greaterThan(day("ts"), INT_MAX_VALUE - 1)))
+        .as("Should read: one possible id")
+        .isTrue();
+
+    assertThat(shouldRead(greaterThan(day("ts"), INT_MAX_VALUE - 4)))
+        .as("Should read: many possible ids")
+        .isTrue();
+  }
+
+  /** {@code day("ts") >= literal} is expected to read only when the literal is at most 79. */
+  @Test
+  public void testIntegerGtEq() {
+    assertThat(shouldRead(greaterThanOrEqual(day("ts"), INT_MAX_VALUE + 6)))
+        .as("Should not read: id range above upper bound (85 > 79)")
+        .isFalse();
+
+    assertThat(shouldRead(greaterThanOrEqual(day("ts"), INT_MAX_VALUE + 1)))
+        .as("Should not read: id range above upper bound (80 > 79)")
+        .isFalse();
+
+    assertThat(shouldRead(greaterThanOrEqual(day("ts"), INT_MAX_VALUE)))
+        .as("Should read: one possible id")
+        .isTrue();
+
+    assertThat(shouldRead(greaterThanOrEqual(day("ts"), INT_MAX_VALUE - 4)))
+        .as("Should read: many possible ids")
+        .isTrue();
+  }
+
+  /** {@code day("ts") == literal} is expected to read only for literals from 30 through 79. */
+  @Test
+  public void testIntegerEq() {
+    assertThat(shouldRead(equal(day("ts"), INT_MIN_VALUE - 25)))
+        .as("Should not read: id below lower bound")
+        .isFalse();
+
+    assertThat(shouldRead(equal(day("ts"), INT_MIN_VALUE - 1)))
+        .as("Should not read: id below lower bound")
+        .isFalse();
+
+    assertThat(shouldRead(equal(day("ts"), INT_MIN_VALUE)))
+        .as("Should read: id equal to lower bound")
+        .isTrue();
+
+    assertThat(shouldRead(equal(day("ts"), INT_MAX_VALUE - 4)))
+        .as("Should read: id between lower and upper bounds")
+        .isTrue();
+
+    assertThat(shouldRead(equal(day("ts"), INT_MAX_VALUE)))
+        .as("Should read: id equal to upper bound")
+        .isTrue();
+
+    assertThat(shouldRead(equal(day("ts"), INT_MAX_VALUE + 1)))
+        .as("Should not read: id above upper bound")
+        .isFalse();
+
+    assertThat(shouldRead(equal(day("ts"), INT_MAX_VALUE + 6)))
+        .as("Should not read: id above upper bound")
+        .isFalse();
+  }
+
+  /** {@code day("ts") != literal} is expected to read for every literal tried here. */
+  @Test
+  public void testIntegerNotEq() {
+    assertThat(shouldRead(notEqual(day("ts"), INT_MIN_VALUE - 25)))
+        .as("Should read: id below lower bound")
+        .isTrue();
+
+    assertThat(shouldRead(notEqual(day("ts"), INT_MIN_VALUE - 1)))
+        .as("Should read: id below lower bound")
+        .isTrue();
+
+    assertThat(shouldRead(notEqual(day("ts"), INT_MIN_VALUE)))
+        .as("Should read: id equal to lower bound")
+        .isTrue();
+
+    assertThat(shouldRead(notEqual(day("ts"), INT_MAX_VALUE - 4)))
+        .as("Should read: id between lower and upper bounds")
+        .isTrue();
+
+    assertThat(shouldRead(notEqual(day("ts"), INT_MAX_VALUE)))
+        .as("Should read: id equal to upper bound")
+        .isTrue();
+
+    assertThat(shouldRead(notEqual(day("ts"), INT_MAX_VALUE + 1)))
+        .as("Should read: id above upper bound")
+        .isTrue();
+
+    assertThat(shouldRead(notEqual(day("ts"), INT_MAX_VALUE + 6)))
+        .as("Should read: id above upper bound")
+        .isTrue();
+  }
+
+  /** Same literals as testIntegerNotEq, expressed as {@code not(equal(...))}; all must read. */
+  @Test
+  public void testIntegerNotEqRewritten() {
+    assertThat(shouldRead(not(equal(day("ts"), INT_MIN_VALUE - 25))))
+        .as("Should read: id below lower bound")
+        .isTrue();
+
+    assertThat(shouldRead(not(equal(day("ts"), INT_MIN_VALUE - 1))))
+        .as("Should read: id below lower bound")
+        .isTrue();
+
+    assertThat(shouldRead(not(equal(day("ts"), INT_MIN_VALUE))))
+        .as("Should read: id equal to lower bound")
+        .isTrue();
+
+    assertThat(shouldRead(not(equal(day("ts"), INT_MAX_VALUE - 4))))
+        .as("Should read: id between lower and upper bounds")
+        .isTrue();
+
+    assertThat(shouldRead(not(equal(day("ts"), INT_MAX_VALUE))))
+        .as("Should read: id equal to upper bound")
+        .isTrue();
+
+    assertThat(shouldRead(not(equal(day("ts"), INT_MAX_VALUE + 1))))
+        .as("Should read: id above upper bound")
+        .isTrue();
+
+    assertThat(shouldRead(not(equal(day("ts"), INT_MAX_VALUE + 6))))
+        .as("Should read: id above upper bound")
+        .isTrue();
+  }
+
+  /**
+   * {@code not(equal(day("TS"), literal))} evaluated with {@code caseSensitive = false}; the
+   * upper-case column name is expected to resolve and every literal is expected to read.
+   */
+  @Test
+  public void testCaseInsensitiveIntegerNotEqRewritten() {
+    assertThat(shouldReadCaseInsensitive(not(equal(day("TS"), INT_MIN_VALUE - 25))))
+        .as("Should read: id below lower bound")
+        .isTrue();
+
+    assertThat(shouldReadCaseInsensitive(not(equal(day("TS"), INT_MIN_VALUE - 1))))
+        .as("Should read: id below lower bound")
+        .isTrue();
+
+    assertThat(shouldReadCaseInsensitive(not(equal(day("TS"), INT_MIN_VALUE))))
+        .as("Should read: id equal to lower bound")
+        .isTrue();
+
+    assertThat(shouldReadCaseInsensitive(not(equal(day("TS"), INT_MAX_VALUE - 4))))
+        .as("Should read: id between lower and upper bounds")
+        .isTrue();
+
+    assertThat(shouldReadCaseInsensitive(not(equal(day("TS"), INT_MAX_VALUE))))
+        .as("Should read: id equal to upper bound")
+        .isTrue();
+
+    assertThat(shouldReadCaseInsensitive(not(equal(day("TS"), INT_MAX_VALUE + 1))))
+        .as("Should read: id above upper bound")
+        .isTrue();
+
+    assertThat(shouldReadCaseInsensitive(not(equal(day("TS"), INT_MAX_VALUE + 6))))
+        .as("Should read: id above upper bound")
+        .isTrue();
+  }
+
+  /** With the case-sensitive shouldRead, the upper-case name "TS" must fail to resolve. */
+  @Test
+  public void testCaseSensitiveIntegerNotEqRewritten() {
+    assertThatThrownBy(() -> shouldRead(not(equal(day("TS"), 5))))
+        .isInstanceOf(ValidationException.class)
+        .hasMessageContaining("Cannot find field 'TS'");
+  }
+
+  /** startsWith on {@code truncate("str", 10)} is expected to read for every prefix tried. */
+  @Test
+  public void testStringStartsWith() {
+    assertThat(shouldRead(startsWith(truncate("str", 10), "a")))
+        .as("Should read: not rewritten")
+        .isTrue();
+
+    assertThat(shouldRead(startsWith(truncate("str", 10), "ab")))
+        .as("Should read: not rewritten")
+        .isTrue();
+
+    assertThat(shouldRead(startsWith(truncate("str", 10), "b")))
+        .as("Should read: not rewritten")
+        .isTrue();
+  }
+
+  /**
+   * notStartsWith on {@code truncate("str", 10)} is expected to read for every prefix tried.
+   * NOTE(review): FILE supplies no null count for str (field 6); confirm the evaluator treats
+   * that as "may contain nulls" so these expectations hold.
+   */
+  @Test
+  public void testStringNotStartsWith() {
+    assertThat(shouldRead(notStartsWith(truncate("str", 10), "a")))
+        .as("Should read: not rewritten")
+        .isTrue();
+
+    assertThat(shouldRead(notStartsWith(truncate("str", 10), "ab")))
+        .as("Should read: not rewritten")
+        .isTrue();
+
+    assertThat(shouldRead(notStartsWith(truncate("str", 10), "b")))
+        .as("Should read: not rewritten")
+        .isTrue();
+  }
+
+  /**
+   * {@code day("ts") in (a, b)} is expected to read only when at least one literal lies within
+   * 30 through 79; a 401-element literal list is also expected to read.
+   */
+  @Test
+  public void testIntegerIn() {
+    assertThat(shouldRead(in(day("ts"), INT_MIN_VALUE - 25, INT_MIN_VALUE - 24)))
+        .as("Should not read: id below lower bound (5 < 30, 6 < 30)")
+        .isFalse();
+
+    assertThat(shouldRead(in(day("ts"), INT_MIN_VALUE - 2, INT_MIN_VALUE - 1)))
+        .as("Should not read: id below lower bound (28 < 30, 29 < 30)")
+        .isFalse();
+
+    assertThat(shouldRead(in(day("ts"), INT_MIN_VALUE - 1, INT_MIN_VALUE)))
+        .as("Should read: id equal to lower bound (30 == 30)")
+        .isTrue();
+
+    assertThat(shouldRead(in(day("ts"), INT_MAX_VALUE - 4, INT_MAX_VALUE - 3)))
+        .as("Should read: id between lower and upper bounds (30 < 75 < 79, 30 < 76 < 79)")
+        .isTrue();
+
+    assertThat(shouldRead(in(day("ts"), INT_MAX_VALUE, INT_MAX_VALUE + 1)))
+        .as("Should read: id equal to upper bound (79 == 79)")
+        .isTrue();
+
+    assertThat(shouldRead(in(day("ts"), INT_MAX_VALUE + 1, INT_MAX_VALUE + 2)))
+        .as("Should not read: id above upper bound (80 > 79, 81 > 79)")
+        .isFalse();
+
+    assertThat(shouldRead(in(day("ts"), INT_MAX_VALUE + 6, INT_MAX_VALUE + 7)))
+        .as("Should not read: id above upper bound (85 > 79, 86 > 79)")
+        .isFalse();
+
+    // should read as the number of elements in the in expression is too big
+    // the loop below is inclusive on both ends, so it adds 401 values
+    List<Integer> ids = Lists.newArrayListWithExpectedSize(401);
+    for (int id = -400; id <= 0; id++) {
+      ids.add(id);
+    }
+    assertThat(shouldRead(in(day("ts"), ids))).as("Should read: large in expression").isTrue();
+  }
+
+  /** {@code day("ts") not in (a, b)} is expected to read for every literal pair tried here. */
+  @Test
+  public void testIntegerNotIn() {
+    assertThat(shouldRead(notIn(day("ts"), INT_MIN_VALUE - 25, INT_MIN_VALUE - 24)))
+        .as("Should read: id below lower bound (5 < 30, 6 < 30)")
+        .isTrue();
+
+    assertThat(shouldRead(notIn(day("ts"), INT_MIN_VALUE - 2, INT_MIN_VALUE - 1)))
+        .as("Should read: id below lower bound (28 < 30, 29 < 30)")
+        .isTrue();
+
+    assertThat(shouldRead(notIn(day("ts"), INT_MIN_VALUE - 1, INT_MIN_VALUE)))
+        .as("Should read: id equal to lower bound (30 == 30)")
+        .isTrue();
+
+    assertThat(shouldRead(notIn(day("ts"), INT_MAX_VALUE - 4, INT_MAX_VALUE - 3)))
+        .as("Should read: id between lower and upper bounds (30 < 75 < 79, 30 < 76 < 79)")
+        .isTrue();
+
+    assertThat(shouldRead(notIn(day("ts"), INT_MAX_VALUE, INT_MAX_VALUE + 1)))
+        .as("Should read: id equal to upper bound (79 == 79)")
+        .isTrue();
+
+    assertThat(shouldRead(notIn(day("ts"), INT_MAX_VALUE + 1, INT_MAX_VALUE + 2)))
+        .as("Should read: id above upper bound (80 > 79, 81 > 79)")
+        .isTrue();
+
+    assertThat(shouldRead(notIn(day("ts"), INT_MAX_VALUE + 6, INT_MAX_VALUE + 7)))
+        .as("Should read: id above upper bound (85 > 79, 86 > 79)")
+        .isTrue();
+  }
+}
diff --git 
a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkScan.java
 
b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkScan.java
index a4f10d340e..d242661388 100644
--- 
a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkScan.java
+++ 
b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkScan.java
@@ -407,7 +407,7 @@ public class TestSparkScan extends SparkTestBaseWithCatalog 
{
     pushFilters(builder, predicate);
     Batch scan = builder.build().toBatch();
 
-    assertThat(scan.planInputPartitions().length).isEqualTo(10);
+    assertThat(scan.planInputPartitions().length).isEqualTo(5);
 
     // NOT Equal
     builder = scanBuilder();
@@ -416,6 +416,7 @@ public class TestSparkScan extends SparkTestBaseWithCatalog 
{
     pushFilters(builder, predicate);
     scan = builder.build().toBatch();
 
+    // notEq can't be answered using column bounds because they are not exact
     assertThat(scan.planInputPartitions().length).isEqualTo(10);
   }
 
@@ -464,7 +465,7 @@ public class TestSparkScan extends SparkTestBaseWithCatalog 
{
     pushFilters(builder, predicate);
     Batch scan = builder.build().toBatch();
 
-    assertThat(scan.planInputPartitions().length).isEqualTo(10);
+    assertThat(scan.planInputPartitions().length).isEqualTo(5);
 
     // NOT GT
     builder = scanBuilder();
@@ -473,7 +474,7 @@ public class TestSparkScan extends SparkTestBaseWithCatalog 
{
     pushFilters(builder, predicate);
     scan = builder.build().toBatch();
 
-    assertThat(scan.planInputPartitions().length).isEqualTo(10);
+    assertThat(scan.planInputPartitions().length).isEqualTo(5);
   }
 
   @Test
@@ -521,7 +522,7 @@ public class TestSparkScan extends SparkTestBaseWithCatalog 
{
     pushFilters(builder, predicate);
     Batch scan = builder.build().toBatch();
 
-    assertThat(scan.planInputPartitions().length).isEqualTo(10);
+    assertThat(scan.planInputPartitions().length).isEqualTo(5);
 
     // NOT LT
     builder = scanBuilder();
@@ -530,7 +531,7 @@ public class TestSparkScan extends SparkTestBaseWithCatalog 
{
     pushFilters(builder, predicate);
     scan = builder.build().toBatch();
 
-    assertThat(scan.planInputPartitions().length).isEqualTo(10);
+    assertThat(scan.planInputPartitions().length).isEqualTo(5);
   }
 
   @Test
@@ -577,7 +578,7 @@ public class TestSparkScan extends SparkTestBaseWithCatalog 
{
     pushFilters(builder, predicate);
     Batch scan = builder.build().toBatch();
 
-    assertThat(scan.planInputPartitions().length).isEqualTo(10);
+    assertThat(scan.planInputPartitions().length).isEqualTo(8);
 
     // NOT GTEQ
     builder = scanBuilder();
@@ -586,7 +587,7 @@ public class TestSparkScan extends SparkTestBaseWithCatalog 
{
     pushFilters(builder, predicate);
     scan = builder.build().toBatch();
 
-    assertThat(scan.planInputPartitions().length).isEqualTo(10);
+    assertThat(scan.planInputPartitions().length).isEqualTo(2);
   }
 
   @Test
@@ -734,7 +735,7 @@ public class TestSparkScan extends SparkTestBaseWithCatalog 
{
     pushFilters(builder, predicate);
     scan = builder.build().toBatch();
 
-    assertThat(scan.planInputPartitions().length).isEqualTo(10);
+    assertThat(scan.planInputPartitions().length).isEqualTo(5);
   }
 
   @Test
@@ -773,7 +774,7 @@ public class TestSparkScan extends SparkTestBaseWithCatalog 
{
     pushFilters(builder, predicate);
     Batch scan = builder.build().toBatch();
 
-    assertThat(scan.planInputPartitions().length).isEqualTo(10);
+    assertThat(scan.planInputPartitions().length).isEqualTo(0);
 
     // NOT IsNull
     builder = scanBuilder();
@@ -830,7 +831,7 @@ public class TestSparkScan extends SparkTestBaseWithCatalog 
{
     pushFilters(builder, predicate);
     scan = builder.build().toBatch();
 
-    assertThat(scan.planInputPartitions().length).isEqualTo(10);
+    assertThat(scan.planInputPartitions().length).isEqualTo(0);
   }
 
   @Test
@@ -875,7 +876,7 @@ public class TestSparkScan extends SparkTestBaseWithCatalog 
{
     pushFilters(builder, predicate);
     Batch scan = builder.build().toBatch();
 
-    assertThat(scan.planInputPartitions().length).isEqualTo(10);
+    assertThat(scan.planInputPartitions().length).isEqualTo(5);
 
     // NOT (years(ts) = 47 AND bucket(id, 5) >= 2)
     builder = scanBuilder();
diff --git 
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkScan.java
 
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkScan.java
index 334725ec8c..3450050e4f 100644
--- 
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkScan.java
+++ 
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkScan.java
@@ -425,7 +425,7 @@ public class TestSparkScan extends TestBaseWithCatalog {
     pushFilters(builder, predicate);
     Batch scan = builder.build().toBatch();
 
-    assertThat(scan.planInputPartitions().length).isEqualTo(10);
+    assertThat(scan.planInputPartitions().length).isEqualTo(5);
 
     // NOT Equal
     builder = scanBuilder();
@@ -434,6 +434,7 @@ public class TestSparkScan extends TestBaseWithCatalog {
     pushFilters(builder, predicate);
     scan = builder.build().toBatch();
 
+    // notEq can't be answered using column bounds because they are not exact
     assertThat(scan.planInputPartitions().length).isEqualTo(10);
   }
 
@@ -482,7 +483,7 @@ public class TestSparkScan extends TestBaseWithCatalog {
     pushFilters(builder, predicate);
     Batch scan = builder.build().toBatch();
 
-    assertThat(scan.planInputPartitions().length).isEqualTo(10);
+    assertThat(scan.planInputPartitions().length).isEqualTo(5);
 
     // NOT GT
     builder = scanBuilder();
@@ -491,7 +492,7 @@ public class TestSparkScan extends TestBaseWithCatalog {
     pushFilters(builder, predicate);
     scan = builder.build().toBatch();
 
-    assertThat(scan.planInputPartitions().length).isEqualTo(10);
+    assertThat(scan.planInputPartitions().length).isEqualTo(5);
   }
 
   @TestTemplate
@@ -539,7 +540,7 @@ public class TestSparkScan extends TestBaseWithCatalog {
     pushFilters(builder, predicate);
     Batch scan = builder.build().toBatch();
 
-    assertThat(scan.planInputPartitions().length).isEqualTo(10);
+    assertThat(scan.planInputPartitions().length).isEqualTo(5);
 
     // NOT LT
     builder = scanBuilder();
@@ -548,7 +549,7 @@ public class TestSparkScan extends TestBaseWithCatalog {
     pushFilters(builder, predicate);
     scan = builder.build().toBatch();
 
-    assertThat(scan.planInputPartitions().length).isEqualTo(10);
+    assertThat(scan.planInputPartitions().length).isEqualTo(5);
   }
 
   @TestTemplate
@@ -595,7 +596,7 @@ public class TestSparkScan extends TestBaseWithCatalog {
     pushFilters(builder, predicate);
     Batch scan = builder.build().toBatch();
 
-    assertThat(scan.planInputPartitions().length).isEqualTo(10);
+    assertThat(scan.planInputPartitions().length).isEqualTo(8);
 
     // NOT GTEQ
     builder = scanBuilder();
@@ -604,7 +605,7 @@ public class TestSparkScan extends TestBaseWithCatalog {
     pushFilters(builder, predicate);
     scan = builder.build().toBatch();
 
-    assertThat(scan.planInputPartitions().length).isEqualTo(10);
+    assertThat(scan.planInputPartitions().length).isEqualTo(2);
   }
 
   @TestTemplate
@@ -752,7 +753,7 @@ public class TestSparkScan extends TestBaseWithCatalog {
     pushFilters(builder, predicate);
     scan = builder.build().toBatch();
 
-    assertThat(scan.planInputPartitions().length).isEqualTo(10);
+    assertThat(scan.planInputPartitions().length).isEqualTo(5);
   }
 
   @TestTemplate
@@ -791,7 +792,7 @@ public class TestSparkScan extends TestBaseWithCatalog {
     pushFilters(builder, predicate);
     Batch scan = builder.build().toBatch();
 
-    assertThat(scan.planInputPartitions().length).isEqualTo(10);
+    assertThat(scan.planInputPartitions().length).isEqualTo(0);
 
     // NOT IsNull
     builder = scanBuilder();
@@ -848,7 +849,7 @@ public class TestSparkScan extends TestBaseWithCatalog {
     pushFilters(builder, predicate);
     scan = builder.build().toBatch();
 
-    assertThat(scan.planInputPartitions().length).isEqualTo(10);
+    assertThat(scan.planInputPartitions().length).isEqualTo(0);
   }
 
   @TestTemplate
@@ -893,7 +894,7 @@ public class TestSparkScan extends TestBaseWithCatalog {
     pushFilters(builder, predicate);
     Batch scan = builder.build().toBatch();
 
-    assertThat(scan.planInputPartitions().length).isEqualTo(10);
+    assertThat(scan.planInputPartitions().length).isEqualTo(5);
 
     // NOT (years(ts) = 47 AND bucket(id, 5) >= 2)
     builder = scanBuilder();


Reply via email to