This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 43dff3a379 [parquet] Support using file index result to filter row
ranges (#4780)
43dff3a379 is described below
commit 43dff3a3799ae86de425021cfd99e269916c7421
Author: Tan-JiaLiang <[email protected]>
AuthorDate: Mon Dec 30 10:21:57 2024 +0800
[parquet] Support using file index result to filter row ranges (#4780)
---
.../org/apache/paimon/utils/RoaringBitmap32.java | 8 +
.../paimon/table/AppendOnlyFileStoreTableTest.java | 93 ++++++
.../apache/parquet/hadoop/ParquetFileReader.java | 3 +-
.../filter2/columnindex/ColumnIndexFilter.java | 258 ++++++++++++++
.../internal/filter2/columnindex/RowRanges.java | 371 +++++++++++++++++++++
5 files changed, 732 insertions(+), 1 deletion(-)
diff --git
a/paimon-common/src/main/java/org/apache/paimon/utils/RoaringBitmap32.java
b/paimon-common/src/main/java/org/apache/paimon/utils/RoaringBitmap32.java
index 6496b7003e..d3b07a2362 100644
--- a/paimon-common/src/main/java/org/apache/paimon/utils/RoaringBitmap32.java
+++ b/paimon-common/src/main/java/org/apache/paimon/utils/RoaringBitmap32.java
@@ -72,6 +72,10 @@ public class RoaringBitmap32 {
return roaringBitmap.rangeCardinality(start, end);
}
+ public int first() {
+ return roaringBitmap.first();
+ }
+
public int last() {
return roaringBitmap.last();
}
@@ -138,6 +142,10 @@ public class RoaringBitmap32 {
return roaringBitmap32;
}
+ public static RoaringBitmap32 bitmapOfRange(long min, long max) {
+ return new RoaringBitmap32(RoaringBitmap.bitmapOfRange(min, max));
+ }
+
public static RoaringBitmap32 and(final RoaringBitmap32 x1, final
RoaringBitmap32 x2) {
return new RoaringBitmap32(RoaringBitmap.and(x1.roaringBitmap,
x2.roaringBitmap));
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlyFileStoreTableTest.java
b/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlyFileStoreTableTest.java
index 471b60d3cf..dd85bf8bcf 100644
---
a/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlyFileStoreTableTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlyFileStoreTableTest.java
@@ -58,6 +58,7 @@ import org.apache.paimon.table.source.TableScan;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowType;
+import org.apache.parquet.hadoop.ParquetOutputFormat;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
@@ -70,6 +71,7 @@ import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.Random;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
@@ -79,8 +81,11 @@ import java.util.stream.Collectors;
import static org.apache.paimon.CoreOptions.BUCKET;
import static org.apache.paimon.CoreOptions.BUCKET_KEY;
import static org.apache.paimon.CoreOptions.DATA_FILE_PATH_DIRECTORY;
+import static org.apache.paimon.CoreOptions.FILE_FORMAT;
+import static org.apache.paimon.CoreOptions.FILE_FORMAT_PARQUET;
import static org.apache.paimon.CoreOptions.FILE_INDEX_IN_MANIFEST_THRESHOLD;
import static org.apache.paimon.CoreOptions.METADATA_STATS_MODE;
+import static org.apache.paimon.CoreOptions.WRITE_ONLY;
import static org.apache.paimon.io.DataFileTestUtils.row;
import static org.apache.paimon.table.sink.KeyAndBucketExtractor.bucket;
import static
org.apache.paimon.table.sink.KeyAndBucketExtractor.bucketKeyHashCode;
@@ -722,6 +727,94 @@ public class AppendOnlyFileStoreTableTest extends
FileStoreTableTestBase {
});
}
+ /**
+ * Writes random {@code price} values into a parquet append table that has a BSI file index
+ * on {@code price}, then reads it back through equality and range filters and checks that
+ * exactly the expected rows are returned.
+ */
+ @Test
+ public void testBitmapIndexResultFilterParquetRowRanges() throws Exception
{
+ RowType rowType =
+ RowType.builder()
+ .field("id", DataTypes.INT())
+ .field("event", DataTypes.STRING())
+ .field("price", DataTypes.INT())
+ .build();
+ // in unaware-bucket mode, we split files into splits all the time
+ FileStoreTable table =
+ createUnawareBucketFileStoreTable(
+ rowType,
+ options -> {
+ options.set(FILE_FORMAT, FILE_FORMAT_PARQUET);
+ options.set(WRITE_ONLY, true);
+ options.set(
+ FileIndexOptions.FILE_INDEX
+ + "."
+ +
BitSliceIndexBitmapFileIndexFactory.BSI_INDEX
+ + "."
+ + CoreOptions.COLUMNS,
+ "price");
+ options.set(
+
ParquetOutputFormat.MIN_ROW_COUNT_FOR_PAGE_SIZE_CHECK, "100");
+
options.set(ParquetOutputFormat.PAGE_ROW_COUNT_LIMIT, "300");
+ });
+
+ int bound = 3000;
+ Random random = new Random();
+ Map<Integer, Integer> expectedMap = new HashMap<>();
+ for (int i = 0; i < 5; i++) {
+ StreamTableWrite write = table.newWrite(commitUser);
+ StreamTableCommit commit = table.newCommit(commitUser);
+ for (int j = 0; j < 10000; j++) {
+ int next = random.nextInt(bound);
+ expectedMap.compute(next, (key, value) -> value == null ? 1 :
value + 1);
+ write.write(GenericRow.of(1, BinaryString.fromString("A"),
next));
+ }
+ commit.commit(i, write.prepareCommit(true, i));
+ write.close();
+ commit.close();
+ }
+
+ // test eq
+ for (int i = 0; i < 10; i++) {
+ int key = random.nextInt(bound);
+ Predicate predicate = new PredicateBuilder(rowType).equal(2, key);
+ TableScan.Plan plan = table.newScan().plan();
+ RecordReader<InternalRow> reader =
+
table.newRead().withFilter(predicate).createReader(plan.splits());
+ AtomicInteger cnt = new AtomicInteger(0);
+ reader.forEachRemaining(
+ row -> {
+ cnt.incrementAndGet();
+ assertThat(row.getInt(2)).isEqualTo(key);
+ });
+ assertThat(cnt.get()).isEqualTo(expectedMap.getOrDefault(key, 0));
+ reader.close();
+ }
+
+ // test between
+ for (int i = 0; i < 10; i++) {
+ int max = random.nextInt(bound);
+ // max may be 0 and Random#nextInt(0) throws IllegalArgumentException, so draw min
+ // from [0, max] instead of [0, max)
+ int min = random.nextInt(max + 1);
+ Predicate predicate =
+ PredicateBuilder.and(
+ new PredicateBuilder(rowType).greaterOrEqual(2,
min),
+ new PredicateBuilder(rowType).lessOrEqual(2, max));
+ TableScan.Plan plan = table.newScan().plan();
+ RecordReader<InternalRow> reader =
+
table.newRead().withFilter(predicate).createReader(plan.splits());
+ AtomicInteger cnt = new AtomicInteger(0);
+ reader.forEachRemaining(
+ row -> {
+ cnt.addAndGet(1);
+ assertThat(row.getInt(2)).isGreaterThanOrEqualTo(min);
+ assertThat(row.getInt(2)).isLessThanOrEqualTo(max);
+ });
+ Optional<Integer> reduce =
+ expectedMap.entrySet().stream()
+ .filter(x -> x.getKey() >= min && x.getKey() <=
max)
+ .map(Map.Entry::getValue)
+ .reduce(Integer::sum);
+ assertThat(cnt.get()).isEqualTo(reduce.orElse(0));
+ reader.close();
+ }
+ }
+
@Test
public void testWithShardAppendTable() throws Exception {
FileStoreTable table = createFileStoreTable(conf -> conf.set(BUCKET,
-1));
diff --git
a/paimon-format/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
b/paimon-format/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
index e3fc118ad6..e9f757126a 100644
---
a/paimon-format/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
+++
b/paimon-format/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
@@ -760,7 +760,8 @@ public class ParquetFileReader implements Closeable {
options.getRecordFilter(),
getColumnIndexStore(blockIndex),
paths.keySet(),
- blocks.get(blockIndex).getRowCount());
+ blocks.get(blockIndex).getRowCount(),
+ fileIndexResult);
blockRowRanges.set(blockIndex, rowRanges);
}
return rowRanges;
diff --git
a/paimon-format/src/main/java/org/apache/parquet/internal/filter2/columnindex/ColumnIndexFilter.java
b/paimon-format/src/main/java/org/apache/parquet/internal/filter2/columnindex/ColumnIndexFilter.java
new file mode 100644
index 0000000000..b2c9365bd6
--- /dev/null
+++
b/paimon-format/src/main/java/org/apache/parquet/internal/filter2/columnindex/ColumnIndexFilter.java
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.parquet.internal.filter2.columnindex;
+
+import org.apache.paimon.fileindex.FileIndexResult;
+
+import org.apache.parquet.filter2.compat.FilterCompat;
+import org.apache.parquet.filter2.compat.FilterCompat.FilterPredicateCompat;
+import org.apache.parquet.filter2.compat.FilterCompat.NoOpFilter;
+import
org.apache.parquet.filter2.compat.FilterCompat.UnboundRecordFilterCompat;
+import org.apache.parquet.filter2.predicate.FilterPredicate.Visitor;
+import org.apache.parquet.filter2.predicate.Operators;
+import org.apache.parquet.filter2.predicate.Operators.And;
+import org.apache.parquet.filter2.predicate.Operators.Column;
+import org.apache.parquet.filter2.predicate.Operators.Eq;
+import org.apache.parquet.filter2.predicate.Operators.Gt;
+import org.apache.parquet.filter2.predicate.Operators.GtEq;
+import org.apache.parquet.filter2.predicate.Operators.LogicalNotUserDefined;
+import org.apache.parquet.filter2.predicate.Operators.Lt;
+import org.apache.parquet.filter2.predicate.Operators.LtEq;
+import org.apache.parquet.filter2.predicate.Operators.Not;
+import org.apache.parquet.filter2.predicate.Operators.NotEq;
+import org.apache.parquet.filter2.predicate.Operators.Or;
+import org.apache.parquet.filter2.predicate.Operators.UserDefined;
+import org.apache.parquet.filter2.predicate.UserDefinedPredicate;
+import org.apache.parquet.hadoop.metadata.ColumnPath;
+import org.apache.parquet.internal.column.columnindex.ColumnIndex;
+import org.apache.parquet.internal.column.columnindex.OffsetIndex;
+import
org.apache.parquet.internal.filter2.columnindex.ColumnIndexStore.MissingOffsetIndexException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.PrimitiveIterator;
+import java.util.Set;
+import java.util.function.Function;
+
+/**
+ * Filter implementation based on column indexes. No filtering will be applied
for columns where no
+ * column index is available. Offset index is required for all the columns in
the projection,
+ * therefore a {@link MissingOffsetIndexException} will be thrown from any
{@code visit} methods if
+ * any of the required offset indexes is missing.
+ *
+ * <p>Note: The class was copied over to support using {@link FileIndexResult}
to filter {@link
+ * RowRanges}.
+ */
+public class ColumnIndexFilter implements Visitor<RowRanges> {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(ColumnIndexFilter.class);
+ private final ColumnIndexStore columnIndexStore;
+ private final Set<ColumnPath> columns;
+ private final long rowCount;
+ @Nullable private final FileIndexResult fileIndexResult;
+ private RowRanges allRows;
+
+ /**
+ * Calculates the row ranges containing the indexes of the rows that might
match the specified
+ * filter.
+ *
+ * @param filter to be used for filtering the rows
+ * @param columnIndexStore the store for providing column/offset indexes
+ * @param paths the paths of the columns used in the actual projection; a
column not being part
+ * of the projection will be handled as containing {@code null} values
only even if the
+ * column has values written in the file
+ * @param fileIndexResult the file index result; it is used to filter row
ranges
+ * @param rowCount the total number of rows in the row-group
+ * @return the ranges of the possible matching row indexes; the returned
ranges will contain all
+ * the rows if any of the required offset index is missing
+ */
+ public static RowRanges calculateRowRanges(
+ FilterCompat.Filter filter,
+ ColumnIndexStore columnIndexStore,
+ Set<ColumnPath> paths,
+ long rowCount,
+ @Nullable FileIndexResult fileIndexResult) {
+ return filter.accept(
+ new FilterCompat.Visitor<RowRanges>() {
+ @Override
+ public RowRanges visit(FilterPredicateCompat
filterPredicateCompat) {
+ try {
+ return filterPredicateCompat
+ .getFilterPredicate()
+ .accept(
+ new ColumnIndexFilter(
+ columnIndexStore,
+ paths,
+ rowCount,
+ fileIndexResult));
+ } catch (MissingOffsetIndexException e) {
+ LOGGER.info(e.getMessage());
+ return RowRanges.createSingle(rowCount);
+ }
+ }
+
+ @Override
+ public RowRanges visit(UnboundRecordFilterCompat
unboundRecordFilterCompat) {
+ return RowRanges.createSingle(rowCount);
+ }
+
+ @Override
+ public RowRanges visit(NoOpFilter noOpFilter) {
+ return RowRanges.createSingle(rowCount);
+ }
+ });
+ }
+
+ private ColumnIndexFilter(
+ ColumnIndexStore columnIndexStore,
+ Set<ColumnPath> paths,
+ long rowCount,
+ @Nullable FileIndexResult fileIndexResult) {
+ this.columnIndexStore = columnIndexStore;
+ this.columns = paths;
+ this.rowCount = rowCount;
+ this.fileIndexResult = fileIndexResult;
+ }
+
+ private RowRanges allRows() {
+ if (allRows == null) {
+ allRows = RowRanges.createSingle(rowCount);
+ }
+ return allRows;
+ }
+
+ @Override
+ public <T extends Comparable<T>> RowRanges visit(Eq<T> eq) {
+ return applyPredicate(
+ eq.getColumn(),
+ ci -> ci.visit(eq),
+ eq.getValue() == null ? allRows() : RowRanges.EMPTY);
+ }
+
+ @Override
+ public <T extends Comparable<T>> RowRanges visit(NotEq<T> notEq) {
+ return applyPredicate(
+ notEq.getColumn(),
+ ci -> ci.visit(notEq),
+ notEq.getValue() == null ? RowRanges.EMPTY : allRows());
+ }
+
+ @Override
+ public <T extends Comparable<T>> RowRanges visit(Lt<T> lt) {
+ return applyPredicate(lt.getColumn(), ci -> ci.visit(lt),
RowRanges.EMPTY);
+ }
+
+ @Override
+ public <T extends Comparable<T>> RowRanges visit(LtEq<T> ltEq) {
+ return applyPredicate(ltEq.getColumn(), ci -> ci.visit(ltEq),
RowRanges.EMPTY);
+ }
+
+ @Override
+ public <T extends Comparable<T>> RowRanges visit(Gt<T> gt) {
+ return applyPredicate(gt.getColumn(), ci -> ci.visit(gt),
RowRanges.EMPTY);
+ }
+
+ @Override
+ public <T extends Comparable<T>> RowRanges visit(GtEq<T> gtEq) {
+ return applyPredicate(gtEq.getColumn(), ci -> ci.visit(gtEq),
RowRanges.EMPTY);
+ }
+
+ @Override
+ public <T extends Comparable<T>> RowRanges visit(Operators.In<T> in) {
+ boolean isNull = in.getValues().contains(null);
+ return applyPredicate(
+ in.getColumn(), ci -> ci.visit(in), isNull ? allRows() :
RowRanges.EMPTY);
+ }
+
+ @Override
+ public <T extends Comparable<T>> RowRanges visit(Operators.NotIn<T> notIn)
{
+ boolean isNull = notIn.getValues().contains(null);
+ return applyPredicate(
+ notIn.getColumn(), ci -> ci.visit(notIn), isNull ?
RowRanges.EMPTY : allRows());
+ }
+
+ @Override
+ public <T extends Comparable<T>, U extends UserDefinedPredicate<T>>
RowRanges visit(
+ UserDefined<T, U> udp) {
+ return applyPredicate(
+ udp.getColumn(),
+ ci -> ci.visit(udp),
+ udp.getUserDefinedPredicate().acceptsNullValue() ? allRows() :
RowRanges.EMPTY);
+ }
+
+ @Override
+ public <T extends Comparable<T>, U extends UserDefinedPredicate<T>>
RowRanges visit(
+ LogicalNotUserDefined<T, U> udp) {
+ return applyPredicate(
+ udp.getUserDefined().getColumn(),
+ ci -> ci.visit(udp),
+
udp.getUserDefined().getUserDefinedPredicate().acceptsNullValue()
+ ? RowRanges.EMPTY
+ : allRows());
+ }
+
+ private RowRanges applyPredicate(
+ Column<?> column,
+ Function<ColumnIndex, PrimitiveIterator.OfInt> func,
+ RowRanges rangesForMissingColumns) {
+ ColumnPath columnPath = column.getColumnPath();
+ if (!columns.contains(columnPath)) {
+ return rangesForMissingColumns;
+ }
+
+ OffsetIndex oi = columnIndexStore.getOffsetIndex(columnPath);
+ ColumnIndex ci = columnIndexStore.getColumnIndex(columnPath);
+ if (ci == null) {
+ LOGGER.info(
+ "No column index for column {} is available; Unable to
filter on this column",
+ columnPath);
+ return allRows();
+ }
+
+ return RowRanges.create(rowCount, func.apply(ci), oi, fileIndexResult);
+ }
+
+ @Override
+ public RowRanges visit(And and) {
+ RowRanges leftResult = and.getLeft().accept(this);
+ if (leftResult.getRanges().size() == 0) {
+ return leftResult;
+ }
+
+ return RowRanges.intersection(leftResult, and.getRight().accept(this));
+ }
+
+ @Override
+ public RowRanges visit(Or or) {
+ RowRanges leftResult = or.getLeft().accept(this);
+ if (leftResult.getRanges().size() == 1 && leftResult.rowCount() ==
rowCount) {
+ return leftResult;
+ }
+
+ return RowRanges.union(leftResult, or.getRight().accept(this));
+ }
+
+ @Override
+ public RowRanges visit(Not not) {
+ throw new IllegalArgumentException(
+ "Predicates containing a NOT must be run through
LogicalInverseRewriter. " + not);
+ }
+}
diff --git
a/paimon-format/src/main/java/org/apache/parquet/internal/filter2/columnindex/RowRanges.java
b/paimon-format/src/main/java/org/apache/parquet/internal/filter2/columnindex/RowRanges.java
new file mode 100644
index 0000000000..6963814831
--- /dev/null
+++
b/paimon-format/src/main/java/org/apache/parquet/internal/filter2/columnindex/RowRanges.java
@@ -0,0 +1,371 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.parquet.internal.filter2.columnindex;
+
+import org.apache.paimon.fileindex.FileIndexResult;
+import org.apache.paimon.fileindex.bitmap.BitmapIndexResult;
+import org.apache.paimon.utils.RoaringBitmap32;
+
+import org.apache.parquet.filter2.compat.FilterCompat.Filter;
+import org.apache.parquet.internal.column.columnindex.OffsetIndex;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.PrimitiveIterator;
+import java.util.Set;
+
+/**
+ * Class representing row ranges in a row-group. These row ranges are
calculated as a result of the
+ * column index based filtering. To be used to iterate over the matching row
indexes to be read from a
+ * row-group, retrieve the count of the matching rows or check overlapping of
a row index range.
+ *
+ * <p>Note: The class was copied over to support using {@link FileIndexResult}
to filter {@link
+ * RowRanges}. Added a new method {@link RowRanges#create(long,
PrimitiveIterator.OfInt,
+ * OffsetIndex, FileIndexResult)}
+ *
+ * @see ColumnIndexFilter#calculateRowRanges(Filter, ColumnIndexStore, Set,
long, FileIndexResult)
+ */
+public class RowRanges {
+
+ /** Made public because some upper-layer applications need to access
it. */
+ public static class Range {
+
+ // Returns the union of the two ranges or null if there are elements
between them.
+ private static Range union(Range left, Range right) {
+ if (left.from <= right.from) {
+ if (left.to + 1 >= right.from) {
+ return new Range(left.from, Math.max(left.to, right.to));
+ }
+ } else if (right.to + 1 >= left.from) {
+ return new Range(right.from, Math.max(left.to, right.to));
+ }
+ return null;
+ }
+
+ // Returns the intersection of the two ranges or null if they do not
overlap.
+ private static Range intersection(Range left, Range right) {
+ if (left.from <= right.from) {
+ if (left.to >= right.from) {
+ return new Range(right.from, Math.min(left.to, right.to));
+ }
+ } else if (right.to >= left.from) {
+ return new Range(left.from, Math.min(left.to, right.to));
+ }
+ return null;
+ }
+
+ public final long from;
+ public final long to;
+
+ // Creates a range of [from, to] (from and to are inclusive; empty
ranges are not valid)
+ Range(long from, long to) {
+ assert from <= to;
+ this.from = from;
+ this.to = to;
+ }
+
+ long count() {
+ return to - from + 1;
+ }
+
+ boolean isBefore(Range other) {
+ return to < other.from;
+ }
+
+ boolean isAfter(Range other) {
+ return from > other.to;
+ }
+
+ @Override
+ public String toString() {
+ return "[" + from + ", " + to + ']';
+ }
+ }
+
+ public static final RowRanges EMPTY = new
RowRanges(Collections.emptyList());
+
+ private final List<Range> ranges;
+
+ private RowRanges() {
+ this(new ArrayList<>());
+ }
+
+ private RowRanges(Range range) {
+ this(Collections.singletonList(range));
+ }
+
+ private RowRanges(List<Range> ranges) {
+ this.ranges = ranges;
+ }
+
+ /**
+ * Creates an immutable RowRanges object with the single range [0,
rowCount - 1].
+ *
+ * @param rowCount a single row count
+ * @return an immutable RowRanges
+ */
+ public static RowRanges createSingle(long rowCount) {
+ return new RowRanges(new Range(0L, rowCount - 1L));
+ }
+
+ /**
+ * Creates a mutable RowRanges object with the following ranges:
+ *
+ * <pre>
+ * [firstRowIndex[0], lastRowIndex[0]],
+ * [firstRowIndex[1], lastRowIndex[1]],
+ * ...,
+ * [firstRowIndex[n], lastRowIndex[n]]
+ * </pre>
+ *
+ * <p>(See OffsetIndex.getFirstRowIndex and OffsetIndex.getLastRowIndex
for details.)
+ *
+ * <p>The union of the ranges are calculated so the result ranges always
contain the disjunct
+ * ranges. See union for details.
+ *
+ * @param rowCount row count
+ * @param pageIndexes pageIndexes
+ * @param offsetIndex offsetIndex
+ * @return a mutable RowRanges
+ */
+ public static RowRanges create(
+ long rowCount, PrimitiveIterator.OfInt pageIndexes, OffsetIndex
offsetIndex) {
+ RowRanges ranges = new RowRanges();
+ while (pageIndexes.hasNext()) {
+ int pageIndex = pageIndexes.nextInt();
+ ranges.add(
+ new Range(
+ offsetIndex.getFirstRowIndex(pageIndex),
+ offsetIndex.getLastRowIndex(pageIndex, rowCount)));
+ }
+ return ranges;
+ }
+
+ /**
+ * Creates a mutable RowRanges object from the row ranges of the given pages, using the
+ * {@link FileIndexResult} to filter or narrow them.
+ *
+ * <p>For each page index the range [firstRowIndex, lastRowIndex] is read from the offset
+ * index. If {@code fileIndexResult} is a {@link BitmapIndexResult}, the page range is
+ * intersected with its bitmap: a page with an empty intersection is skipped, otherwise the
+ * range is narrowed to the first and last selected positions inside the page. Rows between
+ * those two positions are kept even if they are not set in the bitmap. Any other (or
+ * {@code null}) file index result leaves the page ranges untouched.
+ *
+ * <p>NOTE(review): the bitmap is compared directly with the row indexes returned by the
+ * offset index; this presumes both use the same numbering (row-group relative vs. file
+ * level) - TODO confirm for files with more than one row group.
+ *
+ * @param rowCount row count, passed to {@code OffsetIndex.getLastRowIndex}
+ * @param pageIndexes indexes of the pages to build ranges for
+ * @param offsetIndex offset index providing the first/last row index of each page
+ * @param fileIndexResult optional file index result used to filter or narrow the ranges
+ * @return a mutable RowRanges
+ */
+ public static RowRanges create(
+ long rowCount,
+ PrimitiveIterator.OfInt pageIndexes,
+ OffsetIndex offsetIndex,
+ @Nullable FileIndexResult fileIndexResult) {
+ RowRanges ranges = new RowRanges();
+ while (pageIndexes.hasNext()) {
+ int pageIndex = pageIndexes.nextInt();
+ long firstRowIndex = offsetIndex.getFirstRowIndex(pageIndex);
+ long lastRowIndex = offsetIndex.getLastRowIndex(pageIndex,
rowCount);
+
+ // using file index result to filter or narrow the row ranges
+ if (fileIndexResult instanceof BitmapIndexResult) {
+ RoaringBitmap32 bitmap = ((BitmapIndexResult)
fileIndexResult).get();
+ RoaringBitmap32 range =
+ RoaringBitmap32.bitmapOfRange(firstRowIndex,
lastRowIndex + 1);
+ RoaringBitmap32 result = RoaringBitmap32.and(bitmap, range);
+ if (result.isEmpty()) {
+ continue;
+ }
+ firstRowIndex = result.first();
+ lastRowIndex = result.last();
+ }
+
+ ranges.add(new Range(firstRowIndex, lastRowIndex));
+ }
+ return ranges;
+ }
+
+ /**
+ * Calculates the union of the two specified RowRanges objects. The union
of two ranges is
+ * calculated if there are no elements between them. Otherwise, the two
disjunct ranges are
+ * stored separately.
+ *
+ * <pre>
+ * For example:
+ * [113, 241] ∪ [221, 340] = [113, 340]
+ * [113, 230] ∪ [231, 340] = [113, 340]
+ * while
+ * [113, 230] ∪ [232, 340] = [113, 230], [232, 340]
+ * </pre>
+ *
+ * <p>The result RowRanges object will contain all the row indexes that
were contained in one of
+ * the specified objects.
+ *
+ * @param left left RowRanges
+ * @param right right RowRanges
+ * @return a mutable RowRanges contains all the row indexes that were
contained in one of the
+ * specified objects
+ */
+ public static RowRanges union(RowRanges left, RowRanges right) {
+ RowRanges result = new RowRanges();
+ Iterator<Range> it1 = left.ranges.iterator();
+ Iterator<Range> it2 = right.ranges.iterator();
+ if (it2.hasNext()) {
+ Range range2 = it2.next();
+ while (it1.hasNext()) {
+ Range range1 = it1.next();
+ if (range1.isAfter(range2)) {
+ result.add(range2);
+ range2 = range1;
+ Iterator<Range> tmp = it1;
+ it1 = it2;
+ it2 = tmp;
+ } else {
+ result.add(range1);
+ }
+ }
+ result.add(range2);
+ } else {
+ it2 = it1;
+ }
+ while (it2.hasNext()) {
+ result.add(it2.next());
+ }
+
+ return result;
+ }
+
+ /**
+ * Calculates the intersection of the two specified RowRanges objects. Two
ranges intersect if
+ * they have common elements otherwise the result is empty.
+ *
+ * <pre>
+ * For example:
+ * [113, 241] ∩ [221, 340] = [221, 241]
+ * while
+ * [113, 230] ∩ [231, 340] = <EMPTY>
+ * </pre>
+ *
+ * @param left left RowRanges
+ * @param right right RowRanges
+ * @return a mutable RowRanges contains all the row indexes that were
contained in both of the
+ * specified objects
+ */
+ public static RowRanges intersection(RowRanges left, RowRanges right) {
+ RowRanges result = new RowRanges();
+
+ int rightIndex = 0;
+ for (Range l : left.ranges) {
+ for (int i = rightIndex, n = right.ranges.size(); i < n; ++i) {
+ Range r = right.ranges.get(i);
+ if (l.isBefore(r)) {
+ break;
+ } else if (l.isAfter(r)) {
+ rightIndex = i + 1;
+ continue;
+ }
+ result.add(Range.intersection(l, r));
+ }
+ }
+
+ return result;
+ }
+
+ /*
+ * Adds a range to the end of the list of ranges. It maintains the
disjunct ascending order(*) of the ranges by
+ * trying to union the specified range to the last ranges in the list. The
specified range shall be larger(*) than
+ * the last one or might be overlapped with some of the last ones.
+ * (*) [a, b] < [c, d] if b < c
+ */
+ private void add(Range range) {
+ Range rangeToAdd = range;
+ for (int i = ranges.size() - 1; i >= 0; --i) {
+ Range last = ranges.get(i);
+ assert !last.isAfter(range);
+ Range u = Range.union(last, rangeToAdd);
+ if (u == null) {
+ break;
+ }
+ rangeToAdd = u;
+ ranges.remove(i);
+ }
+ ranges.add(rangeToAdd);
+ }
+
+ /** @return the number of rows in the ranges */
+ public long rowCount() {
+ long cnt = 0;
+ for (Range range : ranges) {
+ cnt += range.count();
+ }
+ return cnt;
+ }
+
+ /** @return the ascending iterator of the row indexes contained in the
ranges */
+ public PrimitiveIterator.OfLong iterator() {
+ return new PrimitiveIterator.OfLong() {
+ private int currentRangeIndex = -1;
+ private Range currentRange;
+ private long next = findNext();
+
+ private long findNext() {
+ if (currentRange == null || next + 1 > currentRange.to) {
+ if (currentRangeIndex + 1 < ranges.size()) {
+ currentRange = ranges.get(++currentRangeIndex);
+ next = currentRange.from;
+ } else {
+ return -1;
+ }
+ } else {
+ ++next;
+ }
+ return next;
+ }
+
+ @Override
+ public boolean hasNext() {
+ return next >= 0;
+ }
+
+ @Override
+ public long nextLong() {
+ long ret = next;
+ if (ret < 0) {
+ throw new NoSuchElementException();
+ }
+ next = findNext();
+ return ret;
+ }
+ };
+ }
+
+ /**
+ * @param from the first row of the range to be checked for connection
+ * @param to the last row of the range to be checked for connection
+ * @return {@code true} if the specified range is overlapping (have common
elements) with one of
+ * the ranges
+ */
+ public boolean isOverlapping(long from, long to) {
+ return Collections.binarySearch(
+ ranges,
+ new Range(from, to),
+ (r1, r2) -> r1.isBefore(r2) ? -1 : r1.isAfter(r2) ? 1
: 0)
+ >= 0;
+ }
+
+ public List<Range> getRanges() {
+ return ranges;
+ }
+
+ @Override
+ public String toString() {
+ return ranges.toString();
+ }
+}