PARQUET-801: Allow UserDefinedPredicates in DictionaryFilter

Author: Patrick Woody <pwo...@palantir.com>
Author: Patrick Woody <patrick.woo...@gmail.com>
Closes #394 from pwoody/pw/dictionaryUdp and squashes the following commits: d8499a0 [Patrick Woody] short circuiting and style changes 4cb9f0c [Patrick Woody] more missing imports 1ec0d39 [Patrick Woody] fix missing import 3ee4489 [Patrick Woody] PARQUET-801: Allow UserDefinedPredicates in DictionaryFilter Project: http://git-wip-us.apache.org/repos/asf/parquet-mr/repo Commit: http://git-wip-us.apache.org/repos/asf/parquet-mr/commit/0c8d4890 Tree: http://git-wip-us.apache.org/repos/asf/parquet-mr/tree/0c8d4890 Diff: http://git-wip-us.apache.org/repos/asf/parquet-mr/diff/0c8d4890 Branch: refs/heads/parquet-1.8.x Commit: 0c8d48908bff1ef3e6171e59c0299235599c0398 Parents: 9c13e83 Author: Patrick Woody <pwo...@palantir.com> Authored: Tue Dec 20 14:35:57 2016 -0800 Committer: Ryan Blue <b...@apache.org> Committed: Mon Jan 9 16:58:15 2017 -0800 ---------------------------------------------------------------------- .../dictionarylevel/DictionaryFilter.java | 56 +++++++++--- .../dictionarylevel/DictionaryFilterTest.java | 94 ++++++++++++++++++++ 2 files changed, 140 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/0c8d4890/parquet-hadoop/src/main/java/org/apache/parquet/filter2/dictionarylevel/DictionaryFilter.java ---------------------------------------------------------------------- diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/filter2/dictionarylevel/DictionaryFilter.java b/parquet-hadoop/src/main/java/org/apache/parquet/filter2/dictionarylevel/DictionaryFilter.java index 91f3007..19604ec 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/filter2/dictionarylevel/DictionaryFilter.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/filter2/dictionarylevel/DictionaryFilter.java @@ -212,8 +212,8 @@ public class DictionaryFilter implements FilterPredicate.Visitor<Boolean> { return BLOCK_MIGHT_MATCH; } - for(T entry : dictSet) { - 
if(value.compareTo(entry) > 0) { + for (T entry : dictSet) { + if (value.compareTo(entry) > 0) { return BLOCK_MIGHT_MATCH; } } @@ -253,8 +253,8 @@ public class DictionaryFilter implements FilterPredicate.Visitor<Boolean> { return BLOCK_MIGHT_MATCH; } - for(T entry : dictSet) { - if(value.compareTo(entry) >= 0) { + for (T entry : dictSet) { + if (value.compareTo(entry) >= 0) { return BLOCK_MIGHT_MATCH; } } @@ -292,8 +292,8 @@ public class DictionaryFilter implements FilterPredicate.Visitor<Boolean> { return BLOCK_MIGHT_MATCH; } - for(T entry : dictSet) { - if(value.compareTo(entry) < 0) { + for (T entry : dictSet) { + if (value.compareTo(entry) < 0) { return BLOCK_MIGHT_MATCH; } } @@ -333,8 +333,8 @@ public class DictionaryFilter implements FilterPredicate.Visitor<Boolean> { return BLOCK_MIGHT_MATCH; } - for(T entry : dictSet) { - if(value.compareTo(entry) <= 0) { + for (T entry : dictSet) { + if (value.compareTo(entry) <= 0) { return BLOCK_MIGHT_MATCH; } } @@ -363,14 +363,50 @@ public class DictionaryFilter implements FilterPredicate.Visitor<Boolean> { "This predicate contains a not! Did you forget to run this predicate through LogicalInverseRewriter? " + not); } + private <T extends Comparable<T>, U extends UserDefinedPredicate<T>> Boolean visit(UserDefined<T, U> ud, boolean inverted) { + Column<T> filterColumn = ud.getColumn(); + ColumnChunkMetaData meta = getColumnChunk(filterColumn.getColumnPath()); + U udp = ud.getUserDefinedPredicate(); + + // The column is missing, thus all null. Check if the predicate keeps null. 
+ if (meta == null) { + if (inverted) { + return udp.keep(null); + } else { + return !udp.keep(null); + } + } + + if (hasNonDictionaryPages(meta)) { + return BLOCK_MIGHT_MATCH; + } + + try { + Set<T> dictSet = expandDictionary(meta); + if (dictSet == null) { + return BLOCK_MIGHT_MATCH; + } + + for (T entry : dictSet) { + boolean keep = udp.keep(entry); + if ((keep && !inverted) || (!keep && inverted)) return BLOCK_MIGHT_MATCH; + } + return BLOCK_CANNOT_MATCH; + } catch (IOException e) { + LOG.warn("Failed to process dictionary for filter evaluation.", e); + } + + return BLOCK_MIGHT_MATCH; + } + @Override public <T extends Comparable<T>, U extends UserDefinedPredicate<T>> Boolean visit(UserDefined<T, U> udp) { - throw new UnsupportedOperationException("UDP not supported with dictionary evaluation."); + return visit(udp, false); } @Override public <T extends Comparable<T>, U extends UserDefinedPredicate<T>> Boolean visit(LogicalNotUserDefined<T, U> udp) { - throw new UnsupportedOperationException("UDP not supported with dictionary evaluation."); + return visit(udp.getUserDefined(), true); } @SuppressWarnings("deprecation") http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/0c8d4890/parquet-hadoop/src/test/java/org/apache/parquet/filter2/dictionarylevel/DictionaryFilterTest.java ---------------------------------------------------------------------- diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/filter2/dictionarylevel/DictionaryFilterTest.java b/parquet-hadoop/src/test/java/org/apache/parquet/filter2/dictionarylevel/DictionaryFilterTest.java index eca6332..3883d87 100644 --- a/parquet-hadoop/src/test/java/org/apache/parquet/filter2/dictionarylevel/DictionaryFilterTest.java +++ b/parquet-hadoop/src/test/java/org/apache/parquet/filter2/dictionarylevel/DictionaryFilterTest.java @@ -19,6 +19,9 @@ package org.apache.parquet.filter2.dictionarylevel; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Sets; +import 
org.apache.commons.lang.ArrayUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -27,11 +30,14 @@ import org.apache.parquet.column.page.DictionaryPageReadStore; import org.apache.parquet.example.data.Group; import org.apache.parquet.example.data.simple.SimpleGroupFactory; import org.apache.parquet.filter2.predicate.FilterPredicate; +import org.apache.parquet.filter2.predicate.LogicalInverseRewriter; import org.apache.parquet.filter2.predicate.Operators.BinaryColumn; import org.apache.parquet.filter2.predicate.Operators.DoubleColumn; import org.apache.parquet.filter2.predicate.Operators.FloatColumn; import org.apache.parquet.filter2.predicate.Operators.IntColumn; import org.apache.parquet.filter2.predicate.Operators.LongColumn; +import org.apache.parquet.filter2.predicate.Statistics; +import org.apache.parquet.filter2.predicate.UserDefinedPredicate; import org.apache.parquet.hadoop.ParquetFileReader; import org.apache.parquet.hadoop.ParquetWriter; import org.apache.parquet.hadoop.example.ExampleParquetWriter; @@ -47,6 +53,7 @@ import org.junit.BeforeClass; import org.junit.Test; import java.io.IOException; +import java.io.Serializable; import java.util.Arrays; import java.util.HashSet; import java.util.List; @@ -331,6 +338,43 @@ public class DictionaryFilterTest { canDrop(or(x, y), ccmd, dictionaries)); } + + @Test + public void testUdp() throws Exception { + InInt32UDP dropabble = new InInt32UDP(ImmutableSet.of(42)); + InInt32UDP undroppable = new InInt32UDP(ImmutableSet.of(205)); + + assertTrue("Should drop block for non-matching UDP", + canDrop(userDefined(intColumn("int32_field"), dropabble), ccmd, dictionaries)); + + assertFalse("Should not drop block for matching UDP", + canDrop(userDefined(intColumn("int32_field"), undroppable), ccmd, dictionaries)); + } + + @Test + public void testInverseUdp() throws Exception { + InInt32UDP droppable = new InInt32UDP(ImmutableSet.of(42)); + 
InInt32UDP undroppable = new InInt32UDP(ImmutableSet.of(205)); + Set<Integer> allValues = ImmutableSet.copyOf(Arrays.asList(ArrayUtils.toObject(intValues))); + InInt32UDP completeMatch = new InInt32UDP(allValues); + + FilterPredicate inverse = + LogicalInverseRewriter.rewrite(not(userDefined(intColumn("int32_field"), droppable))); + FilterPredicate inverse1 = + LogicalInverseRewriter.rewrite(not(userDefined(intColumn("int32_field"), undroppable))); + FilterPredicate inverse2 = + LogicalInverseRewriter.rewrite(not(userDefined(intColumn("int32_field"), completeMatch))); + + assertFalse("Should not drop block for inverse of non-matching UDP", + canDrop(inverse, ccmd, dictionaries)); + + assertFalse("Should not drop block for inverse of UDP with some matches", + canDrop(inverse1, ccmd, dictionaries)); + + assertTrue("Should drop block for inverse of UDP with all matches", + canDrop(inverse2, ccmd, dictionaries)); + } + @Test public void testColumnWithoutDictionary() throws Exception { IntColumn plain = intColumn("plain_int32_field"); @@ -437,6 +481,56 @@ public class DictionaryFilterTest { canDrop(gtEq(b, Binary.fromString("any")), ccmd, dictionaries)); } + @Test + public void testUdpMissingColumn() throws Exception { + InInt32UDP nullRejecting = new InInt32UDP(ImmutableSet.of(42)); + InInt32UDP nullAccepting = new InInt32UDP(Sets.newHashSet((Integer) null)); + IntColumn fake = intColumn("missing_column"); + + assertTrue("Should drop block for null rejecting udp", + canDrop(userDefined(fake, nullRejecting), ccmd, dictionaries)); + assertFalse("Should not drop block for null accepting udp", + canDrop(userDefined(fake, nullAccepting), ccmd, dictionaries)); + } + + + @Test + public void testInverseUdpMissingColumn() throws Exception { + InInt32UDP nullRejecting = new InInt32UDP(ImmutableSet.of(42)); + InInt32UDP nullAccepting = new InInt32UDP(Sets.newHashSet((Integer) null)); + IntColumn fake = intColumn("missing_column"); + + assertTrue("Should drop block for null 
accepting udp", + canDrop(LogicalInverseRewriter.rewrite(not(userDefined(fake, nullAccepting))), ccmd, dictionaries)); + assertFalse("Should not drop block for null rejecting udp", + canDrop(LogicalInverseRewriter.rewrite(not(userDefined(fake, nullRejecting))), ccmd, dictionaries)); + } + + + private static final class InInt32UDP extends UserDefinedPredicate<Integer> implements Serializable { + + private final Set<Integer> ints; + + InInt32UDP(Set<Integer> ints) { + this.ints = ints; + } + + @Override + public boolean keep(Integer value) { + return ints.contains(value); + } + + @Override + public boolean canDrop(Statistics<Integer> statistics) { + return false; + } + + @Override + public boolean inverseCanDrop(Statistics<Integer> statistics) { + return false; + } + } + private static double toDouble(int value) { return (value * 1.0); }