Repository: hive
Updated Branches:
  refs/heads/master 6fb647f32 -> 16f68e3e6


HIVE-16778 : LLAP IO: better refcount management (Sergey Shelukhin, reviewed by Prasanth Jayachandran)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/16f68e3e
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/16f68e3e
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/16f68e3e

Branch: refs/heads/master
Commit: 16f68e3e67def86bb16789d146a9b68bc7863894
Parents: 6fb647f
Author: sergey <[email protected]>
Authored: Thu Jun 8 14:24:48 2017 -0700
Committer: sergey <[email protected]>
Committed: Thu Jun 8 14:24:48 2017 -0700

----------------------------------------------------------------------
 .../llap/io/encoded/OrcEncodedDataReader.java   |  45 +--
 .../hive/ql/io/orc/encoded/EncodedReader.java   |   3 +-
 .../ql/io/orc/encoded/EncodedReaderImpl.java    | 339 +++++++++++--------
 3 files changed, 213 insertions(+), 174 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/16f68e3e/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
index 7081140..abf8cd3 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
@@ -155,10 +155,10 @@ public class OrcEncodedDataReader extends 
CallableWithNdc<Void>
   private Object fileKey;
   private FileSystem fs;
   /**
-   * readState[stripeIx'][colIx'] => boolean array (could be a bitmask) of 
rg-s that need to be
-   * read. Contains only stripes that are read, and only columns included. 
null => read all RGs.
+   * stripeRgs[stripeIx'] => boolean array (could be a bitmask) of rg-s that 
need to be read.
+   * Contains only stripes that are read, and only columns included. null => 
read all RGs.
    */
-  private boolean[][][] readState;
+  private boolean[][] stripeRgs;
   private volatile boolean isStopped = false;
   @SuppressWarnings("unused")
   private volatile boolean isPaused = false;
@@ -268,12 +268,13 @@ public class OrcEncodedDataReader extends 
CallableWithNdc<Void>
       return null;
     }
 
-    if (readState.length == 0) {
+    if (stripeRgs.length == 0) {
       consumer.setDone();
       recordReaderTime(startTime);
       return null; // No data to read.
     }
-    counters.setDesc(QueryFragmentCounters.Desc.STRIPES, stripeIxFrom + "," + 
readState.length);
+    counters.setDesc(QueryFragmentCounters.Desc.STRIPES,
+        stripeIxFrom + "," + stripeRgs.length);
 
     // 3. Apply SARG if needed, and otherwise determine what RGs to read.
     int stride = fileMetadata.getRowIndexStride();
@@ -334,13 +335,13 @@ public class OrcEncodedDataReader extends 
CallableWithNdc<Void>
     // TODO: I/O threadpool could be here - one thread per stripe; for now, 
linear.
     boolean hasFileId = this.fileKey != null;
     OrcBatchKey stripeKey = hasFileId ? new OrcBatchKey(fileKey, -1, 0) : null;
-    for (int stripeIxMod = 0; stripeIxMod < readState.length; ++stripeIxMod) {
+    for (int stripeIxMod = 0; stripeIxMod < stripeRgs.length; ++stripeIxMod) {
       if (processStop()) {
         recordReaderTime(startTime);
         return null;
       }
       int stripeIx = stripeIxFrom + stripeIxMod;
-      boolean[][] colRgs = null;
+      boolean[] rgs = null;
       OrcStripeMetadata stripeMetadata = null;
       StripeInformation stripe;
       try {
@@ -348,16 +349,15 @@ public class OrcEncodedDataReader extends 
CallableWithNdc<Void>
 
         LlapIoImpl.ORC_LOGGER.trace("Reading stripe {}: {}, {}", stripeIx, 
stripe.getOffset(),
             stripe.getLength());
-        colRgs = readState[stripeIxMod];
+        rgs = stripeRgs[stripeIxMod];
         if (LlapIoImpl.ORC_LOGGER.isTraceEnabled()) {
-          LlapIoImpl.ORC_LOGGER.trace("readState[{}]: {}", stripeIxMod, 
Arrays.toString(colRgs));
+          LlapIoImpl.ORC_LOGGER.trace("readState[{}]: {}", stripeIxMod, 
Arrays.toString(rgs));
         }
         // We assume that NO_RGS value is only set from SARG filter and for 
all columns;
         // intermediate changes for individual columns will unset values in 
the array.
         // Skip this case for 0-column read. We could probably special-case it 
just like we do
         // in EncodedReaderImpl, but for now it's not that important.
-        if (colRgs.length > 0 && colRgs[0] ==
-            RecordReaderImpl.SargApplier.READ_NO_RGS) continue;
+        if (rgs == RecordReaderImpl.SargApplier.READ_NO_RGS) continue;
 
         // 6.2. Ensure we have stripe metadata. We might have read it before 
for RG filtering.
         boolean isFoundInCache = false;
@@ -421,7 +421,7 @@ public class OrcEncodedDataReader extends 
CallableWithNdc<Void>
         // data it receives for one stripe. We could probably interrupt it, if 
it checked that.
         stripeReader.readEncodedColumns(stripeIx, stripe, 
stripeMetadata.getRowIndexes(),
             stripeMetadata.getEncodings(), stripeMetadata.getStreams(), 
globalIncludes,
-            colRgs, consumer);
+            rgs, consumer);
       } catch (Throwable t) {
         consumer.setError(t);
         cleanupReaders();
@@ -617,10 +617,10 @@ public class OrcEncodedDataReader extends 
CallableWithNdc<Void>
    */
   private ArrayList<OrcStripeMetadata> readStripesMetadata(
       boolean[] globalInc, boolean[] sargColumns) throws IOException {
-    ArrayList<OrcStripeMetadata> result = new 
ArrayList<OrcStripeMetadata>(readState.length);
+    ArrayList<OrcStripeMetadata> result = new 
ArrayList<OrcStripeMetadata>(stripeRgs.length);
     boolean hasFileId = this.fileKey != null;
     OrcBatchKey stripeKey = hasFileId ? new OrcBatchKey(fileKey, 0, 0) : null;
-    for (int stripeIxMod = 0; stripeIxMod < readState.length; ++stripeIxMod) {
+    for (int stripeIxMod = 0; stripeIxMod < stripeRgs.length; ++stripeIxMod) {
       OrcStripeMetadata value = null;
       int stripeIx = stripeIxMod + stripeIxFrom;
       if (hasFileId && metadataCache != null) {
@@ -712,8 +712,8 @@ public class OrcEncodedDataReader extends 
CallableWithNdc<Void>
           OrcFile.WriterVersion.from(fileMetadata.getWriterVersionNum()));
     }
     boolean hasAnyData = false;
-    // readState should have been initialized by this time with an empty array.
-    for (int stripeIxMod = 0; stripeIxMod < readState.length; ++stripeIxMod) {
+    // stripeRgs should have been initialized by this time with an empty array.
+    for (int stripeIxMod = 0; stripeIxMod < stripeRgs.length; ++stripeIxMod) {
       int stripeIx = stripeIxMod + stripeIxFrom;
       StripeInformation stripe = fileMetadata.getStripes().get(stripeIx);
       int rgCount = getRgCount(stripe, rowIndexStride);
@@ -739,17 +739,8 @@ public class OrcEncodedDataReader extends 
CallableWithNdc<Void>
         }
       }
       assert isAll || isNone || rgsToRead.length == rgCount;
-      int fileIncludesCount = 0;
-      // TODO: hacky for now - skip the root 0-s column.
-      //        We don't need separate readState w/o HL cache, should get rid 
of that instead.
-      for (int includeIx = 1; includeIx < globalIncludes.length; ++includeIx) {
-        fileIncludesCount += (globalIncludes[includeIx] ? 1 : 0);
-      }
-      readState[stripeIxMod] = new boolean[fileIncludesCount][];
-      for (int includeIx = 0; includeIx < fileIncludesCount; ++includeIx) {
-        readState[stripeIxMod][includeIx] = (isAll || isNone) ? rgsToRead :
+      stripeRgs[stripeIxMod] = (isAll || isNone) ? rgsToRead :
           Arrays.copyOf(rgsToRead, rgsToRead.length);
-      }
       adjustRgMetric(rgCount, rgsToRead, isNone, isAll);
     }
     return hasAnyData;
@@ -820,7 +811,7 @@ public class OrcEncodedDataReader extends 
CallableWithNdc<Void>
       LlapIoImpl.ORC_LOGGER.trace("Including stripes until {} (end of file); 
{} stripes",
           stripeIx, (stripeIxTo - stripeIxFrom));
     }
-    readState = new boolean[stripeIxTo - stripeIxFrom][][];
+    stripeRgs = new boolean[stripeIxTo - stripeIxFrom][];
   }
 
   private class DataWrapperForOrc implements DataReader, DataCache {

http://git-wip-us.apache.org/repos/asf/hive/blob/16f68e3e/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReader.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReader.java
index ea9904a..7540e72 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReader.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReader.java
@@ -40,8 +40,7 @@ public interface EncodedReader {
    */
   void readEncodedColumns(int stripeIx, StripeInformation stripe,
       OrcProto.RowIndex[] index, List<OrcProto.ColumnEncoding> encodings,
-      List<OrcProto.Stream> streams,
-      boolean[] included, boolean[][] colRgs,
+      List<OrcProto.Stream> streams, boolean[] included, boolean[] rgs,
       Consumer<OrcEncodedColumnBatch> consumer) throws IOException;
 
   /**

http://git-wip-us.apache.org/repos/asf/hive/blob/16f68e3e/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java
index 6cd85d2..248feae 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java
@@ -21,7 +21,6 @@ import java.io.IOException;
 import java.lang.reflect.Field;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collection;
 import java.util.IdentityHashMap;
 import java.util.List;
@@ -223,7 +222,7 @@ class EncodedReaderImpl implements EncodedReader {
   @Override
   public void readEncodedColumns(int stripeIx, StripeInformation stripe,
       OrcProto.RowIndex[] indexes, List<OrcProto.ColumnEncoding> encodings,
-      List<OrcProto.Stream> streamList, boolean[] included, boolean[][] colRgs,
+      List<OrcProto.Stream> streamList, boolean[] included, boolean[] rgs,
       Consumer<OrcEncodedColumnBatch> consumer) throws IOException {
     // Note: for now we don't have to setError here, caller will setError if 
we throw.
     // We are also not supposed to call setDone, since we are only part of the 
operation.
@@ -252,7 +251,6 @@ class EncodedReaderImpl implements EncodedReader {
     boolean isCompressed = (codec != null);
     CreateHelper listToRead = new CreateHelper();
     boolean hasIndexOnlyCols = false;
-    boolean[] includedRgs = null; // Will always be the same for all cols at 
the moment.
     for (OrcProto.Stream stream : streamList) {
       long length = stream.getLength();
       int colIx = stream.getColumn();
@@ -270,7 +268,6 @@ class EncodedReaderImpl implements EncodedReader {
       }
       ColumnReadContext ctx = colCtxs[colIx];
       assert ctx != null;
-      includedRgs = colRgs[ctx.includedIx];
       int indexIx = RecordReaderUtils.getIndexPosition(ctx.encoding.getKind(),
           types.get(colIx).getKind(), streamKind, isCompressed, 
hasNull[colIx]);
       ctx.addStream(offset, stream, indexIx);
@@ -278,13 +275,13 @@ class EncodedReaderImpl implements EncodedReader {
         LOG.trace("Adding stream for column " + colIx + ": " + streamKind + " 
at " + offset
             + ", " + length + ", index position " + indexIx);
       }
-      if (includedRgs == null || RecordReaderUtils.isDictionary(streamKind, 
encodings.get(colIx))) {
+      if (rgs == null || RecordReaderUtils.isDictionary(streamKind, 
encodings.get(colIx))) {
         RecordReaderUtils.addEntireStreamToRanges(offset, length, listToRead, 
true);
         if (isTracingEnabled) {
           LOG.trace("Will read whole stream " + streamKind + "; added to " + 
listToRead.getTail());
         }
       } else {
-        RecordReaderUtils.addRgFilteredStreamToRanges(stream, includedRgs,
+        RecordReaderUtils.addRgFilteredStreamToRanges(stream, rgs,
             codec != null, indexes[colIx], encodings.get(colIx), 
types.get(colIx),
             bufferSize, hasNull[colIx], offset, length, listToRead, true);
       }
@@ -295,7 +292,7 @@ class EncodedReaderImpl implements EncodedReader {
     if (listToRead.get() == null) {
       // No data to read for this stripe. Check if we have some included 
index-only columns.
       // TODO: there may be a bug here. Could there be partial RG filtering on 
index-only column?
-      if (hasIndexOnlyCols && (includedRgs == null)) {
+      if (hasIndexOnlyCols && (rgs == null)) {
         OrcEncodedColumnBatch ecb = POOLS.ecbPool.take();
         ecb.init(fileKey, stripeIx, OrcEncodedColumnBatch.ALL_RGS, 
included.length);
         consumer.consumeData(ecb);
@@ -329,18 +326,29 @@ class EncodedReaderImpl implements EncodedReader {
     //       can be freed in advance, we remove it from the map.
     IdentityHashMap<ByteBuffer, Boolean> toRelease = null;
     if (!isAllInCache.value) {
-      if (!isDataReaderOpen) {
-        this.dataReader.open();
-        isDataReaderOpen = true;
-      }
-      dataReader.readFileData(toRead.next, stripeOffset, 
cacheWrapper.getAllocator().isDirectAlloc());
-      toRelease = new IdentityHashMap<>();
-      DiskRangeList drl = toRead.next;
-      while (drl != null) {
-        if (drl instanceof BufferChunk) {
-          toRelease.put(drl.getData(), true);
+      boolean hasError = true;
+      try {
+        if (!isDataReaderOpen) {
+          this.dataReader.open();
+          isDataReaderOpen = true;
+        }
+        dataReader.readFileData(toRead.next, stripeOffset,
+            cacheWrapper.getAllocator().isDirectAlloc());
+        toRelease = new IdentityHashMap<>();
+        DiskRangeList drl = toRead.next;
+        while (drl != null) {
+          if (drl instanceof BufferChunk) {
+            toRelease.put(drl.getData(), true);
+          }
+          drl = drl.next;
+        }
+        hasError = false;
+      } finally {
+        // The FS can be closed from under us if the task is interrupted. 
Release cache buffers.
+        // We are assuming here that toRelease will not be present in such 
cases.
+        if (hasError) {
+          releaseInitialRefcounts(toRead.next);
         }
-        drl = drl.next;
       }
     }
 
@@ -350,157 +358,198 @@ class EncodedReaderImpl implements EncodedReader {
     //    either cache buffers, or buffers allocated by us and not cached (if 
we are only reading
     //    parts of the data for some ranges and don't want to cache it). Both 
are represented by
     //    CacheChunks, so the list is just CacheChunk-s from that point on.
-    DiskRangeList iter = toRead.next;  // Keep "toRead" list for future use, 
don't extract().
+    DiskRangeList iter = toRead.next;
     if (codec == null) {
-      for (int colIx = 0; colIx < colCtxs.length; ++colIx) {
-        ColumnReadContext ctx = colCtxs[colIx];
-        if (ctx == null) continue; // This column is not included.
-        for (int streamIx = 0; streamIx < ctx.streamCount; ++streamIx) {
-          StreamContext sctx = ctx.streams[streamIx];
-          DiskRangeList newIter = preReadUncompressedStream(
-              stripeOffset, iter, sctx.offset, sctx.offset + sctx.length);
-          if (newIter != null) {
-            iter = newIter;
+      boolean hasError = true;
+      try {
+        for (int colIx = 0; colIx < colCtxs.length; ++colIx) {
+          ColumnReadContext ctx = colCtxs[colIx];
+          if (ctx == null) continue; // This column is not included.
+          for (int streamIx = 0; streamIx < ctx.streamCount; ++streamIx) {
+            StreamContext sctx = ctx.streams[streamIx];
+            DiskRangeList newIter = preReadUncompressedStream(
+                stripeOffset, iter, sctx.offset, sctx.offset + sctx.length);
+            if (newIter != null) {
+              iter = newIter;
+            }
+          }
+        }
+        // Release buffers as we are done with all the streams... also see 
toRelease comment.
+        // With uncompressed streams, we know we are done earlier.
+        if (toRelease != null) {
+          releaseBuffers(toRelease.keySet(), true);
+          toRelease = null;
+        }
+        if (LOG.isInfoEnabled()) {
+          LOG.info("Disk ranges after pre-read (file " + fileKey + ", base 
offset "
+              + stripeOffset + "): " + 
RecordReaderUtils.stringifyDiskRanges(toRead.next));
+        }
+        iter = toRead.next; // Reset the iter to start.
+        hasError = false;
+      } finally {
+        // At this point, everything in the list is going to have a refcount 
of one. Unless it
+        // failed between the allocation and the incref for a single item, we 
should be ok. 
+        if (hasError) {
+          releaseInitialRefcounts(toRead.next);
+          if (toRelease != null) {
+            releaseBuffers(toRelease.keySet(), true);
+            toRelease = null;
           }
         }
       }
-      // Release buffers as we are done with all the streams... also see 
toRelease comment.\
-      // With uncompressed streams, we know we are done earlier.
-      if (toRelease != null) {
-        releaseBuffers(toRelease.keySet(), true);
-        toRelease = null;
+    }
+
+    try {
+      // 4. Finally, decompress data, map per RG, and return to caller.
+      // We go by RG and not by column because that is how data is processed.
+      int rgCount = (int)Math.ceil((double)stripe.getNumberOfRows() / 
rowIndexStride);
+      for (int rgIx = 0; rgIx < rgCount; ++rgIx) {
+        if (rgs != null && !rgs[rgIx]) {
+          continue; // RG filtered.
+        }
+        boolean isLastRg = rgIx == rgCount - 1;
+        // Create the batch we will use to return data for this RG.
+        OrcEncodedColumnBatch ecb = POOLS.ecbPool.take();
+        boolean hasError = true;
+        try {
+          ecb.init(fileKey, stripeIx, rgIx, included.length);
+          for (int colIx = 0; colIx < colCtxs.length; ++colIx) {
+            ColumnReadContext ctx = colCtxs[colIx];
+            if (ctx == null) continue; // This column is not included.
+            if (isTracingEnabled) {
+              LOG.trace("ctx: {} rgIx: {} isLastRg: {} rgCount: {}", ctx, 
rgIx, isLastRg, rgCount);
+            }
+            OrcProto.RowIndexEntry index = ctx.rowIndex.getEntry(rgIx),
+                nextIndex = isLastRg ? null : ctx.rowIndex.getEntry(rgIx + 1);
+            ecb.initOrcColumn(ctx.colIx);
+            for (int streamIx = 0; streamIx < ctx.streamCount; ++streamIx) {
+              StreamContext sctx = ctx.streams[streamIx];
+              ColumnStreamData cb = null;
+              try {
+                if (RecordReaderUtils.isDictionary(sctx.kind, ctx.encoding)) {
+                  // This stream is for entire stripe and needed for every RG; 
uncompress once and reuse.
+                  if (isTracingEnabled) {
+                    LOG.trace("Getting stripe-level stream [" + sctx.kind + ", 
" + ctx.encoding + "] for"
+                        + " column " + ctx.colIx + " RG " + rgIx + " at " + 
sctx.offset + ", " + sctx.length);
+                  }
+                  if (sctx.stripeLevelStream == null) {
+                    sctx.stripeLevelStream = POOLS.csdPool.take();
+                    // We will be using this for each RG while also sending 
RGs to processing.
+                    // To avoid buffers being unlocked, run refcount one 
ahead; so each RG 
+                    // processing will decref once, and the last one will 
unlock the buffers.
+                    sctx.stripeLevelStream.incRef();
+                    // For stripe-level streams we don't need the extra 
refcount on the block.
+                    // See class comment about refcounts.
+                    long unlockUntilCOffset = sctx.offset + sctx.length;
+                    DiskRangeList lastCached = readEncodedStream(stripeOffset, 
iter,
+                        sctx.offset, sctx.offset + sctx.length, 
sctx.stripeLevelStream,
+                        unlockUntilCOffset, sctx.offset, toRelease);
+                    if (lastCached != null) {
+                      iter = lastCached;
+                    }
+                  }
+                  sctx.stripeLevelStream.incRef();
+                  cb = sctx.stripeLevelStream;
+                } else {
+                  // This stream can be separated by RG using index. Let's do 
that.
+                  // Offset to where this RG begins.
+                  long cOffset = sctx.offset + 
index.getPositions(sctx.streamIndexOffset);
+                  // Offset relative to the beginning of the stream of where 
this RG ends.
+                  long nextCOffsetRel = isLastRg ? sctx.length
+                      : nextIndex.getPositions(sctx.streamIndexOffset);
+                  // Offset before which this RG is guaranteed to end. Can 
only be estimated.
+                  // We estimate the same way for compressed and uncompressed 
for now.
+                  long endCOffset = sctx.offset + 
RecordReaderUtils.estimateRgEndOffset(
+                      isCompressed, isLastRg, nextCOffsetRel, sctx.length, 
bufferSize);
+                  // As we read, we can unlock initial refcounts for the 
buffers that end before
+                  // the data that we need for this RG.
+                  long unlockUntilCOffset = sctx.offset + nextCOffsetRel;
+                  cb = createRgColumnStreamData(
+                      rgIx, isLastRg, ctx.colIx, sctx, cOffset, endCOffset, 
isCompressed);
+                  boolean isStartOfStream = sctx.bufferIter == null;
+                  DiskRangeList lastCached = readEncodedStream(stripeOffset,
+                      (isStartOfStream ? iter : sctx.bufferIter), cOffset, 
endCOffset, cb,
+                      unlockUntilCOffset, sctx.offset, toRelease);
+                  if (lastCached != null) {
+                    sctx.bufferIter = iter = lastCached;
+                  }
+                }
+                ecb.setStreamData(ctx.colIx, sctx.kind.getNumber(), cb);
+              } catch (Exception ex) {
+                DiskRangeList drl = toRead == null ? null : toRead.next;
+                LOG.error("Error getting stream [" + sctx.kind + ", " + 
ctx.encoding + "] for"
+                    + " column " + ctx.colIx + " RG " + rgIx + " at " + 
sctx.offset + ", "
+                    + sctx.length + "; toRead " + 
RecordReaderUtils.stringifyDiskRanges(drl), ex);
+                throw (ex instanceof IOException) ? (IOException)ex : new 
IOException(ex);
+              }
+            }
+          }
+          hasError = false;
+        } finally {
+          if (hasError) {
+            releaseEcbRefCountsOnError(ecb);
+          }
+        }
+        // After this, the non-initial refcounts are the responsibility of the 
consumer.
+        consumer.consumeData(ecb);
       }
+
       if (isTracingEnabled) {
-        LOG.trace("Disk ranges after pre-read (file " + fileKey + ", base 
offset "
-            + stripeOffset + "): " + 
RecordReaderUtils.stringifyDiskRanges(toRead.next));
+        LOG.trace("Disk ranges after preparing all the data "
+            + RecordReaderUtils.stringifyDiskRanges(toRead.next));
       }
-      iter = toRead.next; // Reset the iter to start.
-    }
-
-    // 4. Finally, decompress data, map per RG, and return to caller.
-    // We go by RG and not by column because that is how data is processed.
-    int rgCount = (int)Math.ceil((double)stripe.getNumberOfRows() / 
rowIndexStride);
-    for (int rgIx = 0; rgIx < rgCount; ++rgIx) {
-      boolean isLastRg = rgIx == rgCount - 1;
-      // Create the batch we will use to return data for this RG.
-      OrcEncodedColumnBatch ecb = POOLS.ecbPool.take();
-      ecb.init(fileKey, stripeIx, rgIx, included.length);
-      boolean isRGSelected = true;
+    } finally {
+      // Release the unreleased stripe-level buffers. See class comment about 
refcounts.
       for (int colIx = 0; colIx < colCtxs.length; ++colIx) {
         ColumnReadContext ctx = colCtxs[colIx];
         if (ctx == null) continue; // This column is not included.
-        if (isTracingEnabled) {
-          LOG.trace("ctx: {} rgIx: {} isLastRg: {} rgCount: {}", ctx, rgIx, 
isLastRg, rgCount);
-        }
-        // TODO: simplify this now that high-level cache has been removed. 
Same RGs for all cols.
-        if (colRgs[ctx.includedIx] != null && !colRgs[ctx.includedIx][rgIx]) {
-          // RG x col filtered.
-          isRGSelected = false;
-          if (isTracingEnabled) {
-            LOG.trace("colIxMod: {} rgIx: {} colRgs[{}]: {} colRgs[{}][{}]: 
{}", ctx.includedIx, rgIx, ctx.includedIx,
-              Arrays.toString(colRgs[ctx.includedIx]), ctx.includedIx, rgIx, 
colRgs[ctx.includedIx][rgIx]);
-          }
-           continue;
-        }
-        OrcProto.RowIndexEntry index = ctx.rowIndex.getEntry(rgIx),
-            nextIndex = isLastRg ? null : ctx.rowIndex.getEntry(rgIx + 1);
-        ecb.initOrcColumn(ctx.colIx);
         for (int streamIx = 0; streamIx < ctx.streamCount; ++streamIx) {
           StreamContext sctx = ctx.streams[streamIx];
-          ColumnStreamData cb = null;
-          try {
-            if (RecordReaderUtils.isDictionary(sctx.kind, ctx.encoding)) {
-              // This stream is for entire stripe and needed for every RG; 
uncompress once and reuse.
-              if (isTracingEnabled) {
-                LOG.trace("Getting stripe-level stream [" + sctx.kind + ", " + 
ctx.encoding + "] for"
-                    + " column " + ctx.colIx + " RG " + rgIx + " at " + 
sctx.offset + ", " + sctx.length);
-              }
-              if (sctx.stripeLevelStream == null) {
-                sctx.stripeLevelStream = POOLS.csdPool.take();
-                // We will be using this for each RG while also sending RGs to 
processing.
-                // To avoid buffers being unlocked, run refcount one ahead; so 
each RG 
-                 // processing will decref once, and the
-                // last one will unlock the buffers.
-                sctx.stripeLevelStream.incRef();
-                // For stripe-level streams we don't need the extra refcount 
on the block.
-                // See class comment about refcounts.
-                long unlockUntilCOffset = sctx.offset + sctx.length;
-                DiskRangeList lastCached = readEncodedStream(stripeOffset, 
iter,
-                    sctx.offset, sctx.offset + sctx.length, 
sctx.stripeLevelStream,
-                    unlockUntilCOffset, sctx.offset, toRelease);
-                if (lastCached != null) {
-                  iter = lastCached;
-                }
-              }
-              sctx.stripeLevelStream.incRef();
-              cb = sctx.stripeLevelStream;
-            } else {
-              // This stream can be separated by RG using index. Let's do that.
-              // Offset to where this RG begins.
-              long cOffset = sctx.offset + 
index.getPositions(sctx.streamIndexOffset);
-              // Offset relative to the beginning of the stream of where this 
RG ends.
-              long nextCOffsetRel = isLastRg ? sctx.length
-                  : nextIndex.getPositions(sctx.streamIndexOffset);
-              // Offset before which this RG is guaranteed to end. Can only be 
estimated.
-              // We estimate the same way for compressed and uncompressed for 
now.
-              long endCOffset = sctx.offset + 
RecordReaderUtils.estimateRgEndOffset(
-                  isCompressed, isLastRg, nextCOffsetRel, sctx.length, 
bufferSize);
-              // As we read, we can unlock initial refcounts for the buffers 
that end before
-              // the data that we need for this RG.
-              long unlockUntilCOffset = sctx.offset + nextCOffsetRel;
-              cb = createRgColumnStreamData(
-                  rgIx, isLastRg, ctx.colIx, sctx, cOffset, endCOffset, 
isCompressed);
-              boolean isStartOfStream = sctx.bufferIter == null;
-              DiskRangeList lastCached = readEncodedStream(stripeOffset,
-                  (isStartOfStream ? iter : sctx.bufferIter), cOffset, 
endCOffset, cb,
-                  unlockUntilCOffset, sctx.offset, toRelease);
-              if (lastCached != null) {
-                sctx.bufferIter = iter = lastCached;
-              }
+          if (sctx == null || sctx.stripeLevelStream == null) continue;
+          if (0 != sctx.stripeLevelStream.decRef()) continue;
+          // Note - this is a little bit confusing; the special treatment of 
stripe-level buffers
+          // is because we run the ColumnStreamData refcount one ahead (as 
specified above). It
+          // may look like this would release the buffers too many times (one 
release from the
+          // consumer, one from releaseInitialRefcounts below, and one here); 
however, this is
+          // merely handling a special case where all the batches that are 
sharing the stripe-
+          // level stream have been processed before we got here; they have 
all decRef-ed the CSD,
+          // but have not released the buffers because of that extra refCount. 
So, this is
+          // essentially the "consumer" refcount being released here.
+          for (MemoryBuffer buf : sctx.stripeLevelStream.getCacheBuffers()) {
+            if (LOG.isTraceEnabled()) {
+              LOG.trace("Unlocking {} at the end of processing", buf);
             }
-            ecb.setStreamData(ctx.colIx, sctx.kind.getNumber(), cb);
-          } catch (Exception ex) {
-            DiskRangeList drl = toRead == null ? null : toRead.next;
-            LOG.error("Error getting stream [" + sctx.kind + ", " + 
ctx.encoding + "] for"
-                + " column " + ctx.colIx + " RG " + rgIx + " at " + 
sctx.offset + ", "
-                + sctx.length + "; toRead " + 
RecordReaderUtils.stringifyDiskRanges(drl), ex);
-            throw (ex instanceof IOException) ? (IOException)ex : new 
IOException(ex);
+            cacheWrapper.releaseBuffer(buf);
           }
         }
       }
-      if (isRGSelected) {
-        consumer.consumeData(ecb);
+
+      releaseInitialRefcounts(toRead.next);
+      // Release buffers as we are done with all the streams... also see 
toRelease comment.
+      if (toRelease != null) {
+        releaseBuffers(toRelease.keySet(), true);
       }
     }
+    releaseCacheChunksIntoObjectPool(toRead.next);
+  }
 
-    if (isTracingEnabled) {
-      LOG.trace("Disk ranges after preparing all the data "
-          + RecordReaderUtils.stringifyDiskRanges(toRead.next));
-    }
 
-    // Release the unreleased buffers. See class comment about refcounts.
-    for (int colIx = 0; colIx < colCtxs.length; ++colIx) {
-      ColumnReadContext ctx = colCtxs[colIx];
-      if (ctx == null) continue; // This column is not included.
-      for (int streamIx = 0; streamIx < ctx.streamCount; ++streamIx) {
-        StreamContext sctx = ctx.streams[streamIx];
-        if (sctx == null || sctx.stripeLevelStream == null) continue;
-        if (0 != sctx.stripeLevelStream.decRef()) continue;
-        for (MemoryBuffer buf : sctx.stripeLevelStream.getCacheBuffers()) {
-          if (LOG.isTraceEnabled()) {
-            LOG.trace("Unlocking {} at the end of processing", buf);
-          }
+  private void releaseEcbRefCountsOnError(OrcEncodedColumnBatch ecb) {
+    if (isTracingEnabled) {
+      LOG.trace("Unlocking the batch not sent to consumer, on error");
+    }
+    // We cannot send the ecb to consumer. Discard whatever is already there.
+    for (int colIx = 0; colIx < ecb.getTotalColCount(); ++colIx) {
+      if (!ecb.hasData(colIx)) continue;
+      ColumnStreamData[] datas = ecb.getColumnData(colIx);
+      for (ColumnStreamData data : datas) {
+        if (data == null || data.decRef() != 0) continue;
+        for (MemoryBuffer buf : data.getCacheBuffers()) {
+          if (buf == null) continue;
           cacheWrapper.releaseBuffer(buf);
         }
       }
     }
-
-    releaseInitialRefcounts(toRead.next);
-    // Release buffers as we are done with all the streams... also see 
toRelease comment.
-    if (toRelease != null) {
-      releaseBuffers(toRelease.keySet(), true);
-    }
-    releaseCacheChunksIntoObjectPool(toRead.next);
   }
 
 

Reply via email to