This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new c1d5ad17b4 [parquet] Support generate the RowRanges directly from the selection (#6060)
c1d5ad17b4 is described below

commit c1d5ad17b4abc5059f835d19b9ec4bc2bf7cd08a
Author: Tan-JiaLiang <[email protected]>
AuthorDate: Mon Aug 18 19:19:18 2025 +0800

    [parquet] Support generate the RowRanges directly from the selection (#6060)
---
 .../paimon/table/AppendOnlySimpleTableTest.java    |   4 +-
 .../apache/parquet/hadoop/ParquetFileReader.java   |  74 +++++-
 .../filter2/columnindex/ColumnIndexFilter.java     | 271 ---------------------
 .../internal/filter2/columnindex/RowRanges.java    |  17 +-
 4 files changed, 68 insertions(+), 298 deletions(-)

diff --git a/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlySimpleTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlySimpleTableTest.java
index babb62a995..c262f892a8 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlySimpleTableTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlySimpleTableTest.java
@@ -826,7 +826,7 @@ public class AppendOnlySimpleTableTest extends SimpleTableTestBase {
     }
 
     @Test
-    public void testRangeBitmapIndexTopNFilter() throws Exception {
+    public void testTopNResultFilterParquetRowRanges() throws Exception {
         RowType rowType =
                 RowType.builder()
                         .field("id", DataTypes.STRING())
@@ -851,7 +851,7 @@ public class AppendOnlySimpleTableTest extends SimpleTableTestBase {
         // in unaware-bucket mode, we split files into splits all the time
         FileStoreTable table = createUnawareBucketFileStoreTable(rowType, configure);
 
-        int bound = 300000;
+        int bound = 30000000;
         int rowCount = 1000000;
         Random random = new Random();
         int k = random.nextInt(100) + 1;
diff --git a/paimon-format/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java b/paimon-format/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
index 05e9d12033..cdf723c524 100644
--- a/paimon-format/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
+++ b/paimon-format/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
@@ -98,6 +98,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Objects;
+import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
@@ -329,7 +330,8 @@ public class ParquetFileReader implements Closeable {
 
     public long getFilteredRecordCount() {
         if (!options.useColumnIndexFilter()
-                || !FilterCompat.isFilteringRequired(options.getRecordFilter())) {
+                || (!FilterCompat.isFilteringRequired(options.getRecordFilter())
+                        && selection == null)) {
             return getRecordCount();
         }
         long total = 0L;
@@ -514,7 +516,8 @@ public class ParquetFileReader implements Closeable {
 
         // Filtering not required -> fall back to the non-filtering path
         if (!options.useColumnIndexFilter()
-                || !FilterCompat.isFilteringRequired(options.getRecordFilter())) {
+                || (!FilterCompat.isFilteringRequired(options.getRecordFilter())
+                        && selection == null)) {
             return internalReadRowGroup(blockIndex);
         }
 
@@ -681,7 +684,8 @@ public class ParquetFileReader implements Closeable {
         }
         // Filtering not required -> fall back to the non-filtering path
         if (!options.useColumnIndexFilter()
-                || !FilterCompat.isFilteringRequired(options.getRecordFilter())) {
+                || (!FilterCompat.isFilteringRequired(options.getRecordFilter())
+                        && selection == null)) {
             return readNextRowGroup();
         }
         BlockMetaData block = blocks.get(currentBlock);
@@ -792,23 +796,67 @@ public class ParquetFileReader implements Closeable {
     }
 
     private RowRanges getRowRanges(int blockIndex) {
-        assert FilterCompat.isFilteringRequired(options.getRecordFilter())
-                : "Should not be invoked if filter is null or NOOP";
+        boolean filteringRequired = FilterCompat.isFilteringRequired(options.getRecordFilter());
+        if (!filteringRequired && selection == null) {
+            throw new IllegalArgumentException("Should not be invoked if filter is null or NOOP");
+        }
+
         RowRanges rowRanges = blockRowRanges.get(blockIndex);
         if (rowRanges == null) {
-            rowRanges =
-                    ColumnIndexFilter.calculateRowRanges(
-                            options.getRecordFilter(),
-                            getColumnIndexStore(blockIndex),
-                            paths.keySet(),
-                            blocks.get(blockIndex).getRowCount(),
-                            blocks.get(blockIndex).getRowIndexOffset(),
-                            selection);
+            BlockMetaData block = blocks.get(blockIndex);
+            rowRanges = RowRanges.createSingle(block.getRowCount());
+            if (selection != null) {
+                RowRanges result = calculateRowRanges(blockIndex, selection);
+                rowRanges = RowRanges.intersection(result, rowRanges);
+            }
+
+            if (filteringRequired) {
+                RowRanges result =
+                        ColumnIndexFilter.calculateRowRanges(
+                                options.getRecordFilter(),
+                                getColumnIndexStore(blockIndex),
+                                paths.keySet(),
+                                block.getRowCount());
+                rowRanges = RowRanges.intersection(result, rowRanges);
+            }
             blockRowRanges.set(blockIndex, rowRanges);
         }
         return rowRanges;
     }
 
+    private RowRanges calculateRowRanges(int blockIndex, RoaringBitmap32 selection) {
+        List<OffsetIndex> offsets;
+        BlockMetaData block = blocks.get(blockIndex);
+        if (paths.isEmpty()) {
+            Optional<ColumnChunkMetaData> first = block.getColumns().stream().findFirst();
+            if (first.isPresent()) {
+                ColumnPath path = first.get().getPath();
+                OffsetIndex index =
+                        ColumnIndexStoreImpl.create(this, block, Collections.singleton(path))
+                                .getOffsetIndex(path);
+                offsets = Collections.singletonList(index);
+            } else {
+                offsets = Collections.emptyList();
+            }
+        } else {
+            ColumnIndexStore store = getColumnIndexStore(blockIndex);
+            offsets =
+                    paths.keySet().stream().map(store::getOffsetIndex).collect(Collectors.toList());
+        }
+
+        long rowCount = block.getRowCount();
+        long rowIndexOffset = block.getRowIndexOffset();
+        RowRanges rowRanges = RowRanges.createSingle(rowCount);
+        for (OffsetIndex offset : offsets) {
+            if (offset != null) {
+                RowRanges result = RowRanges.create(rowCount, rowIndexOffset, offset, selection);
+                rowRanges = RowRanges.intersection(result, rowRanges);
+            }
+        }
+
+        return rowRanges;
+    }
+
     public boolean skipNextRowGroup() {
         return advanceToNextBlock();
     }
diff --git a/paimon-format/src/main/java/org/apache/parquet/internal/filter2/columnindex/ColumnIndexFilter.java b/paimon-format/src/main/java/org/apache/parquet/internal/filter2/columnindex/ColumnIndexFilter.java
deleted file mode 100644
index d640d379ba..0000000000
--- a/paimon-format/src/main/java/org/apache/parquet/internal/filter2/columnindex/ColumnIndexFilter.java
+++ /dev/null
@@ -1,271 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.parquet.internal.filter2.columnindex;
-
-import org.apache.paimon.utils.RoaringBitmap32;
-
-import org.apache.parquet.filter2.compat.FilterCompat;
-import org.apache.parquet.filter2.compat.FilterCompat.FilterPredicateCompat;
-import org.apache.parquet.filter2.compat.FilterCompat.NoOpFilter;
-import org.apache.parquet.filter2.compat.FilterCompat.UnboundRecordFilterCompat;
-import org.apache.parquet.filter2.predicate.FilterPredicate.Visitor;
-import org.apache.parquet.filter2.predicate.Operators;
-import org.apache.parquet.filter2.predicate.Operators.And;
-import org.apache.parquet.filter2.predicate.Operators.Column;
-import org.apache.parquet.filter2.predicate.Operators.Contains;
-import org.apache.parquet.filter2.predicate.Operators.Eq;
-import org.apache.parquet.filter2.predicate.Operators.Gt;
-import org.apache.parquet.filter2.predicate.Operators.GtEq;
-import org.apache.parquet.filter2.predicate.Operators.LogicalNotUserDefined;
-import org.apache.parquet.filter2.predicate.Operators.Lt;
-import org.apache.parquet.filter2.predicate.Operators.LtEq;
-import org.apache.parquet.filter2.predicate.Operators.Not;
-import org.apache.parquet.filter2.predicate.Operators.NotEq;
-import org.apache.parquet.filter2.predicate.Operators.Or;
-import org.apache.parquet.filter2.predicate.Operators.UserDefined;
-import org.apache.parquet.filter2.predicate.UserDefinedPredicate;
-import org.apache.parquet.hadoop.metadata.ColumnPath;
-import org.apache.parquet.internal.column.columnindex.ColumnIndex;
-import org.apache.parquet.internal.column.columnindex.OffsetIndex;
-import org.apache.parquet.internal.filter2.columnindex.ColumnIndexStore.MissingOffsetIndexException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.annotation.Nullable;
-
-import java.util.PrimitiveIterator;
-import java.util.Set;
-import java.util.function.Function;
-
-/**
- * Filter implementation based on column indexes. No filtering will be applied for columns where no
- * column index is available. Offset index is required for all the columns in the projection,
- * therefore a {@link MissingOffsetIndexException} will be thrown from any {@code visit} methods if
- * any of the required offset indexes is missing.
- *
- * <p>Note: The class was copied over to support using {@link RoaringBitmap32} to filter {@link
- * RowRanges}.
- */
-public class ColumnIndexFilter implements Visitor<RowRanges> {
-
-    private static final Logger LOGGER = LoggerFactory.getLogger(ColumnIndexFilter.class);
-    private final ColumnIndexStore columnIndexStore;
-    private final Set<ColumnPath> columns;
-    private final long rowCount;
-    private final long rowIndexOffset;
-    @Nullable private final RoaringBitmap32 selection;
-    private RowRanges allRows;
-
-    /**
-     * Calculates the row ranges containing the indexes of the rows might match the specified
-     * filter.
-     *
-     * @param filter to be used for filtering the rows
-     * @param columnIndexStore the store for providing column/offset indexes
-     * @param paths the paths of the columns used in the actual projection; a column not being part
-     *     of the projection will be handled as containing {@code null} values only even if the
-     *     column has values written in the file
-     * @param rowCount the total number of rows in the row-group
-     * @param rowIndexOffset the offset of the row-group
-     * @param selection the selected position; it will use to filter or narrow the row ranges
-     * @return the ranges of the possible matching row indexes; the returned ranges will contain all
-     *     the rows if any of the required offset index is missing
-     */
-    public static RowRanges calculateRowRanges(
-            FilterCompat.Filter filter,
-            ColumnIndexStore columnIndexStore,
-            Set<ColumnPath> paths,
-            long rowCount,
-            long rowIndexOffset,
-            @Nullable RoaringBitmap32 selection) {
-        return filter.accept(
-                new FilterCompat.Visitor<RowRanges>() {
-                    @Override
-                    public RowRanges visit(FilterPredicateCompat filterPredicateCompat) {
-                        try {
-                            return filterPredicateCompat
-                                    .getFilterPredicate()
-                                    .accept(
-                                            new ColumnIndexFilter(
-                                                    columnIndexStore,
-                                                    paths,
-                                                    rowCount,
-                                                    rowIndexOffset,
-                                                    selection));
-                        } catch (MissingOffsetIndexException e) {
-                            LOGGER.info(e.getMessage());
-                            return RowRanges.createSingle(rowCount);
-                        }
-                    }
-
-                    @Override
-                    public RowRanges visit(UnboundRecordFilterCompat unboundRecordFilterCompat) {
-                        return RowRanges.createSingle(rowCount);
-                    }
-
-                    @Override
-                    public RowRanges visit(NoOpFilter noOpFilter) {
-                        return RowRanges.createSingle(rowCount);
-                    }
-                });
-    }
-
-    private ColumnIndexFilter(
-            ColumnIndexStore columnIndexStore,
-            Set<ColumnPath> paths,
-            long rowCount,
-            long rowIndexOffset,
-            @Nullable RoaringBitmap32 selection) {
-        this.columnIndexStore = columnIndexStore;
-        this.columns = paths;
-        this.rowCount = rowCount;
-        this.rowIndexOffset = rowIndexOffset;
-        this.selection = selection;
-    }
-
-    private RowRanges allRows() {
-        if (allRows == null) {
-            allRows = RowRanges.createSingle(rowCount);
-        }
-        return allRows;
-    }
-
-    @Override
-    public <T extends Comparable<T>> RowRanges visit(Eq<T> eq) {
-        return applyPredicate(
-                eq.getColumn(),
-                ci -> ci.visit(eq),
-                eq.getValue() == null ? allRows() : RowRanges.EMPTY);
-    }
-
-    @Override
-    public <T extends Comparable<T>> RowRanges visit(NotEq<T> notEq) {
-        return applyPredicate(
-                notEq.getColumn(),
-                ci -> ci.visit(notEq),
-                notEq.getValue() == null ? RowRanges.EMPTY : allRows());
-    }
-
-    @Override
-    public <T extends Comparable<T>> RowRanges visit(Lt<T> lt) {
-        return applyPredicate(lt.getColumn(), ci -> ci.visit(lt), RowRanges.EMPTY);
-    }
-
-    @Override
-    public <T extends Comparable<T>> RowRanges visit(LtEq<T> ltEq) {
-        return applyPredicate(ltEq.getColumn(), ci -> ci.visit(ltEq), RowRanges.EMPTY);
-    }
-
-    @Override
-    public <T extends Comparable<T>> RowRanges visit(Gt<T> gt) {
-        return applyPredicate(gt.getColumn(), ci -> ci.visit(gt), RowRanges.EMPTY);
-    }
-
-    @Override
-    public <T extends Comparable<T>> RowRanges visit(GtEq<T> gtEq) {
-        return applyPredicate(gtEq.getColumn(), ci -> ci.visit(gtEq), RowRanges.EMPTY);
-    }
-
-    @Override
-    public <T extends Comparable<T>> RowRanges visit(Operators.In<T> in) {
-        boolean isNull = in.getValues().contains(null);
-        return applyPredicate(
-                in.getColumn(), ci -> ci.visit(in), isNull ? allRows() : RowRanges.EMPTY);
-    }
-
-    @Override
-    public <T extends Comparable<T>> RowRanges visit(Operators.NotIn<T> notIn) {
-        boolean isNull = notIn.getValues().contains(null);
-        return applyPredicate(
-                notIn.getColumn(), ci -> ci.visit(notIn), isNull ? RowRanges.EMPTY : allRows());
-    }
-
-    @Override
-    public <T extends Comparable<T>> RowRanges visit(Contains<T> contains) {
-        return contains.filter(
-                this, RowRanges::intersection, RowRanges::union, ranges -> allRows());
-    }
-
-    @Override
-    public <T extends Comparable<T>, U extends UserDefinedPredicate<T>> RowRanges visit(
-            UserDefined<T, U> udp) {
-        return applyPredicate(
-                udp.getColumn(),
-                ci -> ci.visit(udp),
-                udp.getUserDefinedPredicate().acceptsNullValue() ? allRows() : RowRanges.EMPTY);
-    }
-
-    @Override
-    public <T extends Comparable<T>, U extends UserDefinedPredicate<T>> RowRanges visit(
-            LogicalNotUserDefined<T, U> udp) {
-        return applyPredicate(
-                udp.getUserDefined().getColumn(),
-                ci -> ci.visit(udp),
-                udp.getUserDefined().getUserDefinedPredicate().acceptsNullValue()
-                        ? RowRanges.EMPTY
-                        : allRows());
-    }
-
-    private RowRanges applyPredicate(
-            Column<?> column,
-            Function<ColumnIndex, PrimitiveIterator.OfInt> func,
-            RowRanges rangesForMissingColumns) {
-        ColumnPath columnPath = column.getColumnPath();
-        if (!columns.contains(columnPath)) {
-            return rangesForMissingColumns;
-        }
-
-        OffsetIndex oi = columnIndexStore.getOffsetIndex(columnPath);
-        ColumnIndex ci = columnIndexStore.getColumnIndex(columnPath);
-        if (ci == null) {
-            LOGGER.info(
-                    "No column index for column {} is available; Unable to filter on this column",
-                    columnPath);
-            return allRows();
-        }
-
-        return RowRanges.create(rowCount, rowIndexOffset, func.apply(ci), oi, selection);
-    }
-
-    @Override
-    public RowRanges visit(And and) {
-        RowRanges leftResult = and.getLeft().accept(this);
-        if (leftResult.getRanges().isEmpty()) {
-            return leftResult;
-        }
-
-        return RowRanges.intersection(leftResult, and.getRight().accept(this));
-    }
-
-    @Override
-    public RowRanges visit(Or or) {
-        RowRanges leftResult = or.getLeft().accept(this);
-        if (leftResult.getRanges().size() == 1 && leftResult.rowCount() == rowCount) {
-            return leftResult;
-        }
-
-        return RowRanges.union(leftResult, or.getRight().accept(this));
-    }
-
-    @Override
-    public RowRanges visit(Not not) {
-        throw new IllegalArgumentException(
-                "Predicates containing a NOT must be run through LogicalInverseRewriter. " + not);
-    }
-}
diff --git a/paimon-format/src/main/java/org/apache/parquet/internal/filter2/columnindex/RowRanges.java b/paimon-format/src/main/java/org/apache/parquet/internal/filter2/columnindex/RowRanges.java
index 7bf4043397..2e4d4ca867 100644
--- a/paimon-format/src/main/java/org/apache/parquet/internal/filter2/columnindex/RowRanges.java
+++ b/paimon-format/src/main/java/org/apache/parquet/internal/filter2/columnindex/RowRanges.java
@@ -20,7 +20,6 @@ package org.apache.parquet.internal.filter2.columnindex;
 
 import org.apache.paimon.utils.RoaringBitmap32;
 
-import org.apache.parquet.filter2.compat.FilterCompat.Filter;
 import org.apache.parquet.internal.column.columnindex.OffsetIndex;
 
 import javax.annotation.Nullable;
@@ -31,7 +30,6 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.NoSuchElementException;
 import java.util.PrimitiveIterator;
-import java.util.Set;
 
 /**
  * Class representing row ranges in a row-group. These row ranges are calculated as a result of the
@@ -39,11 +37,8 @@ import java.util.Set;
  * row-group, retrieve the count of the matching rows or check overlapping of a row index range.
  *
  * <p>Note: The class was copied over to support using selected position to filter or narrow the
- * {@link RowRanges}. Added a new method {@link RowRanges#create(long, long,
- * PrimitiveIterator.OfInt, OffsetIndex, RoaringBitmap32)}
- *
- * @see ColumnIndexFilter#calculateRowRanges(Filter, ColumnIndexStore, Set, long, long,
- *     RoaringBitmap32)
+ * {@link RowRanges}. Added a new method {@link RowRanges#create(long, long, OffsetIndex,
+ * RoaringBitmap32)}
  */
 public class RowRanges {
 
@@ -165,14 +160,12 @@ public class RowRanges {
     public static RowRanges create(
             long rowCount,
             long rowIndexOffset,
-            PrimitiveIterator.OfInt pageIndexes,
             OffsetIndex offsetIndex,
             @Nullable RoaringBitmap32 selection) {
         RowRanges ranges = new RowRanges();
-        while (pageIndexes.hasNext()) {
-            int pageIndex = pageIndexes.nextInt();
-            long firstRowIndex = offsetIndex.getFirstRowIndex(pageIndex);
-            long lastRowIndex = offsetIndex.getLastRowIndex(pageIndex, rowCount);
+        for (int i = 0; i < offsetIndex.getPageCount(); i++) {
+            long firstRowIndex = offsetIndex.getFirstRowIndex(i);
+            long lastRowIndex = offsetIndex.getLastRowIndex(i, rowCount);
 
             // using selected position to filter or narrow the row ranges
             if (selection != null) {

Reply via email to