This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new c1d5ad17b4 [parquet] Support generate the RowRanges directly from the selection (#6060)
c1d5ad17b4 is described below
commit c1d5ad17b4abc5059f835d19b9ec4bc2bf7cd08a
Author: Tan-JiaLiang <[email protected]>
AuthorDate: Mon Aug 18 19:19:18 2025 +0800
[parquet] Support generate the RowRanges directly from the selection (#6060)
---
.../paimon/table/AppendOnlySimpleTableTest.java | 4 +-
.../apache/parquet/hadoop/ParquetFileReader.java | 74 +++++-
.../filter2/columnindex/ColumnIndexFilter.java | 271 ---------------------
.../internal/filter2/columnindex/RowRanges.java | 17 +-
4 files changed, 68 insertions(+), 298 deletions(-)
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlySimpleTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlySimpleTableTest.java
index babb62a995..c262f892a8 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlySimpleTableTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlySimpleTableTest.java
@@ -826,7 +826,7 @@ public class AppendOnlySimpleTableTest extends SimpleTableTestBase {
}
@Test
- public void testRangeBitmapIndexTopNFilter() throws Exception {
+ public void testTopNResultFilterParquetRowRanges() throws Exception {
RowType rowType =
RowType.builder()
.field("id", DataTypes.STRING())
@@ -851,7 +851,7 @@ public class AppendOnlySimpleTableTest extends SimpleTableTestBase {
// in unaware-bucket mode, we split files into splits all the time
FileStoreTable table = createUnawareBucketFileStoreTable(rowType,
configure);
- int bound = 300000;
+ int bound = 30000000;
int rowCount = 1000000;
Random random = new Random();
int k = random.nextInt(100) + 1;
diff --git a/paimon-format/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java b/paimon-format/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
index 05e9d12033..cdf723c524 100644
--- a/paimon-format/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
+++ b/paimon-format/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
@@ -98,6 +98,7 @@ import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
+import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@@ -329,7 +330,8 @@ public class ParquetFileReader implements Closeable {
public long getFilteredRecordCount() {
if (!options.useColumnIndexFilter()
- ||
!FilterCompat.isFilteringRequired(options.getRecordFilter())) {
+ ||
(!FilterCompat.isFilteringRequired(options.getRecordFilter())
+ && selection == null)) {
return getRecordCount();
}
long total = 0L;
@@ -514,7 +516,8 @@ public class ParquetFileReader implements Closeable {
// Filtering not required -> fall back to the non-filtering path
if (!options.useColumnIndexFilter()
- ||
!FilterCompat.isFilteringRequired(options.getRecordFilter())) {
+ ||
(!FilterCompat.isFilteringRequired(options.getRecordFilter())
+ && selection == null)) {
return internalReadRowGroup(blockIndex);
}
@@ -681,7 +684,8 @@ public class ParquetFileReader implements Closeable {
}
// Filtering not required -> fall back to the non-filtering path
if (!options.useColumnIndexFilter()
- ||
!FilterCompat.isFilteringRequired(options.getRecordFilter())) {
+ ||
(!FilterCompat.isFilteringRequired(options.getRecordFilter())
+ && selection == null)) {
return readNextRowGroup();
}
BlockMetaData block = blocks.get(currentBlock);
@@ -792,23 +796,67 @@ public class ParquetFileReader implements Closeable {
}
private RowRanges getRowRanges(int blockIndex) {
- assert FilterCompat.isFilteringRequired(options.getRecordFilter())
- : "Should not be invoked if filter is null or NOOP";
+ boolean filteringRequired =
FilterCompat.isFilteringRequired(options.getRecordFilter());
+ if (!filteringRequired && selection == null) {
+ throw new IllegalArgumentException("Should not be invoked if
filter is null or NOOP");
+ }
+
RowRanges rowRanges = blockRowRanges.get(blockIndex);
if (rowRanges == null) {
- rowRanges =
- ColumnIndexFilter.calculateRowRanges(
- options.getRecordFilter(),
- getColumnIndexStore(blockIndex),
- paths.keySet(),
- blocks.get(blockIndex).getRowCount(),
- blocks.get(blockIndex).getRowIndexOffset(),
- selection);
+ BlockMetaData block = blocks.get(blockIndex);
+ rowRanges = RowRanges.createSingle(block.getRowCount());
+ if (selection != null) {
+ RowRanges result = calculateRowRanges(blockIndex, selection);
+ rowRanges = RowRanges.intersection(result, rowRanges);
+ }
+
+ if (filteringRequired) {
+ RowRanges result =
+ ColumnIndexFilter.calculateRowRanges(
+ options.getRecordFilter(),
+ getColumnIndexStore(blockIndex),
+ paths.keySet(),
+ block.getRowCount());
+ rowRanges = RowRanges.intersection(result, rowRanges);
+ }
blockRowRanges.set(blockIndex, rowRanges);
}
return rowRanges;
}
+ private RowRanges calculateRowRanges(int blockIndex, RoaringBitmap32
selection) {
+ List<OffsetIndex> offsets;
+ BlockMetaData block = blocks.get(blockIndex);
+ if (paths.isEmpty()) {
+ Optional<ColumnChunkMetaData> first =
block.getColumns().stream().findFirst();
+ if (first.isPresent()) {
+ ColumnPath path = first.get().getPath();
+ OffsetIndex index =
+ ColumnIndexStoreImpl.create(this, block,
Collections.singleton(path))
+ .getOffsetIndex(path);
+ offsets = Collections.singletonList(index);
+ } else {
+ offsets = Collections.emptyList();
+ }
+ } else {
+ ColumnIndexStore store = getColumnIndexStore(blockIndex);
+ offsets =
+
paths.keySet().stream().map(store::getOffsetIndex).collect(Collectors.toList());
+ }
+
+ long rowCount = block.getRowCount();
+ long rowIndexOffset = block.getRowIndexOffset();
+ RowRanges rowRanges = RowRanges.createSingle(rowCount);
+ for (OffsetIndex offset : offsets) {
+ if (offset != null) {
+ RowRanges result = RowRanges.create(rowCount, rowIndexOffset,
offset, selection);
+ rowRanges = RowRanges.intersection(result, rowRanges);
+ }
+ }
+
+ return rowRanges;
+ }
+
public boolean skipNextRowGroup() {
return advanceToNextBlock();
}
diff --git a/paimon-format/src/main/java/org/apache/parquet/internal/filter2/columnindex/ColumnIndexFilter.java b/paimon-format/src/main/java/org/apache/parquet/internal/filter2/columnindex/ColumnIndexFilter.java
deleted file mode 100644
index d640d379ba..0000000000
--- a/paimon-format/src/main/java/org/apache/parquet/internal/filter2/columnindex/ColumnIndexFilter.java
+++ /dev/null
@@ -1,271 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.parquet.internal.filter2.columnindex;
-
-import org.apache.paimon.utils.RoaringBitmap32;
-
-import org.apache.parquet.filter2.compat.FilterCompat;
-import org.apache.parquet.filter2.compat.FilterCompat.FilterPredicateCompat;
-import org.apache.parquet.filter2.compat.FilterCompat.NoOpFilter;
-import
org.apache.parquet.filter2.compat.FilterCompat.UnboundRecordFilterCompat;
-import org.apache.parquet.filter2.predicate.FilterPredicate.Visitor;
-import org.apache.parquet.filter2.predicate.Operators;
-import org.apache.parquet.filter2.predicate.Operators.And;
-import org.apache.parquet.filter2.predicate.Operators.Column;
-import org.apache.parquet.filter2.predicate.Operators.Contains;
-import org.apache.parquet.filter2.predicate.Operators.Eq;
-import org.apache.parquet.filter2.predicate.Operators.Gt;
-import org.apache.parquet.filter2.predicate.Operators.GtEq;
-import org.apache.parquet.filter2.predicate.Operators.LogicalNotUserDefined;
-import org.apache.parquet.filter2.predicate.Operators.Lt;
-import org.apache.parquet.filter2.predicate.Operators.LtEq;
-import org.apache.parquet.filter2.predicate.Operators.Not;
-import org.apache.parquet.filter2.predicate.Operators.NotEq;
-import org.apache.parquet.filter2.predicate.Operators.Or;
-import org.apache.parquet.filter2.predicate.Operators.UserDefined;
-import org.apache.parquet.filter2.predicate.UserDefinedPredicate;
-import org.apache.parquet.hadoop.metadata.ColumnPath;
-import org.apache.parquet.internal.column.columnindex.ColumnIndex;
-import org.apache.parquet.internal.column.columnindex.OffsetIndex;
-import
org.apache.parquet.internal.filter2.columnindex.ColumnIndexStore.MissingOffsetIndexException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.annotation.Nullable;
-
-import java.util.PrimitiveIterator;
-import java.util.Set;
-import java.util.function.Function;
-
-/**
- * Filter implementation based on column indexes. No filtering will be applied
for columns where no
- * column index is available. Offset index is required for all the columns in
the projection,
- * therefore a {@link MissingOffsetIndexException} will be thrown from any
{@code visit} methods if
- * any of the required offset indexes is missing.
- *
- * <p>Note: The class was copied over to support using {@link RoaringBitmap32}
to filter {@link
- * RowRanges}.
- */
-public class ColumnIndexFilter implements Visitor<RowRanges> {
-
- private static final Logger LOGGER =
LoggerFactory.getLogger(ColumnIndexFilter.class);
- private final ColumnIndexStore columnIndexStore;
- private final Set<ColumnPath> columns;
- private final long rowCount;
- private final long rowIndexOffset;
- @Nullable private final RoaringBitmap32 selection;
- private RowRanges allRows;
-
- /**
- * Calculates the row ranges containing the indexes of the rows might
match the specified
- * filter.
- *
- * @param filter to be used for filtering the rows
- * @param columnIndexStore the store for providing column/offset indexes
- * @param paths the paths of the columns used in the actual projection; a
column not being part
- * of the projection will be handled as containing {@code null} values
only even if the
- * column has values written in the file
- * @param rowCount the total number of rows in the row-group
- * @param rowIndexOffset the offset of the row-group
- * @param selection the selected position; it will use to filter or narrow
the row ranges
- * @return the ranges of the possible matching row indexes; the returned
ranges will contain all
- * the rows if any of the required offset index is missing
- */
- public static RowRanges calculateRowRanges(
- FilterCompat.Filter filter,
- ColumnIndexStore columnIndexStore,
- Set<ColumnPath> paths,
- long rowCount,
- long rowIndexOffset,
- @Nullable RoaringBitmap32 selection) {
- return filter.accept(
- new FilterCompat.Visitor<RowRanges>() {
- @Override
- public RowRanges visit(FilterPredicateCompat
filterPredicateCompat) {
- try {
- return filterPredicateCompat
- .getFilterPredicate()
- .accept(
- new ColumnIndexFilter(
- columnIndexStore,
- paths,
- rowCount,
- rowIndexOffset,
- selection));
- } catch (MissingOffsetIndexException e) {
- LOGGER.info(e.getMessage());
- return RowRanges.createSingle(rowCount);
- }
- }
-
- @Override
- public RowRanges visit(UnboundRecordFilterCompat
unboundRecordFilterCompat) {
- return RowRanges.createSingle(rowCount);
- }
-
- @Override
- public RowRanges visit(NoOpFilter noOpFilter) {
- return RowRanges.createSingle(rowCount);
- }
- });
- }
-
- private ColumnIndexFilter(
- ColumnIndexStore columnIndexStore,
- Set<ColumnPath> paths,
- long rowCount,
- long rowIndexOffset,
- @Nullable RoaringBitmap32 selection) {
- this.columnIndexStore = columnIndexStore;
- this.columns = paths;
- this.rowCount = rowCount;
- this.rowIndexOffset = rowIndexOffset;
- this.selection = selection;
- }
-
- private RowRanges allRows() {
- if (allRows == null) {
- allRows = RowRanges.createSingle(rowCount);
- }
- return allRows;
- }
-
- @Override
- public <T extends Comparable<T>> RowRanges visit(Eq<T> eq) {
- return applyPredicate(
- eq.getColumn(),
- ci -> ci.visit(eq),
- eq.getValue() == null ? allRows() : RowRanges.EMPTY);
- }
-
- @Override
- public <T extends Comparable<T>> RowRanges visit(NotEq<T> notEq) {
- return applyPredicate(
- notEq.getColumn(),
- ci -> ci.visit(notEq),
- notEq.getValue() == null ? RowRanges.EMPTY : allRows());
- }
-
- @Override
- public <T extends Comparable<T>> RowRanges visit(Lt<T> lt) {
- return applyPredicate(lt.getColumn(), ci -> ci.visit(lt),
RowRanges.EMPTY);
- }
-
- @Override
- public <T extends Comparable<T>> RowRanges visit(LtEq<T> ltEq) {
- return applyPredicate(ltEq.getColumn(), ci -> ci.visit(ltEq),
RowRanges.EMPTY);
- }
-
- @Override
- public <T extends Comparable<T>> RowRanges visit(Gt<T> gt) {
- return applyPredicate(gt.getColumn(), ci -> ci.visit(gt),
RowRanges.EMPTY);
- }
-
- @Override
- public <T extends Comparable<T>> RowRanges visit(GtEq<T> gtEq) {
- return applyPredicate(gtEq.getColumn(), ci -> ci.visit(gtEq),
RowRanges.EMPTY);
- }
-
- @Override
- public <T extends Comparable<T>> RowRanges visit(Operators.In<T> in) {
- boolean isNull = in.getValues().contains(null);
- return applyPredicate(
- in.getColumn(), ci -> ci.visit(in), isNull ? allRows() :
RowRanges.EMPTY);
- }
-
- @Override
- public <T extends Comparable<T>> RowRanges visit(Operators.NotIn<T> notIn)
{
- boolean isNull = notIn.getValues().contains(null);
- return applyPredicate(
- notIn.getColumn(), ci -> ci.visit(notIn), isNull ?
RowRanges.EMPTY : allRows());
- }
-
- @Override
- public <T extends Comparable<T>> RowRanges visit(Contains<T> contains) {
- return contains.filter(
- this, RowRanges::intersection, RowRanges::union, ranges ->
allRows());
- }
-
- @Override
- public <T extends Comparable<T>, U extends UserDefinedPredicate<T>>
RowRanges visit(
- UserDefined<T, U> udp) {
- return applyPredicate(
- udp.getColumn(),
- ci -> ci.visit(udp),
- udp.getUserDefinedPredicate().acceptsNullValue() ? allRows() :
RowRanges.EMPTY);
- }
-
- @Override
- public <T extends Comparable<T>, U extends UserDefinedPredicate<T>>
RowRanges visit(
- LogicalNotUserDefined<T, U> udp) {
- return applyPredicate(
- udp.getUserDefined().getColumn(),
- ci -> ci.visit(udp),
-
udp.getUserDefined().getUserDefinedPredicate().acceptsNullValue()
- ? RowRanges.EMPTY
- : allRows());
- }
-
- private RowRanges applyPredicate(
- Column<?> column,
- Function<ColumnIndex, PrimitiveIterator.OfInt> func,
- RowRanges rangesForMissingColumns) {
- ColumnPath columnPath = column.getColumnPath();
- if (!columns.contains(columnPath)) {
- return rangesForMissingColumns;
- }
-
- OffsetIndex oi = columnIndexStore.getOffsetIndex(columnPath);
- ColumnIndex ci = columnIndexStore.getColumnIndex(columnPath);
- if (ci == null) {
- LOGGER.info(
- "No column index for column {} is available; Unable to
filter on this column",
- columnPath);
- return allRows();
- }
-
- return RowRanges.create(rowCount, rowIndexOffset, func.apply(ci), oi,
selection);
- }
-
- @Override
- public RowRanges visit(And and) {
- RowRanges leftResult = and.getLeft().accept(this);
- if (leftResult.getRanges().isEmpty()) {
- return leftResult;
- }
-
- return RowRanges.intersection(leftResult, and.getRight().accept(this));
- }
-
- @Override
- public RowRanges visit(Or or) {
- RowRanges leftResult = or.getLeft().accept(this);
- if (leftResult.getRanges().size() == 1 && leftResult.rowCount() ==
rowCount) {
- return leftResult;
- }
-
- return RowRanges.union(leftResult, or.getRight().accept(this));
- }
-
- @Override
- public RowRanges visit(Not not) {
- throw new IllegalArgumentException(
- "Predicates containing a NOT must be run through
LogicalInverseRewriter. " + not);
- }
-}
diff --git a/paimon-format/src/main/java/org/apache/parquet/internal/filter2/columnindex/RowRanges.java b/paimon-format/src/main/java/org/apache/parquet/internal/filter2/columnindex/RowRanges.java
index 7bf4043397..2e4d4ca867 100644
--- a/paimon-format/src/main/java/org/apache/parquet/internal/filter2/columnindex/RowRanges.java
+++ b/paimon-format/src/main/java/org/apache/parquet/internal/filter2/columnindex/RowRanges.java
@@ -20,7 +20,6 @@ package org.apache.parquet.internal.filter2.columnindex;
import org.apache.paimon.utils.RoaringBitmap32;
-import org.apache.parquet.filter2.compat.FilterCompat.Filter;
import org.apache.parquet.internal.column.columnindex.OffsetIndex;
import javax.annotation.Nullable;
@@ -31,7 +30,6 @@ import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.PrimitiveIterator;
-import java.util.Set;
/**
* Class representing row ranges in a row-group. These row ranges are
calculated as a result of the
@@ -39,11 +37,8 @@ import java.util.Set;
* row-group, retrieve the count of the matching rows or check overlapping of
a row index range.
*
* <p>Note: The class was copied over to support using selected position to
filter or narrow the
- * {@link RowRanges}. Added a new method {@link RowRanges#create(long, long,
- * PrimitiveIterator.OfInt, OffsetIndex, RoaringBitmap32)}
- *
- * @see ColumnIndexFilter#calculateRowRanges(Filter, ColumnIndexStore, Set,
long, long,
- * RoaringBitmap32)
+ * {@link RowRanges}. Added a new method {@link RowRanges#create(long, long,
OffsetIndex,
+ * RoaringBitmap32)}
*/
public class RowRanges {
@@ -165,14 +160,12 @@ public class RowRanges {
public static RowRanges create(
long rowCount,
long rowIndexOffset,
- PrimitiveIterator.OfInt pageIndexes,
OffsetIndex offsetIndex,
@Nullable RoaringBitmap32 selection) {
RowRanges ranges = new RowRanges();
- while (pageIndexes.hasNext()) {
- int pageIndex = pageIndexes.nextInt();
- long firstRowIndex = offsetIndex.getFirstRowIndex(pageIndex);
- long lastRowIndex = offsetIndex.getLastRowIndex(pageIndex,
rowCount);
+ for (int i = 0; i < offsetIndex.getPageCount(); i++) {
+ long firstRowIndex = offsetIndex.getFirstRowIndex(i);
+ long lastRowIndex = offsetIndex.getLastRowIndex(i, rowCount);
// using selected position to filter or narrow the row ranges
if (selection != null) {