HIVE-10920 : LLAP: elevator reads some useless data even if all RGs are eliminated by SARG (Sergey Shelukhin)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/fbb9be8d Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/fbb9be8d Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/fbb9be8d Branch: refs/heads/llap Commit: fbb9be8d8c641bb845229c121b804278c5685dee Parents: a73a910 Author: Sergey Shelukhin <ser...@apache.org> Authored: Thu Jun 4 11:29:48 2015 -0700 Committer: Sergey Shelukhin <ser...@apache.org> Committed: Thu Jun 4 11:29:48 2015 -0700 ---------------------------------------------------------------------- .../llap/io/encoded/OrcEncodedDataReader.java | 59 ++++++++++++++------ .../hadoop/hive/ql/io/orc/RecordReaderImpl.java | 20 +++---- 2 files changed, 51 insertions(+), 28 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/fbb9be8d/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java index e107378..03a72a2 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java @@ -192,7 +192,12 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void> } // Now, apply SARG if any; w/o sarg, this will just initialize readState. - determineRgsToRead(globalIncludes, stride, stripeMetadatas); + boolean hasData = determineRgsToRead(globalIncludes, stride, stripeMetadatas); + if (!hasData) { + consumer.setDone(); + recordReaderTime(startTime); + return null; // No data to read. + } } catch (Throwable t) { cleanupReaders(); consumer.setError(t); @@ -262,6 +267,10 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void> + stripe.getOffset() + ", " + stripe.getLength()); } colRgs = readState[stripeIxMod]; + assert colRgs.length > 0; + // We assume that NO_RGS value is only set from SARG filter and for all columns; + // intermediate changes for individual columns will unset values in the array. + if (colRgs[0] == SargApplier.READ_NO_RGS) continue; // 6.1. Determine the columns to read (usually the same as requested). if (cache == null || cols == null || cols.size() == colRgs.length) { @@ -587,7 +596,7 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void> * SARG is applied, and readState is populated for each stripe accordingly. * @param stripes All stripes in the file (field state is used to determine stripes to read). */ - private void determineRgsToRead(boolean[] globalIncludes, int rowIndexStride, + private boolean determineRgsToRead(boolean[] globalIncludes, int rowIndexStride, ArrayList<OrcStripeMetadata> metadata) throws IOException { SargApplier sargApp = null; if (sarg != null && rowIndexStride != 0) { @@ -596,6 +605,7 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void> columnNames, types, globalIncludes, fileMetadata.isOriginalFormat()); sargApp = new SargApplier(sarg, colNamesForSarg, rowIndexStride, types, globalIncludes.length); } + boolean hasAnyData = false; // readState should have been initialized by this time with an empty array. for (int stripeIxMod = 0; stripeIxMod < readState.length; ++stripeIxMod) { int stripeIx = stripeIxMod + stripeIxFrom; @@ -603,36 +613,48 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void> int rgCount = getRgCount(stripe, rowIndexStride); boolean[] rgsToRead = null; if (sargApp != null) { - rgsToRead = sargApp.pickRowGroups(stripe, metadata.get(stripeIxMod).getRowIndexes()); + rgsToRead = sargApp.pickRowGroups(stripe, metadata.get(stripeIxMod).getRowIndexes(), true); } + boolean isNone = rgsToRead == SargApplier.READ_NO_RGS, + isAll = rgsToRead == SargApplier.READ_ALL_RGS; + hasAnyData = hasAnyData || !isNone; if (DebugUtils.isTraceOrcEnabled()) { - if (rgsToRead != null ) { + if (isNone) { + LlapIoImpl.LOG.info("SARG eliminated all RGs for stripe " + stripeIx); + } else if (!isAll) { LlapIoImpl.LOG.info("SARG picked RGs for stripe " + stripeIx + ": " + DebugUtils.toString(rgsToRead)); } else { LlapIoImpl.LOG.info("Will read all " + rgCount + " RGs for stripe " + stripeIx); } } - assert rgsToRead == null || rgsToRead.length == rgCount; + assert isAll || isNone || rgsToRead.length == rgCount; readState[stripeIxMod] = new boolean[columnIds.size()][]; for (int j = 0; j < columnIds.size(); ++j) { - readState[stripeIxMod][j] = (rgsToRead == null) ? null : + readState[stripeIxMod][j] = (isAll || isNone) ? rgsToRead : Arrays.copyOf(rgsToRead, rgsToRead.length); } - int count = 0; - if (rgsToRead != null) { - for (boolean b : rgsToRead) { - if (b) - count++; - } - } else { - count = rgCount; + adjustRgMetric(rgCount, rgsToRead, isNone, isAll); + } + return hasAnyData; + } + + private void adjustRgMetric(int rgCount, boolean[] rgsToRead, boolean isNone, + boolean isAll) { + int count = 0; + if (!isAll) { + for (boolean b : rgsToRead) { + if (b) + count++; } - counters.setCounter(QueryFragmentCounters.Counter.SELECTED_ROWGROUPS, count); + } else if (!isNone) { + count = rgCount; } + counters.setCounter(QueryFragmentCounters.Counter.SELECTED_ROWGROUPS, count); } + private int getRgCount(StripeInformation stripe, int rowIndexStride) { return (int)Math.ceil((double)stripe.getNumberOfRows() / rowIndexStride); } @@ -713,7 +735,8 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void> for (int colIxMod = 0; colIxMod < cols.length; ++colIxMod) { boolean[] readMask = cols[colIxMod]; // Check if RG is eliminated by SARG - if (readMask != null && (readMask.length <= rgIx || !readMask[rgIx])) continue; + if ((readMask == SargApplier.READ_NO_RGS) || (readMask != SargApplier.READ_ALL_RGS + && (readMask.length <= rgIx || !readMask[rgIx]))) continue; key.colIx = columnIds.get(colIxMod); StreamBuffer[] cached = cache.get(key); if (cached == null) { @@ -722,7 +745,7 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void> } col.setAllStreams(colIxMod, key.colIx, cached); hasAnyCached = true; - if (readMask == null) { + if (readMask == SargApplier.READ_ALL_RGS) { // We were going to read all RGs, but some were in cache, allocate the mask. cols[colIxMod] = readMask = new boolean[totalRgCount]; Arrays.fill(readMask, true); @@ -784,4 +807,4 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void> public void setError(Throwable t) { consumer.setError(t); } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/hive/blob/fbb9be8d/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java index 65b79c5..bd7fbd1 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java @@ -663,6 +663,9 @@ public class RecordReaderImpl implements RecordReader { } public static class SargApplier { + public final static boolean[] READ_ALL_RGS = null; + public final static boolean[] READ_NO_RGS = new boolean[0]; + private final SearchArgument sarg; private final List<PredicateLeaf> sargLeaves; private final int[] filterColumns; @@ -695,12 +698,13 @@ public class RecordReaderImpl implements RecordReader { * row groups must be read. * @throws IOException */ - public boolean[] pickRowGroups( - StripeInformation stripe, OrcProto.RowIndex[] indexes) throws IOException { + public boolean[] pickRowGroups(StripeInformation stripe, OrcProto.RowIndex[] indexes, + boolean returnNone) throws IOException { long rowsInStripe = stripe.getNumberOfRows(); int groupsInStripe = (int) ((rowsInStripe + rowIndexStride - 1) / rowIndexStride); boolean[] result = new boolean[groupsInStripe]; // TODO: avoid alloc? TruthValue[] leafValues = new TruthValue[sargLeaves.size()]; + boolean hasSelected = false, hasSkipped = false; for (int rowGroup = 0; rowGroup < result.length; ++rowGroup) { for (int pred = 0; pred < leafValues.length; ++pred) { if (filterColumns[pred] != -1) { @@ -722,6 +726,8 @@ public class RecordReaderImpl implements RecordReader { } } result[rowGroup] = sarg.evaluate(leafValues).isNeeded(); + hasSelected = hasSelected || result[rowGroup]; + hasSkipped = hasSkipped || (!result[rowGroup]); if (LOG.isDebugEnabled()) { LOG.debug("Row group " + (rowIndexStride * rowGroup) + " to " + (rowIndexStride * (rowGroup + 1) - 1) + " is " + @@ -729,13 +735,7 @@ public class RecordReaderImpl implements RecordReader { } } - // if we found something to skip, use the array. otherwise, return null. - for (boolean b : result) { - if (!b) { - return result; - } - } - return null; + return hasSkipped ? ((hasSelected || !returnNone) ? result : READ_NO_RGS) : READ_ALL_RGS; } } @@ -752,7 +752,7 @@ public class RecordReaderImpl implements RecordReader { return null; } readRowIndex(currentStripe, included, sargApp.sargColumns); - return sargApp.pickRowGroups(stripes.get(currentStripe), indexes); + return sargApp.pickRowGroups(stripes.get(currentStripe), indexes, false); } private void clearStreams() throws IOException {