This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-1.0
in repository https://gitbox.apache.org/repos/asf/paimon.git

commit 0a6d95f83b6baf88dd9140a0e75edd5e5db8d30a
Author: Tan-JiaLiang <[email protected]>
AuthorDate: Tue Dec 31 16:07:46 2024 +0800

    [parquet] Fix file index result filter the row ranges missing rowgroup offset problem (#4806)
---
 .../paimon/table/AppendOnlyFileStoreTableTest.java | 29 +++++++++++-----------
 .../apache/parquet/hadoop/ParquetFileReader.java   |  1 +
 .../filter2/columnindex/ColumnIndexFilter.java     |  7 +++++-
 .../internal/filter2/columnindex/RowRanges.java    | 13 ++++++----
 4 files changed, 30 insertions(+), 20 deletions(-)

diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlyFileStoreTableTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlyFileStoreTableTest.java
index dd85bf8bcf..f2bd0c5ea9 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlyFileStoreTableTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlyFileStoreTableTest.java
@@ -73,6 +73,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Random;
+import java.util.UUID;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Consumer;
 import java.util.function.Function;
@@ -731,7 +732,7 @@ public class AppendOnlyFileStoreTableTest extends 
FileStoreTableTestBase {
     public void testBitmapIndexResultFilterParquetRowRanges() throws Exception 
{
         RowType rowType =
                 RowType.builder()
-                        .field("id", DataTypes.INT())
+                        .field("id", DataTypes.STRING())
                         .field("event", DataTypes.STRING())
                         .field("price", DataTypes.INT())
                         .build();
@@ -749,26 +750,26 @@ public class AppendOnlyFileStoreTableTest extends 
FileStoreTableTestBase {
                                             + "."
                                             + CoreOptions.COLUMNS,
                                     "price");
+                            options.set(ParquetOutputFormat.BLOCK_SIZE, 
"1048576");
                             options.set(
                                     
ParquetOutputFormat.MIN_ROW_COUNT_FOR_PAGE_SIZE_CHECK, "100");
                             
options.set(ParquetOutputFormat.PAGE_ROW_COUNT_LIMIT, "300");
                         });
 
-        int bound = 3000;
+        int bound = 300000;
         Random random = new Random();
         Map<Integer, Integer> expectedMap = new HashMap<>();
-        for (int i = 0; i < 5; i++) {
-            StreamTableWrite write = table.newWrite(commitUser);
-            StreamTableCommit commit = table.newCommit(commitUser);
-            for (int j = 0; j < 10000; j++) {
-                int next = random.nextInt(bound);
-                expectedMap.compute(next, (key, value) -> value == null ? 1 : 
value + 1);
-                write.write(GenericRow.of(1, BinaryString.fromString("A"), 
next));
-            }
-            commit.commit(i, write.prepareCommit(true, i));
-            write.close();
-            commit.close();
+        StreamTableWrite write = table.newWrite(commitUser);
+        StreamTableCommit commit = table.newCommit(commitUser);
+        for (int j = 0; j < 1000000; j++) {
+            int next = random.nextInt(bound);
+            BinaryString uuid = 
BinaryString.fromString(UUID.randomUUID().toString());
+            expectedMap.compute(next, (key, value) -> value == null ? 1 : 
value + 1);
+            write.write(GenericRow.of(uuid, uuid, next));
         }
+        commit.commit(0, write.prepareCommit(true, 0));
+        write.close();
+        commit.close();
 
         // test eq
         for (int i = 0; i < 10; i++) {
@@ -789,7 +790,7 @@ public class AppendOnlyFileStoreTableTest extends 
FileStoreTableTestBase {
 
         //  test between
         for (int i = 0; i < 10; i++) {
-            int max = random.nextInt(bound);
+            int max = random.nextInt(bound) + 1;
             int min = random.nextInt(max);
             Predicate predicate =
                     PredicateBuilder.and(
diff --git 
a/paimon-format/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java 
b/paimon-format/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
index e9f757126a..c0a28cdc9b 100644
--- 
a/paimon-format/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
+++ 
b/paimon-format/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
@@ -761,6 +761,7 @@ public class ParquetFileReader implements Closeable {
                             getColumnIndexStore(blockIndex),
                             paths.keySet(),
                             blocks.get(blockIndex).getRowCount(),
+                            blocks.get(blockIndex).getRowIndexOffset(),
                             fileIndexResult);
             blockRowRanges.set(blockIndex, rowRanges);
         }
diff --git 
a/paimon-format/src/main/java/org/apache/parquet/internal/filter2/columnindex/ColumnIndexFilter.java
 
b/paimon-format/src/main/java/org/apache/parquet/internal/filter2/columnindex/ColumnIndexFilter.java
index b2c9365bd6..db21a8961a 100644
--- 
a/paimon-format/src/main/java/org/apache/parquet/internal/filter2/columnindex/ColumnIndexFilter.java
+++ 
b/paimon-format/src/main/java/org/apache/parquet/internal/filter2/columnindex/ColumnIndexFilter.java
@@ -67,6 +67,7 @@ public class ColumnIndexFilter implements Visitor<RowRanges> {
     private final ColumnIndexStore columnIndexStore;
     private final Set<ColumnPath> columns;
     private final long rowCount;
+    private final long rowIndexOffset;
     @Nullable private final FileIndexResult fileIndexResult;
     private RowRanges allRows;
 
@@ -89,6 +90,7 @@ public class ColumnIndexFilter implements Visitor<RowRanges> {
             ColumnIndexStore columnIndexStore,
             Set<ColumnPath> paths,
             long rowCount,
+            long rowIndexOffset,
             @Nullable FileIndexResult fileIndexResult) {
         return filter.accept(
                 new FilterCompat.Visitor<RowRanges>() {
@@ -102,6 +104,7 @@ public class ColumnIndexFilter implements 
Visitor<RowRanges> {
                                                     columnIndexStore,
                                                     paths,
                                                     rowCount,
+                                                    rowIndexOffset,
                                                     fileIndexResult));
                         } catch (MissingOffsetIndexException e) {
                             LOGGER.info(e.getMessage());
@@ -125,10 +128,12 @@ public class ColumnIndexFilter implements 
Visitor<RowRanges> {
             ColumnIndexStore columnIndexStore,
             Set<ColumnPath> paths,
             long rowCount,
+            long rowIndexOffset,
             @Nullable FileIndexResult fileIndexResult) {
         this.columnIndexStore = columnIndexStore;
         this.columns = paths;
         this.rowCount = rowCount;
+        this.rowIndexOffset = rowIndexOffset;
         this.fileIndexResult = fileIndexResult;
     }
 
@@ -227,7 +232,7 @@ public class ColumnIndexFilter implements 
Visitor<RowRanges> {
             return allRows();
         }
 
-        return RowRanges.create(rowCount, func.apply(ci), oi, fileIndexResult);
+        return RowRanges.create(rowCount, rowIndexOffset, func.apply(ci), oi, 
fileIndexResult);
     }
 
     @Override
diff --git 
a/paimon-format/src/main/java/org/apache/parquet/internal/filter2/columnindex/RowRanges.java
 
b/paimon-format/src/main/java/org/apache/parquet/internal/filter2/columnindex/RowRanges.java
index 6963814831..b199f93958 100644
--- 
a/paimon-format/src/main/java/org/apache/parquet/internal/filter2/columnindex/RowRanges.java
+++ 
b/paimon-format/src/main/java/org/apache/parquet/internal/filter2/columnindex/RowRanges.java
@@ -41,10 +41,11 @@ import java.util.Set;
  * row-group, retrieve the count of the matching rows or check overlapping of 
a row index range.
  *
  * <p>Note: The class was copied over to support using {@link FileIndexResult} 
to filter {@link
- * RowRanges}. Added a new method {@link RowRanges#create(long, 
PrimitiveIterator.OfInt,
+ * RowRanges}. Added a new method {@link RowRanges#create(long, long, 
PrimitiveIterator.OfInt,
  * OffsetIndex, FileIndexResult)}
  *
- * @see ColumnIndexFilter#calculateRowRanges(Filter, ColumnIndexStore, Set, 
long, FileIndexResult)
+ * @see ColumnIndexFilter#calculateRowRanges(Filter, ColumnIndexStore, Set, 
long, long,
+ *     FileIndexResult)
  */
 public class RowRanges {
 
@@ -165,6 +166,7 @@ public class RowRanges {
     /** Support using {@link FileIndexResult} to filter the row ranges. */
     public static RowRanges create(
             long rowCount,
+            long rowIndexOffset,
             PrimitiveIterator.OfInt pageIndexes,
             OffsetIndex offsetIndex,
             @Nullable FileIndexResult fileIndexResult) {
@@ -178,13 +180,14 @@ public class RowRanges {
             if (fileIndexResult instanceof BitmapIndexResult) {
                 RoaringBitmap32 bitmap = ((BitmapIndexResult) 
fileIndexResult).get();
                 RoaringBitmap32 range =
-                        RoaringBitmap32.bitmapOfRange(firstRowIndex, 
lastRowIndex + 1);
+                        RoaringBitmap32.bitmapOfRange(
+                                rowIndexOffset + firstRowIndex, rowIndexOffset 
+ lastRowIndex + 1);
                 RoaringBitmap32 result = RoaringBitmap32.and(bitmap, range);
                 if (result.isEmpty()) {
                     continue;
                 }
-                firstRowIndex = result.first();
-                lastRowIndex = result.last();
+                firstRowIndex = result.first() - rowIndexOffset;
+                lastRowIndex = result.last() - rowIndexOffset;
             }
 
             ranges.add(new Range(firstRowIndex, lastRowIndex));

Reply via email to