This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new a11817cea [core] Support using file index result for RowGroup filtering. (#4473)
a11817cea is described below
commit a11817ceab72f28b919d9152350b6ec0994fbaf4
Author: Zhonghang Liu <[email protected]>
AuthorDate: Mon Nov 11 15:14:22 2024 +0800
[core] Support using file index result for RowGroup filtering. (#4473)
---
.../paimon/fileindex/FileIndexPredicate.java | 16 +-
.../apache/paimon/format/FormatReaderContext.java | 15 +
.../apache/paimon/format/FormatReaderFactory.java | 3 +
.../org/apache/paimon/utils/RoaringBitmap32.java | 4 +
...leIndexSkipper.java => FileIndexEvaluator.java} | 16 +-
.../paimon/operation/AppendOnlyFileStoreScan.java | 2 +-
.../paimon/operation/KeyValueFileStoreScan.java | 2 +-
.../apache/paimon/operation/RawFileSplitRead.java | 21 +-
.../CompactedChangelogFormatReaderFactory.java | 6 +
.../java/org/apache/orc/impl/RecordReaderImpl.java | 1889 ++++++++++++++++++++
.../apache/paimon/format/orc/OrcReaderFactory.java | 21 +-
.../format/orc/filter/OrcSimpleStatsExtractor.java | 3 +-
.../format/parquet/ParquetReaderFactory.java | 3 +-
.../apache/paimon/format/parquet/ParquetUtil.java | 10 +-
.../apache/parquet/hadoop/ParquetFileReader.java | 50 +-
.../format/parquet/ParquetFormatReadWriteTest.java | 2 +-
.../apache/paimon/spark/SparkFileIndexITCase.java | 7 +-
17 files changed, 2016 insertions(+), 54 deletions(-)
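For context, a minimal sketch of how the new pieces fit together, mirroring the RawFileSplitRead change further down. It is illustrative only (the helper name contextWithFileIndex is not part of the patch): the file index is evaluated once per data file, and the resulting FileIndexResult is carried into FormatReaderContext so the format readers can additionally prune row groups.

    import org.apache.paimon.fileindex.FileIndexResult;
    import org.apache.paimon.format.FormatReaderContext;
    import org.apache.paimon.fs.FileIO;
    import org.apache.paimon.io.DataFileMeta;
    import org.apache.paimon.io.DataFilePathFactory;
    import org.apache.paimon.io.FileIndexEvaluator;
    import org.apache.paimon.predicate.Predicate;
    import org.apache.paimon.schema.TableSchema;

    import java.io.IOException;
    import java.util.List;

    // Illustrative helper (not in the patch): evaluate the file index for one data file
    // and hand the result to the reader context.
    static FormatReaderContext contextWithFileIndex(
            FileIO fileIO,
            TableSchema dataSchema,
            List<Predicate> dataFilters,
            DataFilePathFactory pathFactory,
            DataFileMeta file)
            throws IOException {
        FileIndexResult result =
                FileIndexEvaluator.evaluate(fileIO, dataSchema, dataFilters, pathFactory, file);
        // result.remain() == false means the whole file can be skipped before any read;
        // otherwise the (possibly bitmap-backed) result travels with the context, where
        // the ORC/Parquet readers use it for row-group filtering.
        return new FormatReaderContext(
                fileIO, pathFactory.toPath(file.fileName()), file.fileSize(), result);
    }
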
diff --git a/paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexPredicate.java b/paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexPredicate.java
index 2eb3f2195..8f5485dbe 100644
--- a/paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexPredicate.java
+++ b/paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexPredicate.java
@@ -67,22 +67,20 @@ public class FileIndexPredicate implements Closeable {
this.reader = FileIndexFormat.createReader(inputStream, fileRowType);
}
- public boolean testPredicate(@Nullable Predicate filePredicate) {
- if (filePredicate == null) {
- return true;
+ public FileIndexResult evaluate(@Nullable Predicate predicate) {
+ if (predicate == null) {
+ return REMAIN;
}
-
- Set<String> requiredFieldNames = getRequiredNames(filePredicate);
-
+ Set<String> requiredFieldNames = getRequiredNames(predicate);
Map<String, Collection<FileIndexReader>> indexReaders = new HashMap<>();
requiredFieldNames.forEach(name -> indexReaders.put(name, reader.readColumnIndex(name)));
- if (!new FileIndexPredicateTest(indexReaders).test(filePredicate).remain()) {
+ FileIndexResult result = new FileIndexPredicateTest(indexReaders).test(predicate);
+ if (!result.remain()) {
LOG.debug(
"One file has been filtered: "
+ (path == null ? "in scan stage" : path.toString()));
- return false;
}
- return true;
+ return result;
}
private Set<String> getRequiredNames(Predicate filePredicate) {
diff --git a/paimon-common/src/main/java/org/apache/paimon/format/FormatReaderContext.java b/paimon-common/src/main/java/org/apache/paimon/format/FormatReaderContext.java
index 92a569e03..0d3dd7c79 100644
--- a/paimon-common/src/main/java/org/apache/paimon/format/FormatReaderContext.java
+++ b/paimon-common/src/main/java/org/apache/paimon/format/FormatReaderContext.java
@@ -18,21 +18,31 @@
package org.apache.paimon.format;
+import org.apache.paimon.fileindex.FileIndexResult;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.reader.RecordReader;
+import javax.annotation.Nullable;
+
/** the context for creating RecordReader {@link RecordReader}. */
public class FormatReaderContext implements FormatReaderFactory.Context {
private final FileIO fileIO;
private final Path file;
private final long fileSize;
+ @Nullable private final FileIndexResult fileIndexResult;
public FormatReaderContext(FileIO fileIO, Path file, long fileSize) {
+ this(fileIO, file, fileSize, null);
+ }
+
+ public FormatReaderContext(
+ FileIO fileIO, Path file, long fileSize, @Nullable FileIndexResult fileIndexResult) {
this.fileIO = fileIO;
this.file = file;
this.fileSize = fileSize;
+ this.fileIndexResult = fileIndexResult;
}
@Override
@@ -49,4 +59,9 @@ public class FormatReaderContext implements FormatReaderFactory.Context {
public long fileSize() {
return fileSize;
}
+
+ @Override
+ public FileIndexResult fileIndex() {
+ return fileIndexResult;
+ }
}
diff --git a/paimon-common/src/main/java/org/apache/paimon/format/FormatReaderFactory.java b/paimon-common/src/main/java/org/apache/paimon/format/FormatReaderFactory.java
index d2fc91501..420d44e0f 100644
--- a/paimon-common/src/main/java/org/apache/paimon/format/FormatReaderFactory.java
+++ b/paimon-common/src/main/java/org/apache/paimon/format/FormatReaderFactory.java
@@ -19,6 +19,7 @@
package org.apache.paimon.format;
import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.fileindex.FileIndexResult;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.reader.RecordReader;
@@ -38,5 +39,7 @@ public interface FormatReaderFactory {
Path filePath();
long fileSize();
+
+ FileIndexResult fileIndex();
}
}
diff --git a/paimon-common/src/main/java/org/apache/paimon/utils/RoaringBitmap32.java b/paimon-common/src/main/java/org/apache/paimon/utils/RoaringBitmap32.java
index f9232dc82..1d3468a9f 100644
--- a/paimon-common/src/main/java/org/apache/paimon/utils/RoaringBitmap32.java
+++ b/paimon-common/src/main/java/org/apache/paimon/utils/RoaringBitmap32.java
@@ -68,6 +68,10 @@ public class RoaringBitmap32 {
return roaringBitmap.getLongCardinality();
}
+ public long rangeCardinality(long start, long end) {
+ return roaringBitmap.rangeCardinality(start, end);
+ }
+
public RoaringBitmap32 clone() {
return new RoaringBitmap32(roaringBitmap.clone());
}
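
The new rangeCardinality accessor is what lets a format reader ask whether the bitmap of selected row positions intersects a given row group. A minimal sketch of that check, assuming a bitmap of selected rows is available (illustrative helper, not part of the patch):

    import org.apache.paimon.utils.RoaringBitmap32;

    // Keep a row group covering positions [firstRow, firstRow + rowCount) only if the
    // bitmap of selected rows has at least one bit in that range (end bound is exclusive).
    static boolean rowGroupSelected(RoaringBitmap32 selectedRows, long firstRow, long rowCount) {
        return selectedRows.rangeCardinality(firstRow, firstRow + rowCount) > 0;
    }
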
diff --git a/paimon-core/src/main/java/org/apache/paimon/io/FileIndexSkipper.java b/paimon-core/src/main/java/org/apache/paimon/io/FileIndexEvaluator.java
similarity index 89%
rename from paimon-core/src/main/java/org/apache/paimon/io/FileIndexSkipper.java
rename to paimon-core/src/main/java/org/apache/paimon/io/FileIndexEvaluator.java
index 0c4ac82a0..c34d1b0d3 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/FileIndexSkipper.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/FileIndexEvaluator.java
@@ -19,6 +19,7 @@
package org.apache.paimon.io;
import org.apache.paimon.fileindex.FileIndexPredicate;
+import org.apache.paimon.fileindex.FileIndexResult;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.PredicateBuilder;
@@ -28,10 +29,10 @@ import java.io.IOException;
import java.util.List;
import java.util.stream.Collectors;
-/** File index reader, do the filter in the constructor. */
-public class FileIndexSkipper {
+/** Evaluate file index result. */
+public class FileIndexEvaluator {
- public static boolean skip(
+ public static FileIndexResult evaluate(
FileIO fileIO,
TableSchema dataSchema,
List<Predicate> dataFilter,
@@ -55,14 +56,11 @@ public class FileIndexSkipper {
dataFilePathFactory.toPath(indexFiles.get(0)),
fileIO,
dataSchema.logicalRowType())) {
- if (!predicate.testPredicate(
- PredicateBuilder.and(dataFilter.toArray(new Predicate[0])))) {
- return true;
- }
+ return predicate.evaluate(
+ PredicateBuilder.and(dataFilter.toArray(new Predicate[0])));
}
}
}
-
- return false;
+ return FileIndexResult.REMAIN;
}
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java
index 50a8c74dc..60b4e7933 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java
@@ -120,7 +120,7 @@ public class AppendOnlyFileStoreScan extends AbstractFileStoreScan {
try (FileIndexPredicate predicate =
new FileIndexPredicate(embeddedIndexBytes, dataRowType)) {
- return predicate.testPredicate(dataPredicate);
+ return predicate.evaluate(dataPredicate).remain();
} catch (IOException e) {
throw new RuntimeException("Exception happens while checking predicate.", e);
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java
index 58b03694d..c368d9e51 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java
@@ -144,7 +144,7 @@ public class KeyValueFileStoreScan extends AbstractFileStoreScan {
id ->
fieldValueStatsConverters.convertFilter(
entry.file().schemaId(),
valueFilter));
- return predicate.testPredicate(dataPredicate);
+ return predicate.evaluate(dataPredicate).remain();
} catch (IOException e) {
throw new RuntimeException("Exception happens while checking fileIndex predicate.", e);
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java b/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java
index fcd2f8798..9c612a9f8 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java
@@ -23,13 +23,14 @@ import org.apache.paimon.data.InternalRow;
import org.apache.paimon.deletionvectors.ApplyDeletionVectorReader;
import org.apache.paimon.deletionvectors.DeletionVector;
import org.apache.paimon.disk.IOManager;
+import org.apache.paimon.fileindex.FileIndexResult;
import org.apache.paimon.format.FileFormatDiscover;
import org.apache.paimon.format.FormatKey;
import org.apache.paimon.format.FormatReaderContext;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.io.DataFilePathFactory;
-import org.apache.paimon.io.FileIndexSkipper;
+import org.apache.paimon.io.FileIndexEvaluator;
import org.apache.paimon.io.FileRecordReader;
import org.apache.paimon.mergetree.compact.ConcatRecordReader;
import org.apache.paimon.partition.PartitionUtils;
@@ -191,26 +192,30 @@ public class RawFileSplitRead implements SplitRead<InternalRow> {
BulkFormatMapping bulkFormatMapping,
IOExceptionSupplier<DeletionVector> dvFactory)
throws IOException {
+ FileIndexResult fileIndexResult = null;
if (fileIndexReadEnabled) {
- boolean skip =
- FileIndexSkipper.skip(
+ fileIndexResult =
+ FileIndexEvaluator.evaluate(
fileIO,
bulkFormatMapping.getDataSchema(),
bulkFormatMapping.getDataFilters(),
dataFilePathFactory,
file);
- if (skip) {
+ if (!fileIndexResult.remain()) {
return new EmptyRecordReader<>();
}
}
+ FormatReaderContext formatReaderContext =
+ new FormatReaderContext(
+ fileIO,
+ dataFilePathFactory.toPath(file.fileName()),
+ file.fileSize(),
+ fileIndexResult);
FileRecordReader fileRecordReader =
new FileRecordReader(
bulkFormatMapping.getReaderFactory(),
- new FormatReaderContext(
- fileIO,
- dataFilePathFactory.toPath(file.fileName()),
- file.fileSize()),
+ formatReaderContext,
bulkFormatMapping.getIndexMapping(),
bulkFormatMapping.getCastMapping(),
PartitionUtils.create(bulkFormatMapping.getPartitionPair(), partition));
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/format/CompactedChangelogFormatReaderFactory.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/format/CompactedChangelogFormatReaderFactory.java
index 8d1731151..e17566f30 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/format/CompactedChangelogFormatReaderFactory.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/format/CompactedChangelogFormatReaderFactory.java
@@ -20,6 +20,7 @@ package org.apache.paimon.flink.compact.changelog.format;
import org.apache.paimon.catalog.CatalogContext;
import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.fileindex.FileIndexResult;
import org.apache.paimon.format.FormatReaderFactory;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.FileStatus;
@@ -80,6 +81,11 @@ public class CompactedChangelogFormatReaderFactory implements FormatReaderFactor
public long fileSize() {
return length;
}
+
+ @Override
+ public FileIndexResult fileIndex() {
+ return context.fileIndex();
+ }
});
}
diff --git a/paimon-format/src/main/java/org/apache/orc/impl/RecordReaderImpl.java b/paimon-format/src/main/java/org/apache/orc/impl/RecordReaderImpl.java
new file mode 100644
index 000000000..dffa8ad77
--- /dev/null
+++ b/paimon-format/src/main/java/org/apache/orc/impl/RecordReaderImpl.java
@@ -0,0 +1,1889 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.impl;
+
+import org.apache.paimon.fileindex.FileIndexResult;
+import org.apache.paimon.fileindex.bitmap.BitmapIndexResultLazy;
+import org.apache.paimon.utils.RoaringBitmap32;
+
+import org.apache.commons.lang3.ArrayUtils;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.type.HiveDecimal;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
+import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
+import org.apache.hadoop.hive.ql.util.TimestampUtils;
+import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.orc.BooleanColumnStatistics;
+import org.apache.orc.CollectionColumnStatistics;
+import org.apache.orc.ColumnStatistics;
+import org.apache.orc.CompressionCodec;
+import org.apache.orc.DataReader;
+import org.apache.orc.DateColumnStatistics;
+import org.apache.orc.DecimalColumnStatistics;
+import org.apache.orc.DoubleColumnStatistics;
+import org.apache.orc.IntegerColumnStatistics;
+import org.apache.orc.OrcConf;
+import org.apache.orc.OrcFile;
+import org.apache.orc.OrcFilterContext;
+import org.apache.orc.OrcProto;
+import org.apache.orc.Reader;
+import org.apache.orc.RecordReader;
+import org.apache.orc.StringColumnStatistics;
+import org.apache.orc.StripeInformation;
+import org.apache.orc.TimestampColumnStatistics;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.filter.BatchFilter;
+import org.apache.orc.impl.filter.FilterFactory;
+import org.apache.orc.impl.reader.ReaderEncryption;
+import org.apache.orc.impl.reader.StripePlanner;
+import org.apache.orc.impl.reader.tree.BatchReader;
+import org.apache.orc.impl.reader.tree.TypeReader;
+import org.apache.orc.util.BloomFilter;
+import org.apache.orc.util.BloomFilterIO;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.sql.Timestamp;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.ZoneOffset;
+import java.time.chrono.ChronoLocalDate;
+import java.time.format.DateTimeFormatter;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Date;
+import java.util.List;
+import java.util.SortedSet;
+import java.util.TimeZone;
+import java.util.TreeSet;
+import java.util.function.Consumer;
+
+/* This file is based on source code from the ORC Project (http://orc.apache.org/), licensed by the Apache
+ * Software Foundation (ASF) under the Apache License, Version 2.0. See the NOTICE file distributed with this work for
+ * additional information regarding copyright ownership. */
+
+/** An orc RecordReaderImpl. */
+public class RecordReaderImpl implements RecordReader {
+ static final Logger LOG = LoggerFactory.getLogger(RecordReaderImpl.class);
+ private static final boolean isLogDebugEnabled = LOG.isDebugEnabled();
+ // as public for use with test cases
+ public static final OrcProto.ColumnStatistics EMPTY_COLUMN_STATISTICS =
+ OrcProto.ColumnStatistics.newBuilder()
+ .setNumberOfValues(0)
+ .setHasNull(false)
+ .setBytesOnDisk(0)
+ .build();
+ protected final Path path;
+ private final long firstRow;
+ private final List<StripeInformation> stripes = new ArrayList<>();
+ private OrcProto.StripeFooter stripeFooter;
+ private final long totalRowCount;
+ protected final TypeDescription schema;
+ // the file included columns indexed by the file's column ids.
+ private final boolean[] fileIncluded;
+ private final long rowIndexStride;
+ private long rowInStripe = 0;
+ // position of the follow reader within the stripe
+ private long followRowInStripe = 0;
+ private int currentStripe = -1;
+ private long rowBaseInStripe = 0;
+ private long rowCountInStripe = 0;
+ private final BatchReader reader;
+ private final OrcIndex indexes;
+ // identifies the columns requiring row indexes
+ private final boolean[] rowIndexColsToRead;
+ private final SargApplier sargApp;
+ // an array about which row groups aren't skipped
+ private boolean[] includedRowGroups = null;
+ private final DataReader dataReader;
+ private final int maxDiskRangeChunkLimit;
+ private final StripePlanner planner;
+ // identifies the type of read, ALL(read everything), LEADERS(read only the filter columns)
+ private final TypeReader.ReadPhase startReadPhase;
+ // identifies that follow columns bytes must be read
+ private boolean needsFollowColumnsRead;
+ private final boolean noSelectedVector;
+ // identifies whether the file has bad bloom filters that we should not use.
+ private final boolean skipBloomFilters;
+ private final FileIndexResult fileIndexResult;
+ static final String[] BAD_CPP_BLOOM_FILTER_VERSIONS = {
+ "1.6.0", "1.6.1", "1.6.2", "1.6.3", "1.6.4", "1.6.5", "1.6.6",
"1.6.7", "1.6.8", "1.6.9",
+ "1.6.10", "1.6.11", "1.7.0"
+ };
+
+ /**
+ * Given a list of column names, find the given column and return the index.
+ *
+ * @param evolution the mapping from reader to file schema
+ * @param columnName the fully qualified column name to look for
+ * @return the file column number or -1 if the column wasn't found in the file schema
+ * @throws IllegalArgumentException if the column was not found in the reader schema
+ */
+ static int findColumns(SchemaEvolution evolution, String columnName) {
+ TypeDescription fileColumn = findColumnType(evolution, columnName);
+ return fileColumn == null ? -1 : fileColumn.getId();
+ }
+
+ static TypeDescription findColumnType(SchemaEvolution evolution, String columnName) {
+ try {
+ TypeDescription readerColumn =
+ evolution
+ .getReaderBaseSchema()
+ .findSubtype(columnName, evolution.isSchemaEvolutionCaseAware);
+ return evolution.getFileType(readerColumn);
+ } catch (IllegalArgumentException e) {
+ throw new IllegalArgumentException(
+ "Filter could not find column with name: "
+ + columnName
+ + " on "
+ + evolution.getReaderBaseSchema(),
+ e);
+ }
+ }
+
+ /**
+ * Given a column name such as 'a.b.c', this method returns the column 'a.b.c' if present in the
+ * file. In case 'a.b.c' is not found in file then it tries to look for 'a.b', then 'a'. If none
+ * are present then it shall return null.
+ *
+ * @param evolution the mapping from reader to file schema
+ * @param columnName the fully qualified column name to look for
+ * @return the file column type or null in case none of the branch columns are present in the
+ * file
+ * @throws IllegalArgumentException if the column was not found in the reader schema
+ */
+ static TypeDescription findMostCommonColumn(SchemaEvolution evolution, String columnName) {
+ try {
+ TypeDescription readerColumn =
+ evolution
+ .getReaderBaseSchema()
+ .findSubtype(columnName, evolution.isSchemaEvolutionCaseAware);
+ TypeDescription fileColumn;
+ do {
+ fileColumn = evolution.getFileType(readerColumn);
+ if (fileColumn == null) {
+ readerColumn = readerColumn.getParent();
+ } else {
+ return fileColumn;
+ }
+ } while (readerColumn != null);
+ return null;
+ } catch (IllegalArgumentException e) {
+ throw new IllegalArgumentException(
+ "Filter could not find column with name: "
+ + columnName
+ + " on "
+ + evolution.getReaderBaseSchema(),
+ e);
+ }
+ }
+
+ /**
+ * Find the mapping from predicate leaves to columns.
+ *
+ * @param sargLeaves the search argument that we need to map
+ * @param evolution the mapping from reader to file schema
+ * @return an array mapping the sarg leaves to concrete column numbers in the file
+ */
+ public static int[] mapSargColumnsToOrcInternalColIdx(
+ List<PredicateLeaf> sargLeaves, SchemaEvolution evolution) {
+ int[] result = new int[sargLeaves.size()];
+ for (int i = 0; i < sargLeaves.size(); ++i) {
+ int colNum = -1;
+ try {
+ String colName = sargLeaves.get(i).getColumnName();
+ colNum = findColumns(evolution, colName);
+ } catch (IllegalArgumentException e) {
+ LOG.debug("{}", e.getMessage());
+ }
+ result[i] = colNum;
+ }
+ return result;
+ }
+
+ public RecordReaderImpl(
+ ReaderImpl fileReader, Reader.Options options, FileIndexResult fileIndexResult)
+ throws IOException {
+ this.fileIndexResult = fileIndexResult;
+ OrcFile.WriterVersion writerVersion = fileReader.getWriterVersion();
+ SchemaEvolution evolution;
+ if (options.getSchema() == null) {
+ LOG.info("Reader schema not provided -- using file schema " +
fileReader.getSchema());
+ evolution = new SchemaEvolution(fileReader.getSchema(), null, options);
+ } else {
+
+ // Now that we are creating a record reader for a file, validate that
+ // the schema to read is compatible with the file schema.
+ //
+ evolution = new SchemaEvolution(fileReader.getSchema(), options.getSchema(), options);
+ if (LOG.isDebugEnabled() && evolution.hasConversion()) {
+ LOG.debug(
+ "ORC file "
+ + fileReader.path.toString()
+ + " has data type conversion --\n"
+ + "reader schema: "
+ + options.getSchema().toString()
+ + "\n"
+ + "file schema: "
+ + fileReader.getSchema());
+ }
+ }
+ this.noSelectedVector = !options.useSelected();
+ LOG.debug("noSelectedVector={}", this.noSelectedVector);
+ this.schema = evolution.getReaderSchema();
+ this.path = fileReader.path;
+ this.rowIndexStride = fileReader.rowIndexStride;
+ boolean ignoreNonUtf8BloomFilter =
+
OrcConf.IGNORE_NON_UTF8_BLOOM_FILTERS.getBoolean(fileReader.conf);
+ ReaderEncryption encryption = fileReader.getEncryption();
+ this.fileIncluded = evolution.getFileIncluded();
+ SearchArgument sarg = options.getSearchArgument();
+ boolean[] rowIndexCols = new
boolean[evolution.getFileIncluded().length];
+ if (sarg != null && rowIndexStride > 0) {
+ sargApp =
+ new SargApplier(
+ sarg,
+ rowIndexStride,
+ evolution,
+ writerVersion,
+ fileReader.useUTCTimestamp,
+ fileReader.writerUsedProlepticGregorian(),
+
fileReader.options.getConvertToProlepticGregorian());
+ sargApp.setRowIndexCols(rowIndexCols);
+ } else {
+ sargApp = null;
+ }
+
+ long rows = 0;
+ long skippedRows = 0;
+ long offset = options.getOffset();
+ long maxOffset = options.getMaxOffset();
+ for (StripeInformation stripe : fileReader.getStripes()) {
+ long stripeStart = stripe.getOffset();
+ if (offset > stripeStart) {
+ skippedRows += stripe.getNumberOfRows();
+ } else if (stripeStart < maxOffset) {
+ this.stripes.add(stripe);
+ rows += stripe.getNumberOfRows();
+ }
+ }
+ this.maxDiskRangeChunkLimit =
+ OrcConf.ORC_MAX_DISK_RANGE_CHUNK_LIMIT.getInt(fileReader.conf);
+ Boolean zeroCopy = options.getUseZeroCopy();
+ if (zeroCopy == null) {
+ zeroCopy = OrcConf.USE_ZEROCOPY.getBoolean(fileReader.conf);
+ }
+ if (options.getDataReader() != null) {
+ this.dataReader = options.getDataReader().clone();
+ } else {
+ InStream.StreamOptions unencryptedOptions =
+ InStream.options()
+
.withCodec(OrcCodecPool.getCodec(fileReader.getCompressionKind()))
+ .withBufferSize(fileReader.getCompressionSize());
+ DataReaderProperties.Builder builder =
+ DataReaderProperties.builder()
+ .withCompression(unencryptedOptions)
+
.withFileSystemSupplier(fileReader.getFileSystemSupplier())
+ .withPath(fileReader.path)
+ .withMaxDiskRangeChunkLimit(maxDiskRangeChunkLimit)
+ .withZeroCopy(zeroCopy)
+ .withMinSeekSize(options.minSeekSize())
+
.withMinSeekSizeTolerance(options.minSeekSizeTolerance());
+ FSDataInputStream file = fileReader.takeFile();
+ if (file != null) {
+ builder.withFile(file);
+ }
+ this.dataReader =
RecordReaderUtils.createDefaultDataReader(builder.build());
+ }
+ firstRow = skippedRows;
+ totalRowCount = rows;
+ Boolean skipCorrupt = options.getSkipCorruptRecords();
+ if (skipCorrupt == null) {
+ skipCorrupt =
OrcConf.SKIP_CORRUPT_DATA.getBoolean(fileReader.conf);
+ }
+
+ String[] filterCols = null;
+ Consumer<OrcFilterContext> filterCallBack = null;
+ String filePath =
+ options.allowPluginFilters()
+ ?
fileReader.getFileSystem().makeQualified(fileReader.path).toString()
+ : null;
+ BatchFilter filter =
+ FilterFactory.createBatchFilter(
+ options,
+ evolution.getReaderBaseSchema(),
+ evolution.isSchemaEvolutionCaseAware(),
+ fileReader.getFileVersion(),
+ false,
+ filePath,
+ fileReader.conf);
+ if (filter != null) {
+ // If a filter is determined then use this
+ filterCallBack = filter;
+ filterCols = filter.getColumnNames();
+ }
+
+ // Map columnNames to ColumnIds
+ SortedSet<Integer> filterColIds = new TreeSet<>();
+ if (filterCols != null) {
+ for (String colName : filterCols) {
+ TypeDescription expandCol = findColumnType(evolution, colName);
+ // If the column is not present in the file then this can be
ignored from read.
+ if (expandCol == null || expandCol.getId() == -1) {
+ // Add -1 to filter columns so that the NullTreeReader is
invoked during the
+ // LEADERS phase
+ filterColIds.add(-1);
+ // Determine the common parent and include these
+ expandCol = findMostCommonColumn(evolution, colName);
+ }
+ while (expandCol != null && expandCol.getId() != -1) {
+ // classify the column and the parent branch as LEAD
+ filterColIds.add(expandCol.getId());
+ rowIndexCols[expandCol.getId()] = true;
+ expandCol = expandCol.getParent();
+ }
+ }
+ this.startReadPhase = TypeReader.ReadPhase.LEADERS;
+ LOG.debug(
+ "Using startReadPhase: {} with filter columns: {}",
+ startReadPhase,
+ filterColIds);
+ } else {
+ this.startReadPhase = TypeReader.ReadPhase.ALL;
+ }
+
+ this.rowIndexColsToRead = ArrayUtils.contains(rowIndexCols, true) ?
rowIndexCols : null;
+ TreeReaderFactory.ReaderContext readerContext =
+ new TreeReaderFactory.ReaderContext()
+ .setSchemaEvolution(evolution)
+ .setFilterCallback(filterColIds, filterCallBack)
+ .skipCorrupt(skipCorrupt)
+ .fileFormat(fileReader.getFileVersion())
+ .useUTCTimestamp(fileReader.useUTCTimestamp)
+ .setProlepticGregorian(
+ fileReader.writerUsedProlepticGregorian(),
+
fileReader.options.getConvertToProlepticGregorian())
+ .setEncryption(encryption);
+ reader =
TreeReaderFactory.createRootReader(evolution.getReaderSchema(), readerContext);
+ skipBloomFilters =
hasBadBloomFilters(fileReader.getFileTail().getFooter());
+
+ int columns = evolution.getFileSchema().getMaximumId() + 1;
+ indexes =
+ new OrcIndex(
+ new OrcProto.RowIndex[columns],
+ new OrcProto.Stream.Kind[columns],
+ new OrcProto.BloomFilterIndex[columns]);
+
+ planner =
+ new StripePlanner(
+ evolution.getFileSchema(),
+ encryption,
+ dataReader,
+ writerVersion,
+ ignoreNonUtf8BloomFilter,
+ maxDiskRangeChunkLimit,
+ filterColIds);
+
+ try {
+ advanceToNextRow(reader, 0L, true);
+ } catch (Exception e) {
+ // Try to close since this happens in constructor.
+ close();
+ long stripeId = stripes.size() == 0 ? 0 :
stripes.get(0).getStripeId();
+ throw new IOException(
+ String.format("Problem opening stripe %d footer in %s.",
stripeId, path), e);
+ }
+ }
+
+ /**
+ * Check if the file has inconsistent bloom filters. We will skip using
them in the following
+ * reads.
+ *
+ * @return true if it has.
+ */
+ private boolean hasBadBloomFilters(OrcProto.Footer footer) {
+ // Only C++ writer in old releases could have bad bloom filters.
+ if (footer.getWriter() != 1) {
+ return false;
+ }
+ // 'softwareVersion' is added in 1.5.13, 1.6.11, and 1.7.0.
+ // 1.6.x releases before 1.6.11 won't have it. On the other side, the
C++ writer
+ // supports writing bloom filters since 1.6.0. So files written by the
C++ writer
+ // and with 'softwareVersion' unset would have bad bloom filters.
+ if (!footer.hasSoftwareVersion()) {
+ return true;
+ }
+ String fullVersion = footer.getSoftwareVersion();
+ String version = fullVersion;
+ // Deal with snapshot versions, e.g. 1.6.12-SNAPSHOT.
+ if (fullVersion.contains("-")) {
+ version = fullVersion.substring(0, fullVersion.indexOf('-'));
+ }
+ for (String v : BAD_CPP_BLOOM_FILTER_VERSIONS) {
+ if (v.equals(version)) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ /** An orc PositionProvider impl. */
+ public static final class PositionProviderImpl implements PositionProvider
{
+ private final OrcProto.RowIndexEntry entry;
+ private int index;
+
+ public PositionProviderImpl(OrcProto.RowIndexEntry entry) {
+ this(entry, 0);
+ }
+
+ public PositionProviderImpl(OrcProto.RowIndexEntry entry, int
startPos) {
+ this.entry = entry;
+ this.index = startPos;
+ }
+
+ @Override
+ public long getNext() {
+ return entry.getPositions(index++);
+ }
+ }
+
+ /** An orc PositionProvider impl. */
+ public static final class ZeroPositionProvider implements PositionProvider
{
+ @Override
+ public long getNext() {
+ return 0;
+ }
+ }
+
+ public OrcProto.StripeFooter readStripeFooter(StripeInformation stripe)
throws IOException {
+ return dataReader.readStripeFooter(stripe);
+ }
+
+ enum Location {
+ BEFORE,
+ MIN,
+ MIDDLE,
+ MAX,
+ AFTER
+ }
+
+ static class ValueRange<T extends Comparable> {
+ final Comparable lower;
+ final Comparable upper;
+ final boolean onlyLowerBound;
+ final boolean onlyUpperBound;
+ final boolean hasNulls;
+ final boolean hasValue;
+ final boolean comparable;
+
+ ValueRange(
+ PredicateLeaf predicate,
+ T lower,
+ T upper,
+ boolean hasNulls,
+ boolean onlyLowerBound,
+ boolean onlyUpperBound,
+ boolean hasValue,
+ boolean comparable) {
+ PredicateLeaf.Type type = predicate.getType();
+ this.lower = getBaseObjectForComparison(type, lower);
+ this.upper = getBaseObjectForComparison(type, upper);
+ this.hasNulls = hasNulls;
+ this.onlyLowerBound = onlyLowerBound;
+ this.onlyUpperBound = onlyUpperBound;
+ this.hasValue = hasValue;
+ this.comparable = comparable;
+ }
+
+ ValueRange(
+ PredicateLeaf predicate,
+ T lower,
+ T upper,
+ boolean hasNulls,
+ boolean onlyLowerBound,
+ boolean onlyUpperBound) {
+ this(
+ predicate,
+ lower,
+ upper,
+ hasNulls,
+ onlyLowerBound,
+ onlyUpperBound,
+ lower != null,
+ lower != null);
+ }
+
+ ValueRange(PredicateLeaf predicate, T lower, T upper, boolean
hasNulls) {
+ this(predicate, lower, upper, hasNulls, false, false);
+ }
+
+ /**
+ * A value range where the data is either missing or all null.
+ *
+ * @param predicate the predicate to test
+ * @param hasNulls whether there are nulls
+ */
+ ValueRange(PredicateLeaf predicate, boolean hasNulls) {
+ this(predicate, null, null, hasNulls, false, false);
+ }
+
+ boolean hasValues() {
+ return hasValue;
+ }
+
+ /**
+ * Whether min or max is provided for comparison.
+ *
+ * @return is it comparable
+ */
+ boolean isComparable() {
+ return hasValue && comparable;
+ }
+
+ /**
+ * value range is invalid if the column statistics are non-existent.
+ *
+ * @see ColumnStatisticsImpl#isStatsExists() this method is similar to
isStatsExists
+ * @return value range is valid or not
+ */
+ boolean isValid() {
+ return hasValue || hasNulls;
+ }
+
+ /**
+ * Given a point and min and max, determine if the point is before, at
the min, in the
+ * middle, at the max, or after the range.
+ *
+ * @param point the point to test
+ * @return the location of the point
+ */
+ Location compare(Comparable point) {
+ int minCompare = point.compareTo(lower);
+ if (minCompare < 0) {
+ return Location.BEFORE;
+ } else if (minCompare == 0) {
+ return onlyLowerBound ? Location.BEFORE : Location.MIN;
+ }
+ int maxCompare = point.compareTo(upper);
+ if (maxCompare > 0) {
+ return Location.AFTER;
+ } else if (maxCompare == 0) {
+ return onlyUpperBound ? Location.AFTER : Location.MAX;
+ }
+ return Location.MIDDLE;
+ }
+
+ /**
+ * Is this range a single point?
+ *
+ * @return true if min == max
+ */
+ boolean isSingleton() {
+ return lower != null && !onlyLowerBound && !onlyUpperBound &&
lower.equals(upper);
+ }
+
+ /**
+ * Add the null option to the truth value, if the range includes nulls.
+ *
+ * @param value the original truth value
+ * @return the truth value extended with null if appropriate
+ */
+ SearchArgument.TruthValue addNull(SearchArgument.TruthValue value) {
+ if (hasNulls) {
+ switch (value) {
+ case YES:
+ return SearchArgument.TruthValue.YES_NULL;
+ case NO:
+ return SearchArgument.TruthValue.NO_NULL;
+ case YES_NO:
+ return SearchArgument.TruthValue.YES_NO_NULL;
+ default:
+ return value;
+ }
+ } else {
+ return value;
+ }
+ }
+ }
+
+ /**
+ * Get the maximum value out of an index entry. Includes option to specify
if timestamp column
+ * stats values should be in UTC.
+ *
+ * @param index the index entry
+ * @param predicate the kind of predicate
+ * @param useUTCTimestamp use UTC for timestamps
+ * @return the object for the maximum value or null if there isn't one
+ */
+ static ValueRange getValueRange(
+ ColumnStatistics index, PredicateLeaf predicate, boolean
useUTCTimestamp) {
+ if (index.getNumberOfValues() == 0) {
+ return new ValueRange<>(predicate, index.hasNull());
+ } else if (index instanceof IntegerColumnStatistics) {
+ IntegerColumnStatistics stats = (IntegerColumnStatistics) index;
+ Long min = stats.getMinimum();
+ Long max = stats.getMaximum();
+ return new ValueRange<>(predicate, min, max, stats.hasNull());
+ } else if (index instanceof CollectionColumnStatistics) {
+ CollectionColumnStatistics stats = (CollectionColumnStatistics)
index;
+ Long min = stats.getMinimumChildren();
+ Long max = stats.getMaximumChildren();
+ return new ValueRange<>(predicate, min, max, stats.hasNull());
+ } else if (index instanceof DoubleColumnStatistics) {
+ DoubleColumnStatistics stats = (DoubleColumnStatistics) index;
+ Double min = stats.getMinimum();
+ Double max = stats.getMaximum();
+ return new ValueRange<>(predicate, min, max, stats.hasNull());
+ } else if (index instanceof StringColumnStatistics) {
+ StringColumnStatistics stats = (StringColumnStatistics) index;
+ return new ValueRange<>(
+ predicate,
+ stats.getLowerBound(),
+ stats.getUpperBound(),
+ stats.hasNull(),
+ stats.getMinimum() == null,
+ stats.getMaximum() == null);
+ } else if (index instanceof DateColumnStatistics) {
+ DateColumnStatistics stats = (DateColumnStatistics) index;
+ ChronoLocalDate min = stats.getMinimumLocalDate();
+ ChronoLocalDate max = stats.getMaximumLocalDate();
+ return new ValueRange<>(predicate, min, max, stats.hasNull());
+ } else if (index instanceof DecimalColumnStatistics) {
+ DecimalColumnStatistics stats = (DecimalColumnStatistics) index;
+ HiveDecimal min = stats.getMinimum();
+ HiveDecimal max = stats.getMaximum();
+ return new ValueRange<>(predicate, min, max, stats.hasNull());
+ } else if (index instanceof TimestampColumnStatistics) {
+ TimestampColumnStatistics stats = (TimestampColumnStatistics)
index;
+ Timestamp min = useUTCTimestamp ? stats.getMinimumUTC() :
stats.getMinimum();
+ Timestamp max = useUTCTimestamp ? stats.getMaximumUTC() :
stats.getMaximum();
+ return new ValueRange<>(predicate, min, max, stats.hasNull());
+ } else if (index instanceof BooleanColumnStatistics) {
+ BooleanColumnStatistics stats = (BooleanColumnStatistics) index;
+ Boolean min = stats.getFalseCount() == 0;
+ Boolean max = stats.getTrueCount() != 0;
+ return new ValueRange<>(predicate, min, max, stats.hasNull());
+ } else {
+ return new ValueRange(
+ predicate, null, null, index.hasNull(), false, false,
true, false);
+ }
+ }
+
+ /**
+ * Evaluate a predicate with respect to the statistics from the column
that is referenced in the
+ * predicate.
+ *
+ * @param statsProto the statistics for the column mentioned in the
predicate
+ * @param predicate the leaf predicate we need to evaluation
+ * @param bloomFilter the bloom filter
+ * @param writerVersion the version of software that wrote the file
+ * @param type what is the kind of this column
+ * @return the set of truth values that may be returned for the given
predicate.
+ */
+ static SearchArgument.TruthValue evaluatePredicateProto(
+ OrcProto.ColumnStatistics statsProto,
+ PredicateLeaf predicate,
+ OrcProto.Stream.Kind kind,
+ OrcProto.ColumnEncoding encoding,
+ OrcProto.BloomFilter bloomFilter,
+ OrcFile.WriterVersion writerVersion,
+ TypeDescription type) {
+ return evaluatePredicateProto(
+ statsProto,
+ predicate,
+ kind,
+ encoding,
+ bloomFilter,
+ writerVersion,
+ type,
+ true,
+ false);
+ }
+
+ /**
+ * Evaluate a predicate with respect to the statistics from the column
that is referenced in the
+ * predicate. Includes option to specify if timestamp column stats values
should be in UTC and
+ * if the file writer used proleptic Gregorian calendar.
+ *
+ * @param statsProto the statistics for the column mentioned in the
predicate
+ * @param predicate the leaf predicate we need to evaluation
+ * @param bloomFilter the bloom filter
+ * @param writerVersion the version of software that wrote the file
+ * @param type what is the kind of this column
+ * @param writerUsedProlepticGregorian file written using the proleptic
Gregorian calendar
+ * @param useUTCTimestamp
+ * @return the set of truth values that may be returned for the given
predicate.
+ */
+ static SearchArgument.TruthValue evaluatePredicateProto(
+ OrcProto.ColumnStatistics statsProto,
+ PredicateLeaf predicate,
+ OrcProto.Stream.Kind kind,
+ OrcProto.ColumnEncoding encoding,
+ OrcProto.BloomFilter bloomFilter,
+ OrcFile.WriterVersion writerVersion,
+ TypeDescription type,
+ boolean writerUsedProlepticGregorian,
+ boolean useUTCTimestamp) {
+ ColumnStatistics cs =
+ ColumnStatisticsImpl.deserialize(
+ null, statsProto, writerUsedProlepticGregorian, true);
+ ValueRange range = getValueRange(cs, predicate, useUTCTimestamp);
+
+ // files written before ORC-135 stores timestamp wrt to local timezone
causing issues with
+ // PPD.
+ // disable PPD for timestamp for all old files
+ TypeDescription.Category category = type.getCategory();
+ if (category == TypeDescription.Category.TIMESTAMP) {
+ if (!writerVersion.includes(OrcFile.WriterVersion.ORC_135)) {
+ LOG.debug(
+ "Not using predication pushdown on {} because it
doesn't "
+ + "include ORC-135. Writer version: {}",
+ predicate.getColumnName(),
+ writerVersion);
+ return range.addNull(SearchArgument.TruthValue.YES_NO);
+ }
+ if (predicate.getType() != PredicateLeaf.Type.TIMESTAMP
+ && predicate.getType() != PredicateLeaf.Type.DATE
+ && predicate.getType() != PredicateLeaf.Type.STRING) {
+ return range.addNull(SearchArgument.TruthValue.YES_NO);
+ }
+ } else if (writerVersion == OrcFile.WriterVersion.ORC_135
+ && category == TypeDescription.Category.DECIMAL
+ && type.getPrecision() <=
TypeDescription.MAX_DECIMAL64_PRECISION) {
+ // ORC 1.5.0 to 1.5.5, which use WriterVersion.ORC_135, have broken
+ // min and max values for decimal64. See ORC-517.
+ LOG.debug(
+ "Not using predicate push down on {}, because the file
doesn't"
+ + " include ORC-517. Writer version: {}",
+ predicate.getColumnName(),
+ writerVersion);
+ return SearchArgument.TruthValue.YES_NO_NULL;
+ } else if ((category == TypeDescription.Category.DOUBLE
+ || category == TypeDescription.Category.FLOAT)
+ && cs instanceof DoubleColumnStatistics) {
+ DoubleColumnStatistics dstas = (DoubleColumnStatistics) cs;
+ if (Double.isNaN(dstas.getSum())) {
+ LOG.debug(
+ "Not using predication pushdown on {} because stats
contain NaN values",
+ predicate.getColumnName());
+ return dstas.hasNull()
+ ? SearchArgument.TruthValue.YES_NO_NULL
+ : SearchArgument.TruthValue.YES_NO;
+ }
+ }
+ return evaluatePredicateRange(
+ predicate,
+ range,
+ BloomFilterIO.deserialize(
+ kind, encoding, writerVersion, type.getCategory(),
bloomFilter),
+ useUTCTimestamp);
+ }
+
+ /**
+ * Evaluate a predicate with respect to the statistics from the column
that is referenced in the
+ * predicate.
+ *
+ * @param stats the statistics for the column mentioned in the predicate
+ * @param predicate the leaf predicate we need to evaluation
+ * @return the set of truth values that may be returned for the given
predicate.
+ */
+ public static SearchArgument.TruthValue evaluatePredicate(
+ ColumnStatistics stats, PredicateLeaf predicate, BloomFilter
bloomFilter) {
+ return evaluatePredicate(stats, predicate, bloomFilter, false);
+ }
+
+ /**
+ * Evaluate a predicate with respect to the statistics from the column
that is referenced in the
+ * predicate. Includes option to specify if timestamp column stats values
should be in UTC.
+ *
+ * @param stats the statistics for the column mentioned in the predicate
+ * @param predicate the leaf predicate we need to evaluation
+ * @param bloomFilter
+ * @param useUTCTimestamp
+ * @return the set of truth values that may be returned for the given
predicate.
+ */
+ public static SearchArgument.TruthValue evaluatePredicate(
+ ColumnStatistics stats,
+ PredicateLeaf predicate,
+ BloomFilter bloomFilter,
+ boolean useUTCTimestamp) {
+ ValueRange range = getValueRange(stats, predicate, useUTCTimestamp);
+
+ return evaluatePredicateRange(predicate, range, bloomFilter,
useUTCTimestamp);
+ }
+
+ static SearchArgument.TruthValue evaluatePredicateRange(
+ PredicateLeaf predicate,
+ ValueRange range,
+ BloomFilter bloomFilter,
+ boolean useUTCTimestamp) {
+ if (!range.isValid()) {
+ return SearchArgument.TruthValue.YES_NO_NULL;
+ }
+
+ // if we didn't have any values, everything must have been null
+ if (!range.hasValues()) {
+ if (predicate.getOperator() == PredicateLeaf.Operator.IS_NULL) {
+ return SearchArgument.TruthValue.YES;
+ } else {
+ return SearchArgument.TruthValue.NULL;
+ }
+ } else if (!range.isComparable()) {
+ return range.hasNulls
+ ? SearchArgument.TruthValue.YES_NO_NULL
+ : SearchArgument.TruthValue.YES_NO;
+ }
+
+ SearchArgument.TruthValue result;
+ Comparable baseObj = (Comparable) predicate.getLiteral();
+ // Predicate object and stats objects are converted to the type of the
predicate object.
+ Comparable predObj = getBaseObjectForComparison(predicate.getType(),
baseObj);
+
+ result = evaluatePredicateMinMax(predicate, predObj, range);
+ if (shouldEvaluateBloomFilter(predicate, result, bloomFilter)) {
+ return evaluatePredicateBloomFilter(
+ predicate, predObj, bloomFilter, range.hasNulls,
useUTCTimestamp);
+ } else {
+ return result;
+ }
+ }
+
+ private static boolean shouldEvaluateBloomFilter(
+ PredicateLeaf predicate, SearchArgument.TruthValue result,
BloomFilter bloomFilter) {
+ // evaluate bloom filter only when
+ // 1) Bloom filter is available
+ // 2) Min/Max evaluation yield YES or MAYBE
+ // 3) Predicate is EQUALS or IN list
+ return bloomFilter != null
+ && result != SearchArgument.TruthValue.NO_NULL
+ && result != SearchArgument.TruthValue.NO
+ &&
(predicate.getOperator().equals(PredicateLeaf.Operator.EQUALS)
+ ||
predicate.getOperator().equals(PredicateLeaf.Operator.NULL_SAFE_EQUALS)
+ ||
predicate.getOperator().equals(PredicateLeaf.Operator.IN));
+ }
+
+ private static SearchArgument.TruthValue evaluatePredicateMinMax(
+ PredicateLeaf predicate, Comparable predObj, ValueRange range) {
+ Location loc;
+
+ switch (predicate.getOperator()) {
+ case NULL_SAFE_EQUALS:
+ loc = range.compare(predObj);
+ if (loc == Location.BEFORE || loc == Location.AFTER) {
+ return SearchArgument.TruthValue.NO;
+ } else {
+ return SearchArgument.TruthValue.YES_NO;
+ }
+ case EQUALS:
+ loc = range.compare(predObj);
+ if (range.isSingleton() && loc == Location.MIN) {
+ return range.addNull(SearchArgument.TruthValue.YES);
+ } else if (loc == Location.BEFORE || loc == Location.AFTER) {
+ return range.addNull(SearchArgument.TruthValue.NO);
+ } else {
+ return range.addNull(SearchArgument.TruthValue.YES_NO);
+ }
+ case LESS_THAN:
+ loc = range.compare(predObj);
+ if (loc == Location.AFTER) {
+ return range.addNull(SearchArgument.TruthValue.YES);
+ } else if (loc == Location.BEFORE || loc == Location.MIN) {
+ return range.addNull(SearchArgument.TruthValue.NO);
+ } else {
+ return range.addNull(SearchArgument.TruthValue.YES_NO);
+ }
+ case LESS_THAN_EQUALS:
+ loc = range.compare(predObj);
+ if (loc == Location.AFTER
+ || loc == Location.MAX
+ || (loc == Location.MIN && range.isSingleton())) {
+ return range.addNull(SearchArgument.TruthValue.YES);
+ } else if (loc == Location.BEFORE) {
+ return range.addNull(SearchArgument.TruthValue.NO);
+ } else {
+ return range.addNull(SearchArgument.TruthValue.YES_NO);
+ }
+ case IN:
+ if (range.isSingleton()) {
+ // for a single value, look through to see if that value
is in the
+ // set
+ for (Object arg : predicate.getLiteralList()) {
+ predObj =
getBaseObjectForComparison(predicate.getType(), (Comparable) arg);
+ if (range.compare(predObj) == Location.MIN) {
+ return
range.addNull(SearchArgument.TruthValue.YES);
+ }
+ }
+ return range.addNull(SearchArgument.TruthValue.NO);
+ } else {
+ // are all of the values outside of the range?
+ for (Object arg : predicate.getLiteralList()) {
+ predObj =
getBaseObjectForComparison(predicate.getType(), (Comparable) arg);
+ loc = range.compare(predObj);
+ if (loc == Location.MIN || loc == Location.MIDDLE ||
loc == Location.MAX) {
+ return
range.addNull(SearchArgument.TruthValue.YES_NO);
+ }
+ }
+ return range.addNull(SearchArgument.TruthValue.NO);
+ }
+ case BETWEEN:
+ List<Object> args = predicate.getLiteralList();
+ if (args == null || args.isEmpty()) {
+ return range.addNull(SearchArgument.TruthValue.YES_NO);
+ }
+ Comparable predObj1 =
+ getBaseObjectForComparison(predicate.getType(),
(Comparable) args.get(0));
+
+ loc = range.compare(predObj1);
+ if (loc == Location.BEFORE || loc == Location.MIN) {
+ Comparable predObj2 =
+ getBaseObjectForComparison(
+ predicate.getType(), (Comparable)
args.get(1));
+ Location loc2 = range.compare(predObj2);
+ if (loc2 == Location.AFTER || loc2 == Location.MAX) {
+ return range.addNull(SearchArgument.TruthValue.YES);
+ } else if (loc2 == Location.BEFORE) {
+ return range.addNull(SearchArgument.TruthValue.NO);
+ } else {
+ return range.addNull(SearchArgument.TruthValue.YES_NO);
+ }
+ } else if (loc == Location.AFTER) {
+ return range.addNull(SearchArgument.TruthValue.NO);
+ } else {
+ return range.addNull(SearchArgument.TruthValue.YES_NO);
+ }
+ case IS_NULL:
+ // min = null condition above handles the all-nulls YES case
+ return range.hasNulls
+ ? SearchArgument.TruthValue.YES_NO
+ : SearchArgument.TruthValue.NO;
+ default:
+ return range.addNull(SearchArgument.TruthValue.YES_NO);
+ }
+ }
+
+ private static SearchArgument.TruthValue evaluatePredicateBloomFilter(
+ PredicateLeaf predicate,
+ final Object predObj,
+ BloomFilter bloomFilter,
+ boolean hasNull,
+ boolean useUTCTimestamp) {
+ switch (predicate.getOperator()) {
+ case NULL_SAFE_EQUALS:
+ // null safe equals does not return *_NULL variant. So set
hasNull to false
+ return checkInBloomFilter(bloomFilter, predObj, false,
useUTCTimestamp);
+ case EQUALS:
+ return checkInBloomFilter(bloomFilter, predObj, hasNull,
useUTCTimestamp);
+ case IN:
+ for (Object arg : predicate.getLiteralList()) {
+ // if atleast one value in IN list exist in bloom filter,
qualify the row
+ // group/stripe
+ Object predObjItem =
+ getBaseObjectForComparison(predicate.getType(),
(Comparable) arg);
+ SearchArgument.TruthValue result =
+ checkInBloomFilter(bloomFilter, predObjItem,
hasNull, useUTCTimestamp);
+ if (result == SearchArgument.TruthValue.YES_NO_NULL
+ || result == SearchArgument.TruthValue.YES_NO) {
+ return result;
+ }
+ }
+ return hasNull ? SearchArgument.TruthValue.NO_NULL :
SearchArgument.TruthValue.NO;
+ default:
+ return hasNull
+ ? SearchArgument.TruthValue.YES_NO_NULL
+ : SearchArgument.TruthValue.YES_NO;
+ }
+ }
+
+ private static SearchArgument.TruthValue checkInBloomFilter(
+ BloomFilter bf, Object predObj, boolean hasNull, boolean
useUTCTimestamp) {
+ SearchArgument.TruthValue result =
+ hasNull ? SearchArgument.TruthValue.NO_NULL :
SearchArgument.TruthValue.NO;
+
+ if (predObj instanceof Long) {
+ if (bf.testLong((Long) predObj)) {
+ result = SearchArgument.TruthValue.YES_NO_NULL;
+ }
+ } else if (predObj instanceof Double) {
+ if (bf.testDouble((Double) predObj)) {
+ result = SearchArgument.TruthValue.YES_NO_NULL;
+ }
+ } else if (predObj instanceof String
+ || predObj instanceof Text
+ || predObj instanceof HiveDecimalWritable
+ || predObj instanceof BigDecimal) {
+ if (bf.testString(predObj.toString())) {
+ result = SearchArgument.TruthValue.YES_NO_NULL;
+ }
+ } else if (predObj instanceof Timestamp) {
+ if (useUTCTimestamp) {
+ if (bf.testLong(((Timestamp) predObj).getTime())) {
+ result = SearchArgument.TruthValue.YES_NO_NULL;
+ }
+ } else {
+ if (bf.testLong(
+ SerializationUtils.convertToUtc(
+ TimeZone.getDefault(), ((Timestamp)
predObj).getTime()))) {
+ result = SearchArgument.TruthValue.YES_NO_NULL;
+ }
+ }
+ } else if (predObj instanceof ChronoLocalDate) {
+ if (bf.testLong(((ChronoLocalDate) predObj).toEpochDay())) {
+ result = SearchArgument.TruthValue.YES_NO_NULL;
+ }
+ } else {
+ // if the predicate object is null and if hasNull says there are
no nulls then return NO
+ if (predObj == null && !hasNull) {
+ result = SearchArgument.TruthValue.NO;
+ } else {
+ result = SearchArgument.TruthValue.YES_NO_NULL;
+ }
+ }
+
+ if (result == SearchArgument.TruthValue.YES_NO_NULL && !hasNull) {
+ result = SearchArgument.TruthValue.YES_NO;
+ }
+
+ LOG.debug("Bloom filter evaluation: {}", result);
+
+ return result;
+ }
+
+ /** An exception for when we can't cast things appropriately. */
+ static class SargCastException extends IllegalArgumentException {
+
+ SargCastException(String string) {
+ super(string);
+ }
+ }
+
+ private static Comparable getBaseObjectForComparison(PredicateLeaf.Type
type, Comparable obj) {
+ if (obj == null) {
+ return null;
+ }
+ switch (type) {
+ case BOOLEAN:
+ if (obj instanceof Boolean) {
+ return obj;
+ } else {
+ // will only be true if the string conversion yields
"true", all other values
+ // are
+ // considered false
+ return Boolean.valueOf(obj.toString());
+ }
+ case DATE:
+ if (obj instanceof ChronoLocalDate) {
+ return obj;
+ } else if (obj instanceof java.sql.Date) {
+ return ((java.sql.Date) obj).toLocalDate();
+ } else if (obj instanceof Date) {
+ return LocalDateTime.ofInstant(((Date) obj).toInstant(),
ZoneOffset.UTC)
+ .toLocalDate();
+ } else if (obj instanceof String) {
+ return LocalDate.parse((String) obj);
+ } else if (obj instanceof Timestamp) {
+ return ((Timestamp) obj).toLocalDateTime().toLocalDate();
+ }
+ // always string, but prevent the comparison to numbers (are
they
+ // days/seconds/milliseconds?)
+ break;
+ case DECIMAL:
+ if (obj instanceof Boolean) {
+ return new HiveDecimalWritable(
+ (Boolean) obj ? HiveDecimal.ONE :
HiveDecimal.ZERO);
+ } else if (obj instanceof Integer) {
+ return new HiveDecimalWritable((Integer) obj);
+ } else if (obj instanceof Long) {
+ return new HiveDecimalWritable(((Long) obj));
+ } else if (obj instanceof Float || obj instanceof Double ||
obj instanceof String) {
+ return new HiveDecimalWritable(obj.toString());
+ } else if (obj instanceof BigDecimal) {
+ return new
HiveDecimalWritable(HiveDecimal.create((BigDecimal) obj));
+ } else if (obj instanceof HiveDecimal) {
+ return new HiveDecimalWritable((HiveDecimal) obj);
+ } else if (obj instanceof HiveDecimalWritable) {
+ return obj;
+ } else if (obj instanceof Timestamp) {
+ return new HiveDecimalWritable(
+
Double.toString(TimestampUtils.getDouble((Timestamp) obj)));
+ }
+ break;
+ case FLOAT:
+ if (obj instanceof Number) {
+ // widening conversion
+ return ((Number) obj).doubleValue();
+ } else if (obj instanceof HiveDecimal) {
+ return ((HiveDecimal) obj).doubleValue();
+ } else if (obj instanceof String) {
+ return Double.valueOf(obj.toString());
+ } else if (obj instanceof Timestamp) {
+ return TimestampUtils.getDouble((Timestamp) obj);
+ }
+ break;
+ case LONG:
+ if (obj instanceof Number) {
+ // widening conversion
+ return ((Number) obj).longValue();
+ } else if (obj instanceof HiveDecimal) {
+ return ((HiveDecimal) obj).longValue();
+ } else if (obj instanceof String) {
+ return Long.valueOf(obj.toString());
+ }
+ break;
+ case STRING:
+ if (obj instanceof ChronoLocalDate) {
+ ChronoLocalDate date = (ChronoLocalDate) obj;
+ return date.format(
+
DateTimeFormatter.ISO_LOCAL_DATE.withChronology(date.getChronology()));
+ }
+ return (obj.toString());
+ case TIMESTAMP:
+ if (obj instanceof Timestamp) {
+ return obj;
+ } else if (obj instanceof Integer) {
+ return new Timestamp(((Number) obj).longValue());
+ } else if (obj instanceof Float) {
+ return TimestampUtils.doubleToTimestamp(((Float)
obj).doubleValue());
+ } else if (obj instanceof Double) {
+ return TimestampUtils.doubleToTimestamp((Double) obj);
+ } else if (obj instanceof HiveDecimal) {
+ return TimestampUtils.decimalToTimestamp((HiveDecimal)
obj);
+ } else if (obj instanceof HiveDecimalWritable) {
+ return TimestampUtils.decimalToTimestamp(
+ ((HiveDecimalWritable) obj).getHiveDecimal());
+ } else if (obj instanceof Date) {
+ return new Timestamp(((Date) obj).getTime());
+ } else if (obj instanceof ChronoLocalDate) {
+ return new Timestamp(
+ ((ChronoLocalDate) obj)
+ .atTime(LocalTime.MIDNIGHT)
+ .toInstant(ZoneOffset.UTC)
+ .getEpochSecond()
+ * 1000L);
+ }
+ // float/double conversion to timestamp is interpreted as
seconds whereas integer
+ // conversion
+ // to timestamp is interpreted as milliseconds by default. The
integer to timestamp
+ // casting
+ // is also config driven. The filter operator changes its
promotion based on config:
+ // "int.timestamp.conversion.in.seconds". Disable PPD for
integer cases.
+ break;
+ default:
+ break;
+ }
+
+ throw new SargCastException(
+ String.format(
+ "ORC SARGS could not convert from %s to %s",
+ obj.getClass().getSimpleName(), type));
+ }
+
+ /** search argument applier. */
+ public static class SargApplier {
+ public static final boolean[] READ_ALL_RGS = null;
+ public static final boolean[] READ_NO_RGS = new boolean[0];
+
+ private final OrcFile.WriterVersion writerVersion;
+ private final SearchArgument sarg;
+ private final List<PredicateLeaf> sargLeaves;
+ private final int[] filterColumns;
+ private final long rowIndexStride;
+ // same as the above array, but indices are set to true
+ private final SchemaEvolution evolution;
+ private final long[] exceptionCount;
+ private final boolean useUTCTimestamp;
+ private final boolean writerUsedProlepticGregorian;
+ private final boolean convertToProlepticGregorian;
+
+ /**
+ * @deprecated Use the constructor having full parameters. This exists
for backward
+ * compatibility.
+ */
+ public SargApplier(
+ SearchArgument sarg,
+ long rowIndexStride,
+ SchemaEvolution evolution,
+ OrcFile.WriterVersion writerVersion,
+ boolean useUTCTimestamp) {
+ this(sarg, rowIndexStride, evolution, writerVersion,
useUTCTimestamp, false, false);
+ }
+
+ public SargApplier(
+ SearchArgument sarg,
+ long rowIndexStride,
+ SchemaEvolution evolution,
+ OrcFile.WriterVersion writerVersion,
+ boolean useUTCTimestamp,
+ boolean writerUsedProlepticGregorian,
+ boolean convertToProlepticGregorian) {
+ this.writerVersion = writerVersion;
+ this.sarg = sarg;
+ sargLeaves = sarg.getLeaves();
+ this.writerUsedProlepticGregorian = writerUsedProlepticGregorian;
+ this.convertToProlepticGregorian = convertToProlepticGregorian;
+ filterColumns = mapSargColumnsToOrcInternalColIdx(sargLeaves,
evolution);
+ this.rowIndexStride = rowIndexStride;
+ this.evolution = evolution;
+ exceptionCount = new long[sargLeaves.size()];
+ this.useUTCTimestamp = useUTCTimestamp;
+ }
+
+ public void setRowIndexCols(boolean[] rowIndexCols) {
+ // included will not be null, row options will fill the array with
+ // trues if null
+ for (int i : filterColumns) {
+ // filter columns may have -1 as index which could be partition
+ // column in SARG.
+ if (i > 0) {
+ rowIndexCols[i] = true;
+ }
+ }
+ }
+
+ /**
+ * Pick the row groups that we need to load from the current stripe.
+ *
+ * @return an array with a boolean for each row group or null if all
of the row groups must
+ * be read.
+ * @throws IOException
+ */
+ public boolean[] pickRowGroups(
+ StripeInformation stripe,
+ OrcProto.RowIndex[] indexes,
+ OrcProto.Stream.Kind[] bloomFilterKinds,
+ List<OrcProto.ColumnEncoding> encodings,
+ OrcProto.BloomFilterIndex[] bloomFilterIndices,
+ boolean returnNone,
+ long rowBaseInStripe,
+ FileIndexResult fileIndexResult)
+ throws IOException {
+ long rowsInStripe = stripe.getNumberOfRows();
+ int groupsInStripe = (int) ((rowsInStripe + rowIndexStride - 1) /
rowIndexStride);
+ boolean[] result = new boolean[groupsInStripe]; // TODO: avoid
alloc?
+ SearchArgument.TruthValue[] leafValues =
+ new SearchArgument.TruthValue[sargLeaves.size()];
+ boolean hasSelected = false;
+ boolean hasSkipped = false;
+ SearchArgument.TruthValue[] exceptionAnswer =
+ new SearchArgument.TruthValue[leafValues.length];
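+            // A bitmap-backed file index result records which row positions in this file may
+            // still match the predicate; it is intersected with the SARG decision for every row
+            // group below.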
+ RoaringBitmap32 bitmap = null;
+ if (fileIndexResult instanceof BitmapIndexResultLazy) {
+ bitmap = ((BitmapIndexResultLazy) fileIndexResult).get();
+ }
+ for (int rowGroup = 0; rowGroup < result.length; ++rowGroup) {
+ for (int pred = 0; pred < leafValues.length; ++pred) {
+ int columnIx = filterColumns[pred];
+ if (columnIx == -1) {
+ // the column is a virtual column
+ leafValues[pred] =
SearchArgument.TruthValue.YES_NO_NULL;
+ } else if (exceptionAnswer[pred] != null) {
+ leafValues[pred] = exceptionAnswer[pred];
+ } else {
+ if (indexes[columnIx] == null) {
+ LOG.warn("Index is not populated for " + columnIx);
+ return READ_ALL_RGS;
+ }
+ OrcProto.RowIndexEntry entry =
indexes[columnIx].getEntry(rowGroup);
+ if (entry == null) {
+ throw new AssertionError(
+ "RG is not populated for " + columnIx + "
rg " + rowGroup);
+ }
+ OrcProto.ColumnStatistics stats =
EMPTY_COLUMN_STATISTICS;
+ if (entry.hasStatistics()) {
+ stats = entry.getStatistics();
+ }
+ OrcProto.BloomFilter bf = null;
+ OrcProto.Stream.Kind bfk = null;
+ if (bloomFilterIndices != null &&
bloomFilterIndices[columnIx] != null) {
+ bfk = bloomFilterKinds[columnIx];
+ bf =
bloomFilterIndices[columnIx].getBloomFilter(rowGroup);
+ }
+ if (evolution != null &&
evolution.isPPDSafeConversion(columnIx)) {
+ PredicateLeaf predicate = sargLeaves.get(pred);
+ try {
+ leafValues[pred] =
+ evaluatePredicateProto(
+ stats,
+ predicate,
+ bfk,
+ encodings.get(columnIx),
+ bf,
+ writerVersion,
+
evolution.getFileSchema().findSubtype(columnIx),
+ writerUsedProlepticGregorian,
+ useUTCTimestamp);
+ } catch (Exception e) {
+ exceptionCount[pred] += 1;
+ if (e instanceof SargCastException) {
+ LOG.info(
+ "Skipping ORC PPD - "
+ + e.getMessage()
+ + " on "
+ + predicate);
+ } else {
+ final String reason =
+ e.getClass().getSimpleName()
+ + " when evaluating
predicate."
+ + " Skipping ORC PPD."
+ + " Stats: "
+ + stats
+ + " Predicate: "
+ + predicate;
+ LOG.warn(reason, e);
+ }
+ boolean hasNoNull = stats.hasHasNull() &&
!stats.getHasNull();
+ if (predicate
+ .getOperator()
+
.equals(PredicateLeaf.Operator.NULL_SAFE_EQUALS)
+ || hasNoNull) {
+ exceptionAnswer[pred] =
SearchArgument.TruthValue.YES_NO;
+ } else {
+ exceptionAnswer[pred] =
SearchArgument.TruthValue.YES_NO_NULL;
+ }
+ leafValues[pred] = exceptionAnswer[pred];
+ }
+ } else {
+ leafValues[pred] =
SearchArgument.TruthValue.YES_NO_NULL;
+ }
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Stats = " + stats);
+ LOG.trace(
+ "Setting " + sargLeaves.get(pred) + " to "
+ leafValues[pred]);
+ }
+ }
+ }
+ result[rowGroup] = sarg.evaluate(leafValues).isNeeded();
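+                // Intersect the statistics-based SARG decision with the file index result: the
+                // row group stays selected only if the bitmap marks at least one row in its range.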
+ if (bitmap != null) {
+ long firstRow = rowBaseInStripe + rowIndexStride *
rowGroup;
+ long lastRow = Math.min(firstRow + rowIndexStride,
firstRow + rowsInStripe);
+ result[rowGroup] &= bitmap.rangeCardinality(firstRow,
lastRow) > 0;
+ }
+ hasSelected = hasSelected || result[rowGroup];
+ hasSkipped = hasSkipped || (!result[rowGroup]);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(
+ "Row group "
+ + (rowIndexStride * rowGroup)
+ + " to "
+ + (rowIndexStride * (rowGroup + 1) - 1)
+ + " is "
+ + (result[rowGroup] ? "" : "not ")
+ + "included.");
+ }
+ }
+
+ return hasSkipped
+ ? ((hasSelected || !returnNone) ? result : READ_NO_RGS)
+ : READ_ALL_RGS;
+ }
+
+ /**
+ * Get the count of exceptions for testing.
+ *
+         * @return the per-leaf exception counts recorded while evaluating predicates
+ */
+ long[] getExceptionCount() {
+ return exceptionCount;
+ }
+ }
+
+ /**
+ * Pick the row groups that we need to load from the current stripe.
+ *
+ * @return an array with a boolean for each row group or null if all of
the row groups must be
+ * read.
+ * @throws IOException
+ */
+ protected boolean[] pickRowGroups() throws IOException {
+        // Read the Row Indices if required
+ if (rowIndexColsToRead != null) {
+ readCurrentStripeRowIndex();
+ }
+
+        // In the absence of a SArg, all row groups should be included
+ if (sargApp == null) {
+ return null;
+ }
+ return sargApp.pickRowGroups(
+ stripes.get(currentStripe),
+ indexes.getRowGroupIndex(),
+ skipBloomFilters ? null : indexes.getBloomFilterKinds(),
+ stripeFooter.getColumnsList(),
+ skipBloomFilters ? null : indexes.getBloomFilterIndex(),
+ false,
+ rowBaseInStripe,
+ fileIndexResult);
+ }
+
+ private void clearStreams() {
+ planner.clearStreams();
+ }
+
+ /**
+ * Read the current stripe into memory.
+ *
+ * @throws IOException
+ */
+ private void readStripe() throws IOException {
+ StripeInformation stripe = beginReadStripe();
+ planner.parseStripe(stripe, fileIncluded);
+ includedRowGroups = pickRowGroups();
+
+ // move forward to the first unskipped row
+ if (includedRowGroups != null) {
+ while (rowInStripe < rowCountInStripe
+ && !includedRowGroups[(int) (rowInStripe /
rowIndexStride)]) {
+ rowInStripe = Math.min(rowCountInStripe, rowInStripe +
rowIndexStride);
+ }
+ }
+
+ // if we haven't skipped the whole stripe, read the data
+ if (rowInStripe < rowCountInStripe) {
+ planner.readData(indexes, includedRowGroups, false,
startReadPhase);
+ reader.startStripe(planner, startReadPhase);
+ needsFollowColumnsRead = true;
+ // if we skipped the first row group, move the pointers forward
+ if (rowInStripe != 0) {
+ seekToRowEntry(reader, (int) (rowInStripe / rowIndexStride),
startReadPhase);
+ }
+ }
+ }
+
+ private StripeInformation beginReadStripe() throws IOException {
+ StripeInformation stripe = stripes.get(currentStripe);
+ stripeFooter = readStripeFooter(stripe);
+ clearStreams();
+ // setup the position in the stripe
+ rowCountInStripe = stripe.getNumberOfRows();
+ rowInStripe = 0;
+ followRowInStripe = 0;
+ rowBaseInStripe = 0;
+ for (int i = 0; i < currentStripe; ++i) {
+ rowBaseInStripe += stripes.get(i).getNumberOfRows();
+ }
+ // reset all of the indexes
+ OrcProto.RowIndex[] rowIndex = indexes.getRowGroupIndex();
+ for (int i = 0; i < rowIndex.length; ++i) {
+ rowIndex[i] = null;
+ }
+ return stripe;
+ }
+
+ /**
+ * Read the next stripe until we find a row that we don't skip.
+ *
+ * @throws IOException
+ */
+ private void advanceStripe() throws IOException {
+ rowInStripe = rowCountInStripe;
+ while (rowInStripe >= rowCountInStripe && currentStripe <
stripes.size() - 1) {
+ currentStripe += 1;
+ readStripe();
+ }
+ }
+
+ /**
+ * Determine the RowGroup based on the supplied row id.
+ *
+ * @param rowIdx Row for which the row group is being determined
+ * @return Id of the RowGroup that the row belongs to
+ */
+ private int computeRGIdx(long rowIdx) {
+ return rowIndexStride == 0 ? 0 : (int) (rowIdx / rowIndexStride);
+ }
+
+ /**
+ * Skip over rows that we aren't selecting, so that the next row is one
that we will read.
+ *
+ * @param nextRow the row we want to go to
+ * @throws IOException
+ */
+ private boolean advanceToNextRow(BatchReader reader, long nextRow, boolean
canAdvanceStripe)
+ throws IOException {
+ long nextRowInStripe = nextRow - rowBaseInStripe;
+ // check for row skipping
+ if (rowIndexStride != 0
+ && includedRowGroups != null
+ && nextRowInStripe < rowCountInStripe) {
+ int rowGroup = computeRGIdx(nextRowInStripe);
+ if (!includedRowGroups[rowGroup]) {
+ while (rowGroup < includedRowGroups.length &&
!includedRowGroups[rowGroup]) {
+ rowGroup += 1;
+ }
+ if (rowGroup >= includedRowGroups.length) {
+ if (canAdvanceStripe) {
+ advanceStripe();
+ }
+ return canAdvanceStripe;
+ }
+ nextRowInStripe = Math.min(rowCountInStripe, rowGroup *
rowIndexStride);
+ }
+ }
+ if (nextRowInStripe >= rowCountInStripe) {
+ if (canAdvanceStripe) {
+ advanceStripe();
+ }
+ return canAdvanceStripe;
+ }
+ if (nextRowInStripe != rowInStripe) {
+ if (rowIndexStride != 0) {
+ int rowGroup = (int) (nextRowInStripe / rowIndexStride);
+ seekToRowEntry(reader, rowGroup, startReadPhase);
+ reader.skipRows(nextRowInStripe - rowGroup * rowIndexStride,
startReadPhase);
+ } else {
+ reader.skipRows(nextRowInStripe - rowInStripe, startReadPhase);
+ }
+ rowInStripe = nextRowInStripe;
+ }
+ return true;
+ }
+
+ @Override
+ public boolean nextBatch(VectorizedRowBatch batch) throws IOException {
+ try {
+ int batchSize;
+            // do...while is required to handle the case where the filter eliminates all rows in
+            // the batch; we never return an empty batch unless the file is exhausted
+ do {
+ if (rowInStripe >= rowCountInStripe) {
+ currentStripe += 1;
+ if (currentStripe >= stripes.size()) {
+ batch.size = 0;
+ return false;
+ }
+ // Read stripe in Memory
+ readStripe();
+ followRowInStripe = rowInStripe;
+ }
+
+ batchSize = computeBatchSize(batch.getMaxSize());
+ reader.setVectorColumnCount(batch.getDataColumnCount());
+ reader.nextBatch(batch, batchSize, startReadPhase);
+ if (startReadPhase == TypeReader.ReadPhase.LEADERS &&
batch.size > 0) {
+                    // At least 1 row has been selected and as a result we read the follow columns
+                    // into the row batch
+ reader.nextBatch(
+ batch, batchSize,
prepareFollowReaders(rowInStripe, followRowInStripe));
+ followRowInStripe = rowInStripe + batchSize;
+ }
+ rowInStripe += batchSize;
+ advanceToNextRow(reader, rowInStripe + rowBaseInStripe, true);
+ // batch.size can be modified by filter so only batchSize can
tell if we actually
+ // read rows
+ } while (batchSize != 0 && batch.size == 0);
+
+ if (noSelectedVector) {
+                // In case the selected vector is not supported, we leave the size as the read
+                // size. In this case the non-filter columns might be read selectively; however,
+                // the filter after the reader should eliminate rows that don't match predicate
+                // conditions
+ batch.size = batchSize;
+ batch.selectedInUse = false;
+ }
+
+ return batchSize != 0;
+ } catch (IOException e) {
+ // Rethrow exception with file name in log message
+ throw new IOException("Error reading file: " + path, e);
+ }
+ }
+
+ /**
+     * This method prepares the non-filter column readers for the next batch. This involves the
+     * following: 1. determine position, 2. perform IO if required, 3. position the non-filter
+     * readers.
+ *
+ * <p>This method is repositioning the non-filter columns and as such this
method shall never
+ * have to deal with navigating the stripe forward or skipping row groups,
all of this should
+ * have already taken place based on the filter columns.
+ *
+ * @param toFollowRow The rowIdx identifies the required row position
within the stripe for
+ * follow read
+ * @param fromFollowRow Indicates the current position of the follow read,
exclusive
+ * @return the read phase for reading non-filter columns, this shall be
FOLLOWERS_AND_PARENTS in
+ * case of a seek otherwise will be FOLLOWERS
+ */
+ private TypeReader.ReadPhase prepareFollowReaders(long toFollowRow, long
fromFollowRow)
+ throws IOException {
+ // 1. Determine the required row group and skip rows needed from the
RG start
+ int needRG = computeRGIdx(toFollowRow);
+ // The current row is not yet read so we -1 to compute the previously
read row group
+ int readRG = computeRGIdx(fromFollowRow - 1);
+ long skipRows;
+ if (needRG == readRG && toFollowRow >= fromFollowRow) {
+            // In case we are skipping forward within the same row group, we compute skip rows
+            // from the current position
+ skipRows = toFollowRow - fromFollowRow;
+ } else {
+            // In all other cases, including seeking backwards, we compute the skip rows from the
+            // start of the required row group
+ skipRows = toFollowRow - (needRG * rowIndexStride);
+ }
+
+ // 2. Plan the row group idx for the non-filter columns if this has
not already taken place
+ if (needsFollowColumnsRead) {
+ needsFollowColumnsRead = false;
+ planner.readFollowData(indexes, includedRowGroups, needRG, false);
+ reader.startStripe(planner, TypeReader.ReadPhase.FOLLOWERS);
+ }
+
+ // 3. Position the non-filter readers to the required RG and skipRows
+ TypeReader.ReadPhase result = TypeReader.ReadPhase.FOLLOWERS;
+ if (needRG != readRG || toFollowRow < fromFollowRow) {
+            // When having to change a row group or in case of back navigation, seek both the
+            // filter parents and non-filter. This will re-position the parents present vector.
+            // This is needed to determine the number of non-null values to skip on the
+            // non-filter columns.
+ seekToRowEntry(reader, needRG,
TypeReader.ReadPhase.FOLLOWERS_AND_PARENTS);
+            // skip rows on both the filter parents and non-filter as both have been positioned
+            // in the previous step
+ reader.skipRows(skipRows,
TypeReader.ReadPhase.FOLLOWERS_AND_PARENTS);
+ result = TypeReader.ReadPhase.FOLLOWERS_AND_PARENTS;
+ } else if (skipRows > 0) {
+            // in case we are only skipping within the row group, position the filter parents
+            // back to the position of the follow. This is required to determine the non-null
+            // values to skip on the non-filter columns.
+ seekToRowEntry(reader, readRG,
TypeReader.ReadPhase.LEADER_PARENTS);
+ reader.skipRows(
+ fromFollowRow - (readRG * rowIndexStride),
TypeReader.ReadPhase.LEADER_PARENTS);
+            // Move both the filter parents and non-filter forward; this will compute the correct
+            // non-null skips on follow children
+ reader.skipRows(skipRows,
TypeReader.ReadPhase.FOLLOWERS_AND_PARENTS);
+ result = TypeReader.ReadPhase.FOLLOWERS_AND_PARENTS;
+ }
+        // Identifies the read level that should be performed for the read.
+        // FOLLOWERS_AND_PARENTS indicates repositioning, identifying both non-filter and filter
+        // parents. FOLLOWERS indicates a read of only the non-filter level without the parents,
+        // which is used during a contiguous read. During a contiguous read no skips are needed
+        // and the non-null information of the parent is available in the column vector for use
+        // during the non-filter read.
+ return result;
+ }
+
+ private int computeBatchSize(long targetBatchSize) {
+ final int batchSize;
+        // In case of PPD, batch size should be aware of row group boundaries. If only a subset
+        // of row groups is selected, the marker position is set to the end of the range (subset
+        // of row groups within the stripe). Batch size computed from the marker position makes
+        // sure that the batch size respects row group boundaries and will not cause overflow
+        // when reading rows; an illustration of this case is
+        // https://issues.apache.org/jira/browse/HIVE-6287
+ if (rowIndexStride != 0
+ && (includedRowGroups != null || startReadPhase !=
TypeReader.ReadPhase.ALL)
+ && rowInStripe < rowCountInStripe) {
+ int startRowGroup = (int) (rowInStripe / rowIndexStride);
+ if (includedRowGroups != null &&
!includedRowGroups[startRowGroup]) {
+ while (startRowGroup < includedRowGroups.length
+ && !includedRowGroups[startRowGroup]) {
+ startRowGroup += 1;
+ }
+ }
+
+ int endRowGroup = startRowGroup;
+            // We force row group boundaries when dealing with filters. We adjust the end row
+            // group to be the next row group even if more than one could have been selected.
+ if (includedRowGroups != null && startReadPhase ==
TypeReader.ReadPhase.ALL) {
+ while (endRowGroup < includedRowGroups.length &&
includedRowGroups[endRowGroup]) {
+ endRowGroup += 1;
+ }
+ } else {
+ endRowGroup += 1;
+ }
+
+ final long markerPosition = Math.min((endRowGroup *
rowIndexStride), rowCountInStripe);
+ batchSize = (int) Math.min(targetBatchSize, (markerPosition -
rowInStripe));
+
+ if (isLogDebugEnabled && batchSize < targetBatchSize) {
+ LOG.debug("markerPosition: " + markerPosition + " batchSize: "
+ batchSize);
+ }
+ } else {
+ batchSize = (int) Math.min(targetBatchSize, (rowCountInStripe -
rowInStripe));
+ }
+ return batchSize;
+ }
+
+ @Override
+ public void close() throws IOException {
+ clearStreams();
+ dataReader.close();
+ }
+
+ @Override
+ public long getRowNumber() {
+ return rowInStripe + rowBaseInStripe + firstRow;
+ }
+
+ /**
+     * Return the fraction of rows that have been read from the selected section of the file.
+ *
+ * @return fraction between 0.0 and 1.0 of rows consumed
+ */
+ @Override
+ public float getProgress() {
+ return ((float) rowBaseInStripe + rowInStripe) / totalRowCount;
+ }
+
+ private int findStripe(long rowNumber) {
+ for (int i = 0; i < stripes.size(); i++) {
+ StripeInformation stripe = stripes.get(i);
+ if (stripe.getNumberOfRows() > rowNumber) {
+ return i;
+ }
+ rowNumber -= stripe.getNumberOfRows();
+ }
+ throw new IllegalArgumentException("Seek after the end of reader
range");
+ }
+
+ private void readCurrentStripeRowIndex() throws IOException {
+ planner.readRowIndex(rowIndexColsToRead, indexes);
+ }
+
+ public OrcIndex readRowIndex(int stripeIndex, boolean[] included,
boolean[] readCols)
+ throws IOException {
+ // Use the cached objects if the read request matches the cached
request
+ if (stripeIndex == currentStripe
+ && (readCols == null || Arrays.equals(readCols,
rowIndexColsToRead))) {
+ if (rowIndexColsToRead != null) {
+ return indexes;
+ } else {
+ return planner.readRowIndex(readCols, indexes);
+ }
+ } else {
+ StripePlanner copy = new StripePlanner(planner);
+ if (included == null) {
+ included = new boolean[schema.getMaximumId() + 1];
+ Arrays.fill(included, true);
+ }
+ copy.parseStripe(stripes.get(stripeIndex), included);
+ return copy.readRowIndex(readCols, null);
+ }
+ }
+
+ private void seekToRowEntry(BatchReader reader, int rowEntry,
TypeReader.ReadPhase readPhase)
+ throws IOException {
+ OrcProto.RowIndex[] rowIndices = indexes.getRowGroupIndex();
+ PositionProvider[] index = new PositionProvider[rowIndices.length];
+ for (int i = 0; i < index.length; ++i) {
+ if (rowIndices[i] != null) {
+ OrcProto.RowIndexEntry entry =
rowIndices[i].getEntry(rowEntry);
+ // This is effectively a test for pre-ORC-569 files.
+ if (rowEntry == 0 && entry.getPositionsCount() == 0) {
+ index[i] = new ZeroPositionProvider();
+ } else {
+ index[i] = new PositionProviderImpl(entry);
+ }
+ }
+ }
+ reader.seek(index, readPhase);
+ }
+
+ @Override
+ public void seekToRow(long rowNumber) throws IOException {
+ if (rowNumber < 0) {
+ throw new IllegalArgumentException("Seek to a negative row number
" + rowNumber);
+ } else if (rowNumber < firstRow) {
+ throw new IllegalArgumentException("Seek before reader range " +
rowNumber);
+ }
+ // convert to our internal form (rows from the beginning of slice)
+ rowNumber -= firstRow;
+
+ // move to the right stripe
+ int rightStripe = findStripe(rowNumber);
+ if (rightStripe != currentStripe) {
+ currentStripe = rightStripe;
+ readStripe();
+ }
+ if (rowIndexColsToRead == null) {
+ // Read the row indexes only if they were not already read as part
of readStripe()
+ readCurrentStripeRowIndex();
+ }
+
+ // if we aren't to the right row yet, advance in the stripe.
+ advanceToNextRow(reader, rowNumber, true);
+ }
+
+ private static final String TRANSLATED_SARG_SEPARATOR = "_";
+
+ public static String encodeTranslatedSargColumn(int rootColumn, Integer
indexInSourceTable) {
+ return rootColumn
+ + TRANSLATED_SARG_SEPARATOR
+ + ((indexInSourceTable == null) ? -1 : indexInSourceTable);
+ }
+
+ public static int[] mapTranslatedSargColumns(
+ List<OrcProto.Type> types, List<PredicateLeaf> sargLeaves) {
+ int[] result = new int[sargLeaves.size()];
+ OrcProto.Type lastRoot = null; // Root will be the same for everyone
as of now.
+ String lastRootStr = null;
+ for (int i = 0; i < result.length; ++i) {
+ String[] rootAndIndex =
+
sargLeaves.get(i).getColumnName().split(TRANSLATED_SARG_SEPARATOR);
+ assert rootAndIndex.length == 2;
+ String rootStr = rootAndIndex[0], indexStr = rootAndIndex[1];
+ int index = Integer.parseInt(indexStr);
+ // First, check if the column even maps to anything.
+ if (index == -1) {
+ result[i] = -1;
+ continue;
+ }
+ assert index >= 0;
+ // Then, find the root type if needed.
+ if (!rootStr.equals(lastRootStr)) {
+ lastRoot = types.get(Integer.parseInt(rootStr));
+ lastRootStr = rootStr;
+ }
+ // Subtypes of the root types correspond, in order, to the columns
in the table schema
+ // (disregarding schema evolution that doesn't presently work).
Get the index for the
+ // corresponding subtype.
+ result[i] = lastRoot.getSubtypes(index);
+ }
+ return result;
+ }
+
+ public CompressionCodec getCompressionCodec() {
+ return dataReader.getCompressionOptions().getCodec();
+ }
+
+ public int getMaxDiskRangeChunkLimit() {
+ return maxDiskRangeChunkLimit;
+ }
+
+ /**
+ * Get sargApplier for testing.
+ *
+ * @return sargApplier in record reader.
+ */
+ SargApplier getSargApp() {
+ return sargApp;
+ }
+}
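For reference, a minimal standalone sketch of the row-group check wired in above. It assumes the
rangeCardinality(start, end) helper added to RoaringBitmap32 in this change counts the set bits in
[start, end); the class and method names in the sketch are hypothetical, since the patch performs
the equivalent check inline in SargApplier#pickRowGroups.

import org.apache.paimon.utils.RoaringBitmap32;

/** Illustrative sketch only; the patch performs this check inline. */
final class RowGroupBitmapCheck {

    /**
     * A row group stays selected only if the statistics-based search argument keeps it and the
     * file index bitmap contains at least one row inside the group's global row range.
     */
    static boolean keepRowGroup(
            boolean sargNeeded,       // sarg.evaluate(leafValues).isNeeded()
            RoaringBitmap32 bitmap,   // row positions kept by the file index, file-wide
            long rowBaseInStripe,     // first row of the current stripe within the file
            long rowIndexStride,      // rows per row group
            int rowGroup) {
        long firstRow = rowBaseInStripe + rowIndexStride * rowGroup;
        long lastRow = firstRow + rowIndexStride;
        return sargNeeded && bitmap.rangeCardinality(firstRow, lastRow) > 0;
    }
}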
diff --git
a/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcReaderFactory.java
b/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcReaderFactory.java
index 5093a5010..4f4174240 100644
---
a/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcReaderFactory.java
+++
b/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcReaderFactory.java
@@ -23,6 +23,7 @@ import org.apache.paimon.data.columnar.ColumnVector;
import org.apache.paimon.data.columnar.ColumnarRow;
import org.apache.paimon.data.columnar.ColumnarRowIterator;
import org.apache.paimon.data.columnar.VectorizedColumnBatch;
+import org.apache.paimon.fileindex.FileIndexResult;
import org.apache.paimon.format.FormatReaderFactory;
import org.apache.paimon.format.OrcFormatReaderContext;
import org.apache.paimon.format.fs.HadoopReadOnlyFileSystem;
@@ -45,6 +46,8 @@ import org.apache.orc.OrcFile;
import org.apache.orc.RecordReader;
import org.apache.orc.StripeInformation;
import org.apache.orc.TypeDescription;
+import org.apache.orc.impl.ReaderImpl;
+import org.apache.orc.impl.RecordReaderImpl;
import javax.annotation.Nullable;
@@ -104,7 +107,8 @@ public class OrcReaderFactory implements
FormatReaderFactory {
context.fileIO(),
context.filePath(),
0,
- context.fileSize());
+ context.fileSize(),
+ context.fileIndex());
return new OrcVectorizedReader(orcReader, poolOfBatches);
}
@@ -251,9 +255,10 @@ public class OrcReaderFactory implements
FormatReaderFactory {
FileIO fileIO,
org.apache.paimon.fs.Path path,
long splitStart,
- long splitLength)
+ long splitLength,
+ FileIndexResult fileIndexResult)
throws IOException {
- org.apache.orc.Reader orcReader = createReader(conf, fileIO, path);
+ org.apache.orc.Reader orcReader = createReader(conf, fileIO, path,
fileIndexResult);
try {
// get offset and length for the stripes that start in the split
Pair<Long, Long> offsetAndLength =
@@ -328,7 +333,8 @@ public class OrcReaderFactory implements
FormatReaderFactory {
public static org.apache.orc.Reader createReader(
org.apache.hadoop.conf.Configuration conf,
FileIO fileIO,
- org.apache.paimon.fs.Path path)
+ org.apache.paimon.fs.Path path,
+ FileIndexResult fileIndexResult)
throws IOException {
// open ORC file and create reader
org.apache.hadoop.fs.Path hPath = new
org.apache.hadoop.fs.Path(path.toUri());
@@ -338,6 +344,11 @@ public class OrcReaderFactory implements
FormatReaderFactory {
// configure filesystem from Paimon FileIO
readerOptions.filesystem(new HadoopReadOnlyFileSystem(fileIO));
- return OrcFile.createReader(hPath, readerOptions);
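+        // Return a Reader whose RecordReaders receive the file index result, so that row groups
+        // ruled out by the bitmap can be skipped while reading.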
+ return new ReaderImpl(hPath, readerOptions) {
+ @Override
+ public RecordReader rows(Options options) throws IOException {
+ return new RecordReaderImpl(this, options, fileIndexResult);
+ }
+ };
}
}
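A hedged usage sketch of the factory method above (not part of the patch; the wrapper class and
method name are hypothetical, only OrcReaderFactory.createReader and the standard ORC Reader API
are taken as given):

import org.apache.hadoop.conf.Configuration;
import org.apache.orc.Reader;
import org.apache.orc.RecordReader;
import org.apache.paimon.fileindex.FileIndexResult;
import org.apache.paimon.format.orc.OrcReaderFactory;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;

import java.io.IOException;

/** Hypothetical caller, for illustration only. */
final class OrcFileIndexReadSketch {

    static RecordReader open(
            Configuration conf, FileIO fileIO, Path path, FileIndexResult fileIndexResult)
            throws IOException {
        // createReader now returns an anonymous ReaderImpl whose rows() hands the
        // FileIndexResult to the bundled RecordReaderImpl shown earlier in this commit.
        Reader orcReader = OrcReaderFactory.createReader(conf, fileIO, path, fileIndexResult);
        return orcReader.rows(orcReader.options());
    }
}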
diff --git
a/paimon-format/src/main/java/org/apache/paimon/format/orc/filter/OrcSimpleStatsExtractor.java
b/paimon-format/src/main/java/org/apache/paimon/format/orc/filter/OrcSimpleStatsExtractor.java
index dc6fd69dd..e3c741cb8 100644
---
a/paimon-format/src/main/java/org/apache/paimon/format/orc/filter/OrcSimpleStatsExtractor.java
+++
b/paimon-format/src/main/java/org/apache/paimon/format/orc/filter/OrcSimpleStatsExtractor.java
@@ -74,7 +74,8 @@ public class OrcSimpleStatsExtractor implements
SimpleStatsExtractor {
@Override
public Pair<SimpleColStats[], FileInfo> extractWithFileInfo(FileIO fileIO,
Path path)
throws IOException {
- try (Reader reader = OrcReaderFactory.createReader(new
Configuration(), fileIO, path)) {
+ try (Reader reader =
+ OrcReaderFactory.createReader(new Configuration(), fileIO,
path, null)) {
long rowCount = reader.getNumberOfRows();
ColumnStatistics[] columnStatistics = reader.getStatistics();
TypeDescription schema = reader.getSchema();
diff --git
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetReaderFactory.java
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetReaderFactory.java
index 2feebb321..2a62c0bc8 100644
---
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetReaderFactory.java
+++
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetReaderFactory.java
@@ -113,7 +113,8 @@ public class ParquetReaderFactory implements
FormatReaderFactory {
ParquetFileReader reader =
new ParquetFileReader(
ParquetInputFile.fromPath(context.fileIO(),
context.filePath()),
- builder.build());
+ builder.build(),
+ context.fileIndex());
MessageType fileSchema = reader.getFileMetaData().getSchema();
MessageType requestedSchema = clipParquetSchema(fileSchema);
reader.setRequestedSchema(requestedSchema);
diff --git
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetUtil.java
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetUtil.java
index 055fe83f7..82d19e448 100644
---
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetUtil.java
+++
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetUtil.java
@@ -18,6 +18,7 @@
package org.apache.paimon.format.parquet;
+import org.apache.paimon.fileindex.FileIndexResult;
import org.apache.paimon.format.SimpleStatsExtractor;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
@@ -48,7 +49,7 @@ public class ParquetUtil {
*/
public static Pair<Map<String, Statistics<?>>,
SimpleStatsExtractor.FileInfo>
extractColumnStats(FileIO fileIO, Path path) throws IOException {
- try (ParquetFileReader reader = getParquetReader(fileIO, path)) {
+ try (ParquetFileReader reader = getParquetReader(fileIO, path, null)) {
ParquetMetadata parquetMetadata = reader.getFooter();
List<BlockMetaData> blockMetaDataList =
parquetMetadata.getBlocks();
Map<String, Statistics<?>> resultStats = new HashMap<>();
@@ -77,9 +78,12 @@ public class ParquetUtil {
* @param path the path of parquet files to be read
* @return parquet reader, used for reading footer, status, etc.
*/
- public static ParquetFileReader getParquetReader(FileIO fileIO, Path path)
throws IOException {
+ public static ParquetFileReader getParquetReader(
+ FileIO fileIO, Path path, FileIndexResult fileIndexResult) throws
IOException {
return new ParquetFileReader(
- ParquetInputFile.fromPath(fileIO, path),
ParquetReadOptions.builder().build());
+ ParquetInputFile.fromPath(fileIO, path),
+ ParquetReadOptions.builder().build(),
+ fileIndexResult);
}
static void assertStatsClass(
diff --git
a/paimon-format/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
b/paimon-format/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
index aca1f021b..91860436b 100644
---
a/paimon-format/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
+++
b/paimon-format/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
@@ -18,10 +18,13 @@
package org.apache.parquet.hadoop;
+import org.apache.paimon.fileindex.FileIndexResult;
+import org.apache.paimon.fileindex.bitmap.BitmapIndexResultLazy;
import org.apache.paimon.format.parquet.ParquetInputFile;
import org.apache.paimon.format.parquet.ParquetInputStream;
import org.apache.paimon.fs.FileRange;
import org.apache.paimon.fs.VectoredReadable;
+import org.apache.paimon.utils.RoaringBitmap32;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.ParquetReadOptions;
@@ -92,6 +95,7 @@ import java.util.Map.Entry;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
import java.util.zip.CRC32;
import static org.apache.paimon.utils.Preconditions.checkArgument;
@@ -225,10 +229,14 @@ public class ParquetFileReader implements Closeable {
private DictionaryPageReader nextDictionaryReader = null;
private InternalFileDecryptor fileDecryptor = null;
+ private FileIndexResult fileIndexResult;
- public ParquetFileReader(InputFile file, ParquetReadOptions options)
throws IOException {
+ public ParquetFileReader(
+ InputFile file, ParquetReadOptions options, FileIndexResult
fileIndexResult)
+ throws IOException {
this.converter = new ParquetMetadataConverter(options);
this.file = (ParquetInputFile) file;
+ this.fileIndexResult = fileIndexResult;
this.f = this.file.newStream();
this.options = options;
try {
@@ -333,22 +341,38 @@ public class ParquetFileReader implements Closeable {
private List<BlockMetaData> filterRowGroups(List<BlockMetaData> blocks)
throws IOException {
FilterCompat.Filter recordFilter = options.getRecordFilter();
- if (checkRowIndexOffsetExists(blocks) &&
FilterCompat.isFilteringRequired(recordFilter)) {
- // set up data filters based on configured levels
- List<RowGroupFilter.FilterLevel> levels = new ArrayList<>();
+ if (checkRowIndexOffsetExists(blocks)) {
+ if (FilterCompat.isFilteringRequired(recordFilter)) {
+ // set up data filters based on configured levels
+ List<RowGroupFilter.FilterLevel> levels = new ArrayList<>();
- if (options.useStatsFilter()) {
- levels.add(STATISTICS);
- }
+ if (options.useStatsFilter()) {
+ levels.add(STATISTICS);
+ }
- if (options.useDictionaryFilter()) {
- levels.add(DICTIONARY);
- }
+ if (options.useDictionaryFilter()) {
+ levels.add(DICTIONARY);
+ }
- if (options.useBloomFilter()) {
- levels.add(BLOOMFILTER);
+ if (options.useBloomFilter()) {
+ levels.add(BLOOMFILTER);
+ }
+ blocks = RowGroupFilter.filterRowGroups(levels, recordFilter,
blocks, this);
+ }
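+            // Additionally drop every block whose global row range contains no row kept by the
+            // bitmap file index, so that row group is never fetched.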
+ if (fileIndexResult instanceof BitmapIndexResultLazy) {
+ RoaringBitmap32 bitmap = ((BitmapIndexResultLazy)
fileIndexResult).get();
+ blocks =
+ blocks.stream()
+ .filter(
+ it -> {
+ long rowIndexOffset =
it.getRowIndexOffset();
+ return bitmap.rangeCardinality(
+ rowIndexOffset,
+ rowIndexOffset +
it.getRowCount())
+ > 0;
+ })
+ .collect(Collectors.toList());
}
- return RowGroupFilter.filterRowGroups(levels, recordFilter,
blocks, this);
}
return blocks;
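For illustration, a tiny standalone example of the same range test (not part of the patch; the
values are made up, and RoaringBitmap32's no-arg constructor and add(int) are assumed to behave as
elsewhere in the codebase):

import org.apache.paimon.utils.RoaringBitmap32;

/** Worked example of the row-group range test; values are made up. */
final class ParquetBlockFilterSketch {

    public static void main(String[] args) {
        RoaringBitmap32 kept = new RoaringBitmap32(); // rows surviving the file index
        kept.add(3);
        kept.add(4);
        // Block 0 covers global rows [0, 3): no kept row falls inside, so it is dropped.
        System.out.println(kept.rangeCardinality(0, 3) > 0); // false
        // Block 1 covers global rows [3, 6): rows 3 and 4 are kept, so it is read.
        System.out.println(kept.rangeCardinality(3, 6) > 0); // true
    }
}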
diff --git
a/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetFormatReadWriteTest.java
b/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetFormatReadWriteTest.java
index 221d524ff..e0d1d240a 100644
---
a/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetFormatReadWriteTest.java
+++
b/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetFormatReadWriteTest.java
@@ -75,7 +75,7 @@ public class ParquetFormatReadWriteTest extends
FormatReadWriteTest {
writer.close();
out.close();
- try (ParquetFileReader reader = ParquetUtil.getParquetReader(fileIO,
file)) {
+ try (ParquetFileReader reader = ParquetUtil.getParquetReader(fileIO,
file, null)) {
ParquetMetadata parquetMetadata = reader.getFooter();
List<BlockMetaData> blockMetaDataList =
parquetMetadata.getBlocks();
for (BlockMetaData blockMetaData : blockMetaDataList) {
diff --git
a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkFileIndexITCase.java
b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkFileIndexITCase.java
index 806f1f628..bb8e84bd5 100644
---
a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkFileIndexITCase.java
+++
b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkFileIndexITCase.java
@@ -103,9 +103,12 @@ public class SparkFileIndexITCase extends SparkWriteITCase
{
+ "'file-index.in-manifest-threshold'='1B');");
spark.sql("INSERT INTO T VALUES (0),(1),(2),(3),(4),(5);");
+ List<Row> rows1 = spark.sql("SELECT a FROM T where
a>3;").collectAsList();
+ assertThat(rows1.toString()).isEqualTo("[[4], [5]]");
+
// check query result
- List<Row> rows = spark.sql("SELECT a FROM T where
a='3';").collectAsList();
- assertThat(rows.toString()).isEqualTo("[[3]]");
+ List<Row> rows2 = spark.sql("SELECT a FROM T where
a=3;").collectAsList();
+ assertThat(rows2.toString()).isEqualTo("[[3]]");
// check index reader
foreachIndexReader(