PARQUET-654: Add option to disable record-level filtering. Frameworks that use code generation (codegen) for filtering can use this option to avoid running filters within Parquet as well.
Author: Ryan Blue <b...@apache.org> Closes #353 from rdblue/PARQUET-654-add-record-level-filter-option and squashes the following commits: b497e7f [Ryan Blue] PARQUET-654: Add option to disable record-level filtering. Project: http://git-wip-us.apache.org/repos/asf/parquet-mr/repo Commit: http://git-wip-us.apache.org/repos/asf/parquet-mr/commit/6b60c79e Tree: http://git-wip-us.apache.org/repos/asf/parquet-mr/tree/6b60c79e Diff: http://git-wip-us.apache.org/repos/asf/parquet-mr/diff/6b60c79e Branch: refs/heads/parquet-1.8.x Commit: 6b60c79e59e3e9a67dc0c2339675d5eb6cd1b510 Parents: d59381a Author: Ryan Blue <b...@apache.org> Authored: Wed Jul 13 14:50:08 2016 -0700 Committer: Ryan Blue <b...@apache.org> Committed: Mon Jan 9 16:54:54 2017 -0800 ---------------------------------------------------------------------- .../hadoop/InternalParquetRecordReader.java | 8 +++++++- .../apache/parquet/hadoop/ParquetFileReader.java | 10 ++++++++-- .../apache/parquet/hadoop/ParquetInputFormat.java | 18 ++++++++++++++++++ 3 files changed, 33 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b60c79e/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordReader.java ---------------------------------------------------------------------- diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordReader.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordReader.java index f74e57c..d43fd7d 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordReader.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordReader.java @@ -46,6 +46,8 @@ import org.apache.parquet.schema.MessageType; import static java.lang.String.format; import static org.apache.parquet.Log.DEBUG; import static org.apache.parquet.Preconditions.checkNotNull; +import static 
org.apache.parquet.hadoop.ParquetInputFormat.RECORD_FILTERING_ENABLED; +import static org.apache.parquet.hadoop.ParquetInputFormat.RECORD_FILTERING_ENABLED_DEFAULT; import static org.apache.parquet.hadoop.ParquetInputFormat.STRICT_TYPE_CHECKING; class InternalParquetRecordReader<T> { @@ -53,6 +55,7 @@ class InternalParquetRecordReader<T> { private ColumnIOFactory columnIOFactory = null; private final Filter filter; + private boolean filterRecords = true; private MessageType requestedSchema; private MessageType fileSchema; @@ -130,7 +133,8 @@ class InternalParquetRecordReader<T> { if (Log.INFO) LOG.info("block read in memory in " + timeSpentReading + " ms. row count = " + pages.getRowCount()); if (Log.DEBUG) LOG.debug("initializing Record assembly with requested schema " + requestedSchema); MessageColumnIO columnIO = columnIOFactory.getColumnIO(requestedSchema, fileSchema, strictTypeChecking); - recordReader = columnIO.getRecordReader(pages, recordConverter, filter); + recordReader = columnIO.getRecordReader(pages, recordConverter, + filterRecords ? 
filter : FilterCompat.NOOP); startedAssemblingCurrentBlockAt = System.currentTimeMillis(); totalCountLoadedSoFar += pages.getRowCount(); ++ currentBlock; @@ -173,6 +177,8 @@ class InternalParquetRecordReader<T> { this.strictTypeChecking = configuration.getBoolean(STRICT_TYPE_CHECKING, true); this.total = reader.getRecordCount(); this.unmaterializableRecordCounter = new UnmaterializableRecordCounter(configuration, total); + this.filterRecords = configuration.getBoolean( + RECORD_FILTERING_ENABLED, RECORD_FILTERING_ENABLED_DEFAULT); LOG.info("RecordReader initialized will read a total of " + total + " records."); } http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b60c79e/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java ---------------------------------------------------------------------- diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java index 3c4c6fc..2d1c62b 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java @@ -28,6 +28,10 @@ import static org.apache.parquet.format.converter.ParquetMetadataConverter.fromP import static org.apache.parquet.hadoop.ParquetFileWriter.MAGIC; import static org.apache.parquet.hadoop.ParquetFileWriter.PARQUET_COMMON_METADATA_FILE; import static org.apache.parquet.hadoop.ParquetFileWriter.PARQUET_METADATA_FILE; +import static org.apache.parquet.hadoop.ParquetInputFormat.DICTIONARY_FILTERING_ENABLED; +import static org.apache.parquet.hadoop.ParquetInputFormat.DICTIONARY_FILTERING_ENABLED_DEFAULT; +import static org.apache.parquet.hadoop.ParquetInputFormat.STATS_FILTERING_ENABLED; +import static org.apache.parquet.hadoop.ParquetInputFormat.STATS_FILTERING_ENABLED_DEFAULT; import java.io.ByteArrayInputStream; import java.io.Closeable; @@ -598,11 +602,13 @@ public 
class ParquetFileReader implements Closeable { // set up data filters based on configured levels List<RowGroupFilter.FilterLevel> levels = new ArrayList<RowGroupFilter.FilterLevel>(); - if (conf.getBoolean("parquet.filter.statistics.enabled", true)) { + if (conf.getBoolean( + STATS_FILTERING_ENABLED, STATS_FILTERING_ENABLED_DEFAULT)) { levels.add(STATISTICS); } - if (conf.getBoolean("parquet.filter.dictionary.enabled", false)) { + if (conf.getBoolean( + DICTIONARY_FILTERING_ENABLED, DICTIONARY_FILTERING_ENABLED_DEFAULT)) { levels.add(DICTIONARY); } http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b60c79e/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetInputFormat.java ---------------------------------------------------------------------- diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetInputFormat.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetInputFormat.java index e3536d7..1fe57f9 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetInputFormat.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetInputFormat.java @@ -116,6 +116,24 @@ public class ParquetInputFormat<T> extends FileInputFormat<Void, T> { public static final String FILTER_PREDICATE = "parquet.private.read.filter.predicate"; /** + * key to configure whether record-level filtering is enabled + */ + public static final String RECORD_FILTERING_ENABLED = "parquet.filter.record-level.enabled"; + static final boolean RECORD_FILTERING_ENABLED_DEFAULT = true; + + /** + * key to configure whether row group stats filtering is enabled + */ + public static final String STATS_FILTERING_ENABLED = "parquet.filter.stats.enabled"; + static final boolean STATS_FILTERING_ENABLED_DEFAULT = true; + + /** + * key to configure whether row group dictionary filtering is enabled + */ + public static final String DICTIONARY_FILTERING_ENABLED = "parquet.filter.dictionary.enabled"; + static final boolean 
DICTIONARY_FILTERING_ENABLED_DEFAULT = false; + + /** * key to turn on or off task side metadata loading (default true) * if true then metadata is read on the task side and some tasks may finish immediately. * if false metadata is read on the client which is slower if there is a lot of metadata but tasks will only be spawn if there is work to do.