HIVE-10920 : LLAP: elevator reads some useless data even if all RGs are eliminated by SARG (Sergey Shelukhin)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/fbb9be8d
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/fbb9be8d
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/fbb9be8d

Branch: refs/heads/llap
Commit: fbb9be8d8c641bb845229c121b804278c5685dee
Parents: a73a910
Author: Sergey Shelukhin <ser...@apache.org>
Authored: Thu Jun 4 11:29:48 2015 -0700
Committer: Sergey Shelukhin <ser...@apache.org>
Committed: Thu Jun 4 11:29:48 2015 -0700

----------------------------------------------------------------------
 .../llap/io/encoded/OrcEncodedDataReader.java   | 59 ++++++++++++++------
 .../hadoop/hive/ql/io/orc/RecordReaderImpl.java | 20 +++----
 2 files changed, 51 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/fbb9be8d/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
index e107378..03a72a2 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
@@ -192,7 +192,12 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
       }
 
       // Now, apply SARG if any; w/o sarg, this will just initialize readState.
-      determineRgsToRead(globalIncludes, stride, stripeMetadatas);
+      boolean hasData = determineRgsToRead(globalIncludes, stride, 
stripeMetadatas);
+      if (!hasData) {
+        consumer.setDone();
+        recordReaderTime(startTime);
+        return null; // No data to read.
+      }
     } catch (Throwable t) {
       cleanupReaders();
       consumer.setError(t);
@@ -262,6 +267,10 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
               + stripe.getOffset() + ", " + stripe.getLength());
         }
         colRgs = readState[stripeIxMod];
+        assert colRgs.length > 0;
+        // We assume that NO_RGS value is only set from SARG filter and for 
all columns;
+        // intermediate changes for individual columns will unset values in 
the array.
+        if (colRgs[0] == SargApplier.READ_NO_RGS) continue;
 
         // 6.1. Determine the columns to read (usually the same as requested).
         if (cache == null || cols == null || cols.size() == colRgs.length) {
@@ -587,7 +596,7 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
    * SARG is applied, and readState is populated for each stripe accordingly.
    * @param stripes All stripes in the file (field state is used to determine 
stripes to read).
    */
-  private void determineRgsToRead(boolean[] globalIncludes, int rowIndexStride,
+  private boolean determineRgsToRead(boolean[] globalIncludes, int 
rowIndexStride,
       ArrayList<OrcStripeMetadata> metadata) throws IOException {
     SargApplier sargApp = null;
     if (sarg != null && rowIndexStride != 0) {
@@ -596,6 +605,7 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
           columnNames, types, globalIncludes, fileMetadata.isOriginalFormat());
       sargApp = new SargApplier(sarg, colNamesForSarg, rowIndexStride, types, 
globalIncludes.length);
     }
+    boolean hasAnyData = false;
     // readState should have been initialized by this time with an empty array.
     for (int stripeIxMod = 0; stripeIxMod < readState.length; ++stripeIxMod) {
       int stripeIx = stripeIxMod + stripeIxFrom;
@@ -603,36 +613,48 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
       int rgCount = getRgCount(stripe, rowIndexStride);
       boolean[] rgsToRead = null;
       if (sargApp != null) {
-        rgsToRead = sargApp.pickRowGroups(stripe, 
metadata.get(stripeIxMod).getRowIndexes());
+        rgsToRead = sargApp.pickRowGroups(stripe, 
metadata.get(stripeIxMod).getRowIndexes(), true);
       }
+      boolean isNone = rgsToRead == SargApplier.READ_NO_RGS,
+          isAll = rgsToRead == SargApplier.READ_ALL_RGS;
+      hasAnyData = hasAnyData || !isNone;
       if (DebugUtils.isTraceOrcEnabled()) {
-        if (rgsToRead != null ) {
+        if (isNone) {
+          LlapIoImpl.LOG.info("SARG eliminated all RGs for stripe " + 
stripeIx);
+        } else if (!isAll) {
           LlapIoImpl.LOG.info("SARG picked RGs for stripe " + stripeIx + ": "
               + DebugUtils.toString(rgsToRead));
         } else {
           LlapIoImpl.LOG.info("Will read all " + rgCount + " RGs for stripe " 
+ stripeIx);
         }
       }
-      assert rgsToRead == null || rgsToRead.length == rgCount;
+      assert isAll || isNone || rgsToRead.length == rgCount;
       readState[stripeIxMod] = new boolean[columnIds.size()][];
       for (int j = 0; j < columnIds.size(); ++j) {
-        readState[stripeIxMod][j] = (rgsToRead == null) ? null :
+        readState[stripeIxMod][j] = (isAll || isNone) ? rgsToRead :
           Arrays.copyOf(rgsToRead, rgsToRead.length);
       }
 
-      int count = 0;
-      if (rgsToRead != null) {
-        for (boolean b : rgsToRead) {
-          if (b)
-            count++;
-        }
-      } else {
-        count = rgCount;
+      adjustRgMetric(rgCount, rgsToRead, isNone, isAll);
+    }
+    return hasAnyData;
+  }
+
+  private void adjustRgMetric(int rgCount, boolean[] rgsToRead, boolean isNone,
+      boolean isAll) {
+    int count = 0;
+    if (!isAll) {
+      for (boolean b : rgsToRead) {
+        if (b)
+          count++;
       }
-      counters.setCounter(QueryFragmentCounters.Counter.SELECTED_ROWGROUPS, 
count);
+    } else if (!isNone) {
+      count = rgCount;
     }
+    counters.setCounter(QueryFragmentCounters.Counter.SELECTED_ROWGROUPS, 
count);
   }
 
+
   private int getRgCount(StripeInformation stripe, int rowIndexStride) {
     return (int)Math.ceil((double)stripe.getNumberOfRows() / rowIndexStride);
   }
@@ -713,7 +735,8 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
           for (int colIxMod = 0; colIxMod < cols.length; ++colIxMod) {
             boolean[] readMask = cols[colIxMod];
             // Check if RG is eliminated by SARG
-            if (readMask != null && (readMask.length <= rgIx || 
!readMask[rgIx])) continue;
+            if ((readMask == SargApplier.READ_NO_RGS) || (readMask != 
SargApplier.READ_ALL_RGS
+                && (readMask.length <= rgIx || !readMask[rgIx]))) continue;
             key.colIx = columnIds.get(colIxMod);
             StreamBuffer[] cached = cache.get(key);
             if (cached == null) {
@@ -722,7 +745,7 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
             }
             col.setAllStreams(colIxMod, key.colIx, cached);
             hasAnyCached = true;
-            if (readMask == null) {
+            if (readMask == SargApplier.READ_ALL_RGS) {
               // We were going to read all RGs, but some were in cache, 
allocate the mask.
               cols[colIxMod] = readMask = new boolean[totalRgCount];
               Arrays.fill(readMask, true);
@@ -784,4 +807,4 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
   public void setError(Throwable t) {
     consumer.setError(t);
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/fbb9be8d/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java
index 65b79c5..bd7fbd1 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java
@@ -663,6 +663,9 @@ public class RecordReaderImpl implements RecordReader {
   }
 
   public static class SargApplier {
+    public final static boolean[] READ_ALL_RGS = null;
+    public final static boolean[] READ_NO_RGS = new boolean[0];
+
     private final SearchArgument sarg;
     private final List<PredicateLeaf> sargLeaves;
     private final int[] filterColumns;
@@ -695,12 +698,13 @@ public class RecordReaderImpl implements RecordReader {
      * row groups must be read.
      * @throws IOException
      */
-    public boolean[] pickRowGroups(
-        StripeInformation stripe, OrcProto.RowIndex[] indexes) throws 
IOException {
+    public boolean[] pickRowGroups(StripeInformation stripe, 
OrcProto.RowIndex[] indexes,
+        boolean returnNone) throws IOException {
       long rowsInStripe = stripe.getNumberOfRows();
       int groupsInStripe = (int) ((rowsInStripe + rowIndexStride - 1) / 
rowIndexStride);
       boolean[] result = new boolean[groupsInStripe]; // TODO: avoid alloc?
       TruthValue[] leafValues = new TruthValue[sargLeaves.size()];
+      boolean hasSelected = false, hasSkipped = false;
       for (int rowGroup = 0; rowGroup < result.length; ++rowGroup) {
         for (int pred = 0; pred < leafValues.length; ++pred) {
           if (filterColumns[pred] != -1) {
@@ -722,6 +726,8 @@ public class RecordReaderImpl implements RecordReader {
           }
         }
         result[rowGroup] = sarg.evaluate(leafValues).isNeeded();
+        hasSelected = hasSelected || result[rowGroup];
+        hasSkipped = hasSkipped || (!result[rowGroup]);
         if (LOG.isDebugEnabled()) {
           LOG.debug("Row group " + (rowIndexStride * rowGroup) + " to " +
               (rowIndexStride * (rowGroup + 1) - 1) + " is " +
@@ -729,13 +735,7 @@ public class RecordReaderImpl implements RecordReader {
         }
       }
 
-      // if we found something to skip, use the array. otherwise, return null.
-      for (boolean b : result) {
-        if (!b) {
-          return result;
-        }
-      }
-      return null;
+      return hasSkipped ? ((hasSelected || !returnNone) ? result : 
READ_NO_RGS) : READ_ALL_RGS;
     }
   }
 
@@ -752,7 +752,7 @@ public class RecordReaderImpl implements RecordReader {
       return null;
     }
     readRowIndex(currentStripe, included, sargApp.sargColumns);
-    return sargApp.pickRowGroups(stripes.get(currentStripe), indexes);
+    return sargApp.pickRowGroups(stripes.get(currentStripe), indexes, false);
   }
 
   private void clearStreams() throws IOException {

Reply via email to