[ https://issues.apache.org/jira/browse/PARQUET-1968?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17419350#comment-17419350 ]

ASF GitHub Bot commented on PARQUET-1968:
-----------------------------------------

gszadovszky commented on a change in pull request #923:
URL: https://github.com/apache/parquet-mr/pull/923#discussion_r714956184



##########
File path: parquet-hadoop/src/main/java/org/apache/parquet/filter2/statisticslevel/StatisticsFilter.java
##########
@@ -186,26 +186,36 @@ private boolean hasNulls(ColumnChunkMetaData column) {
       return BLOCK_MIGHT_MATCH;
     }
 
+    if (stats.isNumNullsSet()) {
+      if (stats.getNumNulls() == 0) {
+        if (values.contains(null) && values.size() == 1) return BLOCK_CANNOT_MATCH;
+      } else {
+        if (values.contains(null)) return BLOCK_MIGHT_MATCH;
+      }
+    }
+
     // drop if all the element in value < min || all the element in value > max
-    return allElementCanBeDropped(stats, values, meta);
+    if (stats.compareMinToValue(getMaxOrMin(true, values)) <= 0 &&
+      stats.compareMaxToValue(getMaxOrMin(false, values)) >= 0) {
+      return BLOCK_MIGHT_MATCH;
+    }
+    else {
+      return BLOCK_CANNOT_MATCH;
+    }
   }
 
-  private <T extends Comparable<T>> Boolean allElementCanBeDropped(Statistics<T> stats, Set<T> values, ColumnChunkMetaData meta) {
-    for (T value : values) {
-      if (value != null) {
-        if (stats.compareMinToValue(value) <= 0 && stats.compareMaxToValue(value) >= 0)
-          return false;
-      } else {
-        // numNulls is not set. We don't know anything about the nulls in this chunk
-        if (!stats.isNumNullsSet()) {
-          return false;
-        }
-        if (hasNulls(meta)) {
-          return false;
-        }
+  private <T extends Comparable<T>> T getMaxOrMin(boolean isMax, Set<T> values) {

Review comment:
       I don't really like having a boolean flag for min/max. It would also be faster to find the min and max values in a single iteration, and it would be nice not to have to copy-paste this method twice. What do you think about the following design?
   
       Have a separate class, e.g. `MinMax`, with two `T` fields: `min` and `max`. The class would be constructed from an `Iterable<T>` and a `PrimitiveComparator`. `min` and `max` would be initialized at instantiation, so both can be retrieved directly after the object is created.
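   A minimal sketch of this design might look like the following. It is only an illustration: the `MinMax` name comes from the comment above, but the `java.util.Comparator` stand-in for Parquet's `PrimitiveComparator` and the null handling are assumptions, not actual parquet-mr code.

```java
import java.util.Comparator;

// Hypothetical sketch of the suggested MinMax helper: a single pass over the
// values computes both min and max using the supplied comparator. In parquet-mr
// the comparator would be the column's PrimitiveComparator; a plain
// java.util.Comparator keeps this sketch self-contained.
final class MinMax<T> {
  private T min = null;
  private T max = null;

  MinMax(Comparator<T> comparator, Iterable<T> values) {
    for (T value : values) {
      if (value == null) {
        continue; // assumption: nulls are handled separately by the filter logic
      }
      if (min == null || comparator.compare(value, min) < 0) {
        min = value;
      }
      if (max == null || comparator.compare(value, max) > 0) {
        max = value;
      }
    }
  }

  T getMin() { return min; }
  T getMax() { return max; }
}
```

   Both extremes are then computed in one iteration and available right after construction, removing both the boolean flag and the duplicated method.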

##########
File path: parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/ColumnIndexBuilder.java
##########
@@ -291,32 +291,29 @@ boolean isNullPage(int pageIndex) {
     @Override
     public <T extends Comparable<T>> PrimitiveIterator.OfInt visit(In<T> in) {
       Set<T> values = in.getValues();
-      TreeSet<T> myTreeSet = new TreeSet<>();
-      IntSet matchingIndexes1 = new IntOpenHashSet();  // for null
+      IntSet matchingIndexesForNull = new IntOpenHashSet();  // for null
       Iterator<T> it = values.iterator();
       while(it.hasNext()) {
         T value = it.next();
-        if (value != null) {
-          myTreeSet.add(value);
-        } else {
+        if (value == null) {
           if (nullCounts == null) {
            // Searching for nulls so if we don't have null related statistics we have to return all pages
             return IndexIterator.all(getPageCount());
           } else {
             for (int i = 0; i < nullCounts.length; i++) {
               if (nullCounts[i] > 0) {
-                matchingIndexes1.add(i);
+                matchingIndexesForNull.add(i);
               }
             }
           }
         }
       }
 
-      IntSet matchingIndexes2 = new IntOpenHashSet();
-      IntSet matchingIndexes3 = new IntOpenHashSet();
+      IntSet matchingIndexesLessThanMax = new IntOpenHashSet();
+      IntSet matchingIndexesLargerThanMin = new IntOpenHashSet();

Review comment:
       I would suggest using `Greater` instead of `Larger`; that is the usual naming, since we have LT (LessThan) and GT (GreaterThan).

##########
File path: parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/ColumnIndexBuilder.java
##########
@@ -287,6 +288,79 @@ boolean isNullPage(int pageIndex) {
          pageIndex -> nullCounts[pageIndex] > 0 || matchingIndexes.contains(pageIndex));
     }
 
+    @Override
+    public <T extends Comparable<T>> PrimitiveIterator.OfInt visit(In<T> in) {
+      Set<T> values = in.getValues();
+      IntSet matchingIndexesForNull = new IntOpenHashSet();  // for null
+      Iterator<T> it = values.iterator();
+      while(it.hasNext()) {
+        T value = it.next();
+        if (value == null) {
+          if (nullCounts == null) {
+            // Searching for nulls so if we don't have null related statistics we have to return all pages
+            return IndexIterator.all(getPageCount());
+          } else {
+            for (int i = 0; i < nullCounts.length; i++) {
+              if (nullCounts[i] > 0) {
+                matchingIndexesForNull.add(i);
+              }
+            }
+          }
+        }
+      }
+
+      IntSet matchingIndexesLessThanMax = new IntOpenHashSet();
+      IntSet matchingIndexesLargerThanMin = new IntOpenHashSet();
+
+      T min = getMaxOrMin(false, values);
+      T max = getMaxOrMin(true, values);
+
+      // We don't want to iterate through each of the values in the IN set to compare,
+      // because the size of the IN set might be very large. Instead, we want to only
+      // compare the max and min value of the IN set to see if the page might contain the
+      // values in the IN set.
+      // If the values in a page are <= the max value in the IN set,

Review comment:
       Worded this way, it sounds like all the values in a page must be <= max and >= min, which is not true. What about `If there might be values in a page that are <= ...`?

##########
File path: parquet-hadoop/src/test/java/org/apache/parquet/filter2/recordlevel/TestRecordLevelFilters.java
##########
@@ -146,6 +147,33 @@ public void testAllFilter() throws Exception {
     assertEquals(new ArrayList<Group>(), found);
   }
 
+  @Test
+  public void testInFilter() throws Exception {
+    BinaryColumn name = binaryColumn("name");
+
+    HashSet<Binary> nameSet = new HashSet<>();
+    nameSet.add(Binary.fromString("thing2"));
+    nameSet.add(Binary.fromString("thing1"));
+    for (int i = 100; i < 200; i++) {
+      nameSet.add(Binary.fromString("p" + i));
+    }
+    FilterPredicate pred = in(name, nameSet);
+    List<Group> found = PhoneBookWriter.readFile(phonebookFile, FilterCompat.get(pred));
+
+    List<String> expectedNames = new ArrayList<>();
+    expectedNames.add("thing1");
+    expectedNames.add("thing2");
+    for (int i = 100; i < 200; i++) {
+      expectedNames.add("p" + i);
+    }
+
+    assertEquals(expectedNames.get(0), ((Group)(found.get(0))).getString("name", 0));
+    assertEquals(expectedNames.get(1), ((Group)(found.get(1))).getString("name", 0));
+    for (int i = 2; i < 102; i++) {
+      assertEquals(expectedNames.get(i), ((Group)(found.get(i))).getString("name", 0));
+    }

Review comment:
       This is not correct as written. You should validate that all the values returned by the reader fulfill the filter and that no values are left out: `"thing1"`, `"thing2"`, and `"p100"` through `"p199"`, and nothing else.
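   The suggested validation could be sketched like this. Plain strings stand in for the `Group` results read from the file, and the helper names are illustrative, not from the PR:

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Sketch of the suggested assertion style: collect every value the reader
// returned and compare it against the exact expected set, so both extra
// matches and missing matches fail the test.
final class InFilterCheck {
  static Set<String> expectedNames() {
    Set<String> expected = new HashSet<>(Arrays.asList("thing1", "thing2"));
    for (int i = 100; i < 200; i++) {
      expected.add("p" + i);
    }
    return expected;
  }

  // True only if the reader output matches the expected set exactly,
  // with no duplicates hiding a missing value.
  static boolean matchesExactly(List<String> foundNames) {
    return new HashSet<>(foundNames).equals(expectedNames())
        && foundNames.size() == expectedNames().size();
  }
}
```

   Checking set equality in both directions is what guards against values being "left out" as well as against spurious extra rows.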

##########
File path: parquet-hadoop/src/main/java/org/apache/parquet/filter2/statisticslevel/StatisticsFilter.java
##########
@@ -186,26 +186,36 @@ private boolean hasNulls(ColumnChunkMetaData column) {
       return BLOCK_MIGHT_MATCH;
     }
 
+    if (stats.isNumNullsSet()) {
+      if (stats.getNumNulls() == 0) {
+        if (values.contains(null) && values.size() == 1) return BLOCK_CANNOT_MATCH;
+      } else {
+        if (values.contains(null)) return BLOCK_MIGHT_MATCH;
+      }
+    }
+
     // drop if all the element in value < min || all the element in value > max
-    return allElementCanBeDropped(stats, values, meta);
+    if (stats.compareMinToValue(getMaxOrMin(true, values)) <= 0 &&
+      stats.compareMaxToValue(getMaxOrMin(false, values)) >= 0) {
+      return BLOCK_MIGHT_MATCH;
+    }
+    else {
+      return BLOCK_CANNOT_MATCH;
+    }
   }
 
-  private <T extends Comparable<T>> Boolean allElementCanBeDropped(Statistics<T> stats, Set<T> values, ColumnChunkMetaData meta) {
-    for (T value : values) {
-      if (value != null) {
-        if (stats.compareMinToValue(value) <= 0 && stats.compareMaxToValue(value) >= 0)
-          return false;
-      } else {
-        // numNulls is not set. We don't know anything about the nulls in this chunk
-        if (!stats.isNumNullsSet()) {
-          return false;
-        }
-        if (hasNulls(meta)) {
-          return false;
-        }
+  private <T extends Comparable<T>> T getMaxOrMin(boolean isMax, Set<T> values) {
+    T res = null;
+    for (T element : values) {
+      if (res == null) {
+        res = element;
+      } else if (isMax && res != null && element != null && res.compareTo(element) < 0) {

Review comment:
       As with the other min/max search, you need the proper `PrimitiveComparator` to compare. You can get it by calling `meta.getPrimitiveType().comparator()` in the `visit` method.

##########
File path: parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/ColumnIndexBuilder.java
##########
@@ -326,12 +323,27 @@ boolean isNullPage(int pageIndex) {
       // and >= the min value in the IN set, then the page might contain
       // the values in the IN set.
       getBoundaryOrder().ltEq(createValueComparator(max))
-        .forEachRemaining((int index) -> matchingIndexes2.add(index));
+        .forEachRemaining((int index) -> matchingIndexesLessThanMax.add(index));
       getBoundaryOrder().gtEq(createValueComparator(min))
-        .forEachRemaining((int index) -> matchingIndexes3.add(index));
-      matchingIndexes2.retainAll(matchingIndexes3);
-      matchingIndexes2.addAll(matchingIndexes1);  // add the matching null pages
-      return IndexIterator.filter(getPageCount(), pageIndex -> matchingIndexes2.contains(pageIndex));
+        .forEachRemaining((int index) -> matchingIndexesLargerThanMin.add(index));
+      matchingIndexesLessThanMax.retainAll(matchingIndexesLargerThanMin);
+      IntSet matchingIndex = matchingIndexesLessThanMax;
+      matchingIndex.addAll(matchingIndexesForNull);  // add the matching null pages
+      return IndexIterator.filter(getPageCount(), pageIndex -> matchingIndex.contains(pageIndex));
+    }
+
+    private <T extends Comparable<T>> T getMaxOrMin(boolean isMax, Set<T> values) {
+      T res = null;
+      for (T element : values) {
+        if (res == null) {
+          res = element;
+        } else if (isMax && res != null && element != null && res.compareTo(element) < 0) {

Review comment:
       Sorry, I just realized that the value's own `compareTo` method won't work properly in all cases. For example, comparing two `Binary` values depends on the logical type: the comparison is different for decimals and strings, and there are also different comparisons for signed and unsigned integers. So you need to use the proper `PrimitiveComparator` instance for the type. Here in `ColumnIndexBase` you already have an instance for that: `comparator`.
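   The pitfall can be demonstrated with signed versus unsigned integer ordering using only the JDK; this is a standalone sketch, not parquet-mr code:

```java
import java.util.Comparator;

// Why the natural compareTo is not always the right ordering: the same two
// int values compare differently under signed and unsigned semantics, just as
// Binary values compare differently for string and decimal logical types.
final class ComparatorDemo {
  static final Comparator<Integer> SIGNED = Integer::compare;
  static final Comparator<Integer> UNSIGNED = Integer::compareUnsigned;

  // Picks the maximum of two values according to the given comparator.
  static int maxOf(Comparator<Integer> cmp, int a, int b) {
    return cmp.compare(a, b) >= 0 ? a : b;
  }
}
```

   Under signed ordering the max of -1 and 1 is 1, but under unsigned ordering -1 is 0xFFFFFFFF and wins; a min/max search hard-wired to `compareTo` would silently pick the wrong bound for such a column.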

##########
File path: parquet-generator/src/main/java/org/apache/parquet/filter2/IncrementallyUpdatedFilterPredicateGenerator.java
##########
@@ -1,14 +1,14 @@
-/* 

Review comment:
       `TestColumnIndexFiltering` is another test class, different from `TestColumnIndexFilter`; the former is a higher-level test. It would be nice if you could add some tests there as well.

##########
File path: parquet-hadoop/src/main/java/org/apache/parquet/filter2/statisticslevel/StatisticsFilter.java
##########
@@ -186,26 +186,36 @@ private boolean hasNulls(ColumnChunkMetaData column) {
       return BLOCK_MIGHT_MATCH;
     }
 
+    if (stats.isNumNullsSet()) {
+      if (stats.getNumNulls() == 0) {
+        if (values.contains(null) && values.size() == 1) return BLOCK_CANNOT_MATCH;
+      } else {
+        if (values.contains(null)) return BLOCK_MIGHT_MATCH;
+      }
+    }
+
     // drop if all the element in value < min || all the element in value > max
-    return allElementCanBeDropped(stats, values, meta);
+    if (stats.compareMinToValue(getMaxOrMin(true, values)) <= 0 &&
+      stats.compareMaxToValue(getMaxOrMin(false, values)) >= 0) {
+      return BLOCK_MIGHT_MATCH;
+    }
+    else {

Review comment:
       Please merge lines.

##########
File path: parquet-hadoop/src/test/java/org/apache/parquet/filter2/recordlevel/TestRecordLevelFilters.java
##########
@@ -146,6 +147,33 @@ public void testAllFilter() throws Exception {
     assertEquals(new ArrayList<Group>(), found);
   }
 
+  @Test
+  public void testInFilter() throws Exception {
+    BinaryColumn name = binaryColumn("name");
+
+    HashSet<Binary> nameSet = new HashSet<>();
+    nameSet.add(Binary.fromString("thing2"));
+    nameSet.add(Binary.fromString("thing1"));
+    for (int i = 100; i < 200; i++) {
+      nameSet.add(Binary.fromString("p" + i));
+    }

Review comment:
       I think you should also add some values to the set that are not in the file.

##########
File path: parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/ColumnIndexBuilder.java
##########
@@ -293,8 +290,48 @@ boolean isNullPage(int pageIndex) {
 
     @Override
     public <T extends Comparable<T>> PrimitiveIterator.OfInt visit(In<T> in) {
-      IntSet indexes = getMatchingIndexes(in);
-      return IndexIterator.filter(getPageCount(), indexes::contains);
+      Set<T> values = in.getValues();
+      TreeSet<T> myTreeSet = new TreeSet<>();
+      IntSet matchingIndexes1 = new IntOpenHashSet();  // for null
+      Iterator<T> it = values.iterator();
+      while(it.hasNext()) {
+        T value = it.next();
+        if (value != null) {
+          myTreeSet.add(value);
+        } else {
+          if (nullCounts == null) {
+            // Searching for nulls so if we don't have null related statistics we have to return all pages
+            return IndexIterator.all(getPageCount());
+          } else {
+            for (int i = 0; i < nullCounts.length; i++) {
+              if (nullCounts[i] > 0) {
+                matchingIndexes1.add(i);
+              }
+            }
+          }
+        }
+      }
+
+      IntSet matchingIndexes2 = new IntOpenHashSet();

Review comment:
       The return types of `StatisticsFilter` and `ColumnIndexBuilder` are different because the first simply returns whether the related row group can be dropped, while the second returns the indexes of the matching pages.
   
       For `notIn` I agree. However, there is an edge case where you can conclude something: when the min and max values are the same, all the values in the row group / page must be identical, so you can decide the outcome when there is only one element in `notIn`. It sounds like a very rare scenario, so I am fine with returning "might match".
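   One reading of that edge case, sketched with illustrative names (this is not the actual parquet-mr `notIn` handling): if min equals max, every value in the chunk is that single value, so a `notIn` set containing it excludes the whole chunk.

```java
import java.util.Objects;
import java.util.Set;

// Sketch of the rare notIn edge case described above: when min == max, the
// chunk holds only one distinct value, so it can be dropped exactly when the
// notIn set contains that value. Nulls are ignored here to keep the sketch small.
final class NotInEdgeCase {
  // Returns true when the chunk can be safely dropped for notIn(values).
  static <T> boolean canDrop(T min, T max, Set<T> notInValues) {
    return Objects.equals(min, max) && notInValues.contains(min);
  }
}
```

   In every other situation the statistics cannot rule the chunk out, so "might match" remains the safe answer.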






> FilterApi support In predicate
> ------------------------------
>
>                 Key: PARQUET-1968
>                 URL: https://issues.apache.org/jira/browse/PARQUET-1968
>             Project: Parquet
>          Issue Type: Improvement
>          Components: parquet-mr
>    Affects Versions: 1.12.0
>            Reporter: Yuming Wang
>            Priority: Major
>
> FilterApi should support native In predicate.
> Spark:
> https://github.com/apache/spark/blob/d6a68e0b67ff7de58073c176dd097070e88ac831/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala#L600-L605
> Impala:
> https://issues.apache.org/jira/browse/IMPALA-3654


