Repository: hive
Updated Branches:
  refs/heads/branch-3 55bc28540 -> 88d0da45b
  refs/heads/master f20311b00 -> 2915acc45


HIVE-19838 : simplify & fix ColumnizedDeleteEventRegistry load loop (Sergey Shelukhin, reviewed by Teddy Choi)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/2915acc4
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/2915acc4
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/2915acc4

Branch: refs/heads/master
Commit: 2915acc4564fe428ec2bddb2b855d2ecab33a624
Parents: f20311b
Author: sergey <ser...@apache.org>
Authored: Thu Jun 14 12:15:28 2018 -0700
Committer: sergey <ser...@apache.org>
Committed: Thu Jun 14 12:15:28 2018 -0700

----------------------------------------------------------------------
 .../io/orc/VectorizedOrcAcidRowBatchReader.java | 159 ++++++++++---------
 1 file changed, 83 insertions(+), 76 deletions(-)
----------------------------------------------------------------------
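
The heart of the patch is the load loop in readAllDeleteEventsFromDeleteDeltas: instead of buffering every originalWriteId/bucketProperty pair into parallel arrays and compressing them in a second pass, the loop now run-length encodes the already-sorted delete events into CompressedOwid ranges in a single pass while draining the sort-merge heap, and it trims rowIds with Arrays.copyOf because totalDeleteEventCount taken from the ORC acid stats may overcount. The standalone sketch below illustrates only that single-pass compression pattern; the names OwidRange, compress and SinglePassCompressionSketch are hypothetical stand-ins for illustration and do not appear in the patch.

----------------------------------------------------------------------
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-in for the patch's CompressedOwid: a
// half-open index range [fromIndex, toIndex) of rowIds that all share
// the same (originalWriteId, bucketProperty).
class OwidRange {
  final long originalWriteId;
  final int bucketProperty;
  final int fromIndex; // inclusive
  int toIndex;         // exclusive; finalized when the run ends

  OwidRange(long originalWriteId, int bucketProperty, int fromIndex) {
    this.originalWriteId = originalWriteId;
    this.bucketProperty = bucketProperty;
    this.fromIndex = fromIndex;
    this.toIndex = -1;
  }
}

public class SinglePassCompressionSketch {
  // Run-length encodes a stream of (owid, bucketProperty) pairs that is
  // already sorted by (owid, bucketProperty), the same way the patched
  // loop builds CompressedOwid entries while draining the sort-merge heap.
  static List<OwidRange> compress(long[] owids, int[] bucketProps) {
    List<OwidRange> ranges = new ArrayList<>();
    OwidRange last = null;
    for (int i = 0; i < owids.length; ++i) {
      if (last == null || last.originalWriteId != owids[i]
          || last.bucketProperty != bucketProps[i]) {
        if (last != null) {
          last.toIndex = i; // finalize the previous run
        }
        last = new OwidRange(owids[i], bucketProps[i], i);
        ranges.add(last);
      }
    }
    if (last != null) {
      last.toIndex = owids.length; // finalize the last run
    }
    return ranges;
  }

  public static void main(String[] args) {
    // Example input with four distinct (owid, bucketProperty) runs.
    long[] owids = {5, 5, 5, 7, 7, 9};
    int[] bps    = {1, 1, 2, 2, 2, 2};
    for (OwidRange r : compress(owids, bps)) {
      System.out.println(r.originalWriteId + "/" + r.bucketProperty
          + " -> [" + r.fromIndex + ", " + r.toIndex + ")");
    }
  }
}
----------------------------------------------------------------------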


http://git-wip-us.apache.org/repos/asf/hive/blob/2915acc4/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java
index a4568de..66ffcae 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java
@@ -19,8 +19,10 @@
 package org.apache.hadoop.hive.ql.io.orc;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.BitSet;
+import java.util.List;
 import java.util.Map.Entry;
 import java.util.TreeMap;
 
@@ -859,9 +861,11 @@ public class VectorizedOrcAcidRowBatchReader
       private final ValidWriteIdList validWriteIdList;
       private boolean isBucketPropertyRepeating;
       private final boolean isBucketedTable;
+      private final Reader reader;
 
       DeleteReaderValue(Reader deleteDeltaReader, Reader.Options readerOptions, int bucket,
           ValidWriteIdList validWriteIdList, boolean isBucketedTable) throws IOException {
+        this.reader = deleteDeltaReader;
         this.recordReader  = deleteDeltaReader.rowsOptions(readerOptions);
         this.bucketForSplit = bucket;
         this.batch = deleteDeltaReader.getSchema().createRowBatch();
@@ -955,6 +959,12 @@ public class VectorizedOrcAcidRowBatchReader
             " from " + dummy + " curTxnId: " + curTxnId);
         }
       }
+
+      @Override
+      public String toString() {
+        return "{reader=" + reader + ", isBucketPropertyRepeating=" + isBucketPropertyRepeating +
+            ", bucketForSplit=" + bucketForSplit + ", isBucketedTable=" + isBucketedTable + "}";
+      }
     }
     /**
      * A CompressedOwid class stores a compressed representation of the original
@@ -968,7 +978,7 @@ public class VectorizedOrcAcidRowBatchReader
       final long originalWriteId;
       final int bucketProperty;
       final int fromIndex; // inclusive
-      final int toIndex; // exclusive
+      int toIndex; // exclusive
 
       CompressedOwid(long owid, int bucketProperty, int fromIndex, int toIndex) {
         this.originalWriteId = owid;
@@ -1027,49 +1037,50 @@ public class VectorizedOrcAcidRowBatchReader
           int totalDeleteEventCount = 0;
           for (Path deleteDeltaDir : deleteDeltaDirs) {
             FileSystem fs = deleteDeltaDir.getFileSystem(conf);
-            for(Path deleteDeltaFile : OrcRawRecordMerger.getDeltaFiles(deleteDeltaDir, bucket, conf,
-              new OrcRawRecordMerger.Options().isCompacting(false), isBucketedTable)) {
-            // NOTE: Calling last flush length below is more for future-proofing when we have
-            // streaming deletes. But currently we don't support streaming deletes, and this can
-            // be removed if this becomes a performance issue.
-            long length = OrcAcidUtils.getLastFlushLength(fs, deleteDeltaFile);
-            // NOTE: A check for existence of deleteDeltaFile is required because we may not have
-            // deletes for the bucket being taken into consideration for this split processing.
-            if (length != -1 && fs.exists(deleteDeltaFile)) {
-              Reader deleteDeltaReader = OrcFile.createReader(deleteDeltaFile,
-                  OrcFile.readerOptions(conf).maxLength(length));
-              AcidStats acidStats = OrcAcidUtils.parseAcidStats(deleteDeltaReader);
-              if (acidStats.deletes == 0) {
-                continue; // just a safe check to ensure that we are not reading empty delete files.
-              }
-              totalDeleteEventCount += acidStats.deletes;
-              if (totalDeleteEventCount > maxEventsInMemory) {
-                // ColumnizedDeleteEventRegistry loads all the delete events from all the delete deltas
-                // into memory. To prevent out-of-memory errors, this check is a rough heuristic that
-                // prevents creation of an object of this class if the total number of delete events
-                // exceed this value. By default, it has been set to 10 million delete events per bucket.
-                LOG.info("Total number of delete events exceeds the maximum number of delete events "
-                    + "that can be loaded into memory for the delete deltas in the directory at : "
-                    + deleteDeltaDirs.toString() +". The max limit is currently set at "
-                    + maxEventsInMemory + " and can be changed by setting the Hive config variable "
-                    + ConfVars.HIVE_TRANSACTIONAL_NUM_EVENTS_IN_MEMORY.varname);
-                throw new DeleteEventsOverflowMemoryException();
-              }
-              DeleteReaderValue deleteReaderValue = new DeleteReaderValue(deleteDeltaReader,
-                  readerOptions, bucket, validWriteIdList, isBucketedTable);
-              DeleteRecordKey deleteRecordKey = new DeleteRecordKey();
-              if (deleteReaderValue.next(deleteRecordKey)) {
-                sortMerger.put(deleteRecordKey, deleteReaderValue);
-              } else {
-                deleteReaderValue.close();
+            Path[] deleteDeltaFiles = OrcRawRecordMerger.getDeltaFiles(deleteDeltaDir, bucket,
+                conf, new OrcRawRecordMerger.Options().isCompacting(false), isBucketedTable);
+            for (Path deleteDeltaFile : deleteDeltaFiles) {
+              // NOTE: Calling last flush length below is more for future-proofing when we have
+              // streaming deletes. But currently we don't support streaming deletes, and this can
+              // be removed if this becomes a performance issue.
+              long length = OrcAcidUtils.getLastFlushLength(fs, deleteDeltaFile);
+              // NOTE: A check for existence of deleteDeltaFile is required because we may not have
+              // deletes for the bucket being taken into consideration for this split processing.
+              if (length != -1 && fs.exists(deleteDeltaFile)) {
+                Reader deleteDeltaReader = OrcFile.createReader(deleteDeltaFile,
+                    OrcFile.readerOptions(conf).maxLength(length));
+                AcidStats acidStats = OrcAcidUtils.parseAcidStats(deleteDeltaReader);
+                if (acidStats.deletes == 0) {
+                  continue; // just a safe check to ensure that we are not reading empty delete files.
+                }
+                totalDeleteEventCount += acidStats.deletes;
+                if (totalDeleteEventCount > maxEventsInMemory) {
+                  // ColumnizedDeleteEventRegistry loads all the delete events from all the delete deltas
+                  // into memory. To prevent out-of-memory errors, this check is a rough heuristic that
+                  // prevents creation of an object of this class if the total number of delete events
+                  // exceed this value. By default, it has been set to 10 million delete events per bucket.
+                  LOG.info("Total number of delete events exceeds the maximum number of delete events "
+                      + "that can be loaded into memory for the delete deltas in the directory at : "
+                      + deleteDeltaDirs.toString() +". The max limit is currently set at "
+                      + maxEventsInMemory + " and can be changed by setting the Hive config variable "
+                      + ConfVars.HIVE_TRANSACTIONAL_NUM_EVENTS_IN_MEMORY.varname);
+                  throw new DeleteEventsOverflowMemoryException();
+                }
+                DeleteReaderValue deleteReaderValue = new DeleteReaderValue(deleteDeltaReader,
+                    readerOptions, bucket, validWriteIdList, isBucketedTable);
+                DeleteRecordKey deleteRecordKey = new DeleteRecordKey();
+                if (deleteReaderValue.next(deleteRecordKey)) {
+                  sortMerger.put(deleteRecordKey, deleteReaderValue);
+                } else {
+                  deleteReaderValue.close();
+                }
               }
             }
           }
-          }
+          // Note: totalDeleteEventCount can actually be higher than real value.
+          //       We assume here it won't be lower. Maybe we should just read and not guess...
           if (totalDeleteEventCount > 0) {
-            // Initialize the rowId array when we have some delete events.
-            rowIds = new long[totalDeleteEventCount];
-            readAllDeleteEventsFromDeleteDeltas();
+            readAllDeleteEventsFromDeleteDeltas(totalDeleteEventCount);
           }
         }
         isEmpty = compressedOwids == null || rowIds == null;
@@ -1087,15 +1098,21 @@ public class VectorizedOrcAcidRowBatchReader
      * In practice we should be filtering delete evens by min/max ROW_ID from the split.  The later
      * is also not yet implemented: HIVE-16812.
      */
-    private void readAllDeleteEventsFromDeleteDeltas() throws IOException {
-      if (sortMerger == null || sortMerger.isEmpty()) return; // trivial case, nothing to read.
-      int distinctOwids = 0;
-      long lastSeenOwid = -1;
-      int lastSeenBucketProperty = -1;
-      long owids[] = new long[rowIds.length];
-      int[] bucketProperties = new int [rowIds.length];
-      
+    private void readAllDeleteEventsFromDeleteDeltas(
+        int totalDeleteEventCount) throws IOException {
+      if (sortMerger == null || sortMerger.isEmpty()) {
+        rowIds = new long[0];
+        return; // trivial case, nothing to read.
+      }
+
+      // Initialize the rowId array when we have some delete events.
+      rowIds = new long[totalDeleteEventCount];
+
       int index = 0;
+      // We compress the owids into CompressedOwid data structure that records
+      // the fromIndex(inclusive) and toIndex(exclusive) for each unique owid.
+      List<CompressedOwid> compressedOwids = new ArrayList<>();
+      CompressedOwid lastCo = null;
       while (!sortMerger.isEmpty()) {
         // The sortMerger is a heap data structure that stores a pair of
         // (deleteRecordKey, deleteReaderValue) at each node and is ordered by deleteRecordKey.
@@ -1109,45 +1126,35 @@ public class VectorizedOrcAcidRowBatchReader
         Entry<DeleteRecordKey, DeleteReaderValue> entry = sortMerger.pollFirstEntry();
         DeleteRecordKey deleteRecordKey = entry.getKey();
         DeleteReaderValue deleteReaderValue = entry.getValue();
-        owids[index] = deleteRecordKey.originalWriteId;
-        bucketProperties[index] = deleteRecordKey.bucketProperty;
+        long owid = deleteRecordKey.originalWriteId;
+        int bp = deleteRecordKey.bucketProperty;
         rowIds[index] = deleteRecordKey.rowId;
-        ++index;
-        if (lastSeenOwid != deleteRecordKey.originalWriteId ||
-          lastSeenBucketProperty != deleteRecordKey.bucketProperty) {
-          ++distinctOwids;
-          lastSeenOwid = deleteRecordKey.originalWriteId;
-          lastSeenBucketProperty = deleteRecordKey.bucketProperty;
+        if (lastCo == null || lastCo.originalWriteId != owid || lastCo.bucketProperty != bp) {
+          if (lastCo != null) {
+            lastCo.toIndex = index; // Finalize the previous record.
+          }
+          lastCo = new CompressedOwid(owid, bp, index, -1);
+          compressedOwids.add(lastCo);
         }
+        ++index;
         if (deleteReaderValue.next(deleteRecordKey)) {
           sortMerger.put(deleteRecordKey, deleteReaderValue);
         } else {
           deleteReaderValue.close(); // Exhausted reading all records, close the reader.
         }
       }
-
-      // Once we have processed all the delete events and seen all the distinct owids,
-      // we compress the owids into CompressedOwid data structure that records
-      // the fromIndex(inclusive) and toIndex(exclusive) for each unique owid.
-      this.compressedOwids = new CompressedOwid[distinctOwids];
-      lastSeenOwid = owids[0];
-      lastSeenBucketProperty = bucketProperties[0];
-      int fromIndex = 0, pos = 0;
-      for (int i = 1; i < owids.length; ++i) {
-        if (owids[i] != lastSeenOwid || lastSeenBucketProperty != bucketProperties[i]) {
-          compressedOwids[pos] =
-            new CompressedOwid(lastSeenOwid, lastSeenBucketProperty, fromIndex, i);
-          lastSeenOwid = owids[i];
-          lastSeenBucketProperty = bucketProperties[i];
-          fromIndex = i;
-          ++pos;
-        }
+      if (lastCo != null) {
+        lastCo.toIndex = index; // Finalize the last record.
+        lastCo = null;
+      }
+      if (rowIds.length > index) {
+        rowIds = Arrays.copyOf(rowIds, index);
       }
-      // account for the last distinct owid
-      compressedOwids[pos] =
-        new CompressedOwid(lastSeenOwid, lastSeenBucketProperty, fromIndex, owids.length);
+
+      this.compressedOwids = compressedOwids.toArray(new CompressedOwid[compressedOwids.size()]);
     }
 
+
     private boolean isDeleted(long owid, int bucketProperty, long rowId) {
       if (compressedOwids == null || rowIds == null) {
         return false;
