HIVE-19838 : simplify & fix ColumnizedDeleteEventRegistry load loop (Sergey Shelukhin, reviewed by Teddy Choi)

Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/88d0da45
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/88d0da45
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/88d0da45

Branch: refs/heads/branch-3
Commit: 88d0da45b30c5e1f19af45a33a7c891175f7c38e
Parents: 55bc285
Author: sergey <ser...@apache.org>
Authored: Thu Jun 14 12:15:28 2018 -0700
Committer: sergey <ser...@apache.org>
Committed: Thu Jun 14 12:15:52 2018 -0700

----------------------------------------------------------------------
 .../io/orc/VectorizedOrcAcidRowBatchReader.java | 159 ++++++++++---------
 1 file changed, 83 insertions(+), 76 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/88d0da45/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java
index a4568de..66ffcae 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java
@@ -19,8 +19,10 @@ package org.apache.hadoop.hive.ql.io.orc;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.BitSet;
+import java.util.List;
 import java.util.Map.Entry;
 import java.util.TreeMap;
 
@@ -859,9 +861,11 @@ public class VectorizedOrcAcidRowBatchReader
     private final ValidWriteIdList validWriteIdList;
     private boolean isBucketPropertyRepeating;
     private final boolean isBucketedTable;
+    private final Reader reader;
 
     DeleteReaderValue(Reader deleteDeltaReader, Reader.Options readerOptions, int bucket,
         ValidWriteIdList validWriteIdList, boolean isBucketedTable) throws IOException {
+      this.reader = deleteDeltaReader;
       this.recordReader = deleteDeltaReader.rowsOptions(readerOptions);
       this.bucketForSplit = bucket;
       this.batch = deleteDeltaReader.getSchema().createRowBatch();
@@ -955,6 +959,12 @@ public class VectorizedOrcAcidRowBatchReader
             " from " + dummy + " curTxnId: " + curTxnId);
       }
     }
+
+    @Override
+    public String toString() {
+      return "{reader=" + reader + ", isBucketPropertyRepeating=" + isBucketPropertyRepeating +
+          ", bucketForSplit=" + bucketForSplit + ", isBucketedTable=" + isBucketedTable + "}";
+    }
   }
   /**
    * A CompressedOwid class stores a compressed representation of the original
@@ -968,7 +978,7 @@ public class VectorizedOrcAcidRowBatchReader
     final long originalWriteId;
     final int bucketProperty;
     final int fromIndex; // inclusive
-    final int toIndex; // exclusive
+    int toIndex; // exclusive
 
     CompressedOwid(long owid, int bucketProperty, int fromIndex, int toIndex) {
       this.originalWriteId = owid;
       this.bucketProperty = bucketProperty;
@@ -1027,49 +1037,50 @@ public class VectorizedOrcAcidRowBatchReader
       int totalDeleteEventCount = 0;
       for (Path deleteDeltaDir : deleteDeltaDirs) {
         FileSystem fs = deleteDeltaDir.getFileSystem(conf);
-        for(Path deleteDeltaFile : OrcRawRecordMerger.getDeltaFiles(deleteDeltaDir, bucket, conf,
-            new OrcRawRecordMerger.Options().isCompacting(false), isBucketedTable)) {
-          // NOTE: Calling last flush length below is more for future-proofing when we have
-          // streaming deletes. But currently we don't support streaming deletes, and this can
-          // be removed if this becomes a performance issue.
-          long length = OrcAcidUtils.getLastFlushLength(fs, deleteDeltaFile);
-          // NOTE: A check for existence of deleteDeltaFile is required because we may not have
-          // deletes for the bucket being taken into consideration for this split processing.
-          if (length != -1 && fs.exists(deleteDeltaFile)) {
-            Reader deleteDeltaReader = OrcFile.createReader(deleteDeltaFile,
-                OrcFile.readerOptions(conf).maxLength(length));
-            AcidStats acidStats = OrcAcidUtils.parseAcidStats(deleteDeltaReader);
-            if (acidStats.deletes == 0) {
-              continue; // just a safe check to ensure that we are not reading empty delete files.
-            }
-            totalDeleteEventCount += acidStats.deletes;
-            if (totalDeleteEventCount > maxEventsInMemory) {
-              // ColumnizedDeleteEventRegistry loads all the delete events from all the delete deltas
-              // into memory. To prevent out-of-memory errors, this check is a rough heuristic that
-              // prevents creation of an object of this class if the total number of delete events
-              // exceed this value. By default, it has been set to 10 million delete events per bucket.
-              LOG.info("Total number of delete events exceeds the maximum number of delete events "
-                  + "that can be loaded into memory for the delete deltas in the directory at : "
-                  + deleteDeltaDirs.toString() +". The max limit is currently set at "
-                  + maxEventsInMemory + " and can be changed by setting the Hive config variable "
-                  + ConfVars.HIVE_TRANSACTIONAL_NUM_EVENTS_IN_MEMORY.varname);
-              throw new DeleteEventsOverflowMemoryException();
-            }
-            DeleteReaderValue deleteReaderValue = new DeleteReaderValue(deleteDeltaReader,
-                readerOptions, bucket, validWriteIdList, isBucketedTable);
-            DeleteRecordKey deleteRecordKey = new DeleteRecordKey();
-            if (deleteReaderValue.next(deleteRecordKey)) {
-              sortMerger.put(deleteRecordKey, deleteReaderValue);
-            } else {
-              deleteReaderValue.close();
+        Path[] deleteDeltaFiles = OrcRawRecordMerger.getDeltaFiles(deleteDeltaDir, bucket,
+            conf, new OrcRawRecordMerger.Options().isCompacting(false), isBucketedTable);
+        for (Path deleteDeltaFile : deleteDeltaFiles) {
+          // NOTE: Calling last flush length below is more for future-proofing when we have
+          // streaming deletes. But currently we don't support streaming deletes, and this can
+          // be removed if this becomes a performance issue.
+          long length = OrcAcidUtils.getLastFlushLength(fs, deleteDeltaFile);
+          // NOTE: A check for existence of deleteDeltaFile is required because we may not have
+          // deletes for the bucket being taken into consideration for this split processing.
+          if (length != -1 && fs.exists(deleteDeltaFile)) {
+            Reader deleteDeltaReader = OrcFile.createReader(deleteDeltaFile,
+                OrcFile.readerOptions(conf).maxLength(length));
+            AcidStats acidStats = OrcAcidUtils.parseAcidStats(deleteDeltaReader);
+            if (acidStats.deletes == 0) {
+              continue; // just a safe check to ensure that we are not reading empty delete files.
+            }
+            totalDeleteEventCount += acidStats.deletes;
+            if (totalDeleteEventCount > maxEventsInMemory) {
+              // ColumnizedDeleteEventRegistry loads all the delete events from all the delete deltas
+              // into memory. To prevent out-of-memory errors, this check is a rough heuristic that
+              // prevents creation of an object of this class if the total number of delete events
+              // exceed this value. By default, it has been set to 10 million delete events per bucket.
+              LOG.info("Total number of delete events exceeds the maximum number of delete events "
+                  + "that can be loaded into memory for the delete deltas in the directory at : "
+                  + deleteDeltaDirs.toString() +". The max limit is currently set at "
+                  + maxEventsInMemory + " and can be changed by setting the Hive config variable "
+                  + ConfVars.HIVE_TRANSACTIONAL_NUM_EVENTS_IN_MEMORY.varname);
+              throw new DeleteEventsOverflowMemoryException();
+            }
+            DeleteReaderValue deleteReaderValue = new DeleteReaderValue(deleteDeltaReader,
+                readerOptions, bucket, validWriteIdList, isBucketedTable);
+            DeleteRecordKey deleteRecordKey = new DeleteRecordKey();
+            if (deleteReaderValue.next(deleteRecordKey)) {
+              sortMerger.put(deleteRecordKey, deleteReaderValue);
+            } else {
+              deleteReaderValue.close();
+            }
           }
         }
       }
-      }
+      // Note: totalDeleteEventCount can actually be higher than real value.
+      // We assume here it won't be lower. Maybe we should just read and not guess...
       if (totalDeleteEventCount > 0) {
-        // Initialize the rowId array when we have some delete events.
-        rowIds = new long[totalDeleteEventCount];
-        readAllDeleteEventsFromDeleteDeltas();
+        readAllDeleteEventsFromDeleteDeltas(totalDeleteEventCount);
       }
     }
     isEmpty = compressedOwids == null || rowIds == null;
@@ -1087,15 +1098,21 @@ public class VectorizedOrcAcidRowBatchReader
     * In practice we should be filtering delete evens by min/max ROW_ID from the split. The later
     * is also not yet implemented: HIVE-16812.
     */
-    private void readAllDeleteEventsFromDeleteDeltas() throws IOException {
-      if (sortMerger == null || sortMerger.isEmpty()) return; // trivial case, nothing to read.
-      int distinctOwids = 0;
-      long lastSeenOwid = -1;
-      int lastSeenBucketProperty = -1;
-      long owids[] = new long[rowIds.length];
-      int[] bucketProperties = new int [rowIds.length];
-
+    private void readAllDeleteEventsFromDeleteDeltas(
+        int totalDeleteEventCount) throws IOException {
+      if (sortMerger == null || sortMerger.isEmpty()) {
+        rowIds = new long[0];
+        return; // trivial case, nothing to read.
+      }
+
+      // Initialize the rowId array when we have some delete events.
+      rowIds = new long[totalDeleteEventCount];
+      int index = 0;
+      // We compress the owids into CompressedOwid data structure that records
+      // the fromIndex(inclusive) and toIndex(exclusive) for each unique owid.
+      List<CompressedOwid> compressedOwids = new ArrayList<>();
+      CompressedOwid lastCo = null;
       while (!sortMerger.isEmpty()) {
         // The sortMerger is a heap data structure that stores a pair of
         // (deleteRecordKey, deleteReaderValue) at each node and is ordered by deleteRecordKey.
@@ -1109,45 +1126,35 @@ public class VectorizedOrcAcidRowBatchReader
         Entry<DeleteRecordKey, DeleteReaderValue> entry = sortMerger.pollFirstEntry();
         DeleteRecordKey deleteRecordKey = entry.getKey();
         DeleteReaderValue deleteReaderValue = entry.getValue();
-        owids[index] = deleteRecordKey.originalWriteId;
-        bucketProperties[index] = deleteRecordKey.bucketProperty;
+        long owid = deleteRecordKey.originalWriteId;
+        int bp = deleteRecordKey.bucketProperty;
         rowIds[index] = deleteRecordKey.rowId;
-        ++index;
-        if (lastSeenOwid != deleteRecordKey.originalWriteId ||
-            lastSeenBucketProperty != deleteRecordKey.bucketProperty) {
-          ++distinctOwids;
-          lastSeenOwid = deleteRecordKey.originalWriteId;
-          lastSeenBucketProperty = deleteRecordKey.bucketProperty;
+        if (lastCo == null || lastCo.originalWriteId != owid || lastCo.bucketProperty != bp) {
+          if (lastCo != null) {
+            lastCo.toIndex = index; // Finalize the previous record.
+          }
+          lastCo = new CompressedOwid(owid, bp, index, -1);
+          compressedOwids.add(lastCo);
         }
+        ++index;
         if (deleteReaderValue.next(deleteRecordKey)) {
           sortMerger.put(deleteRecordKey, deleteReaderValue);
         } else {
          deleteReaderValue.close(); // Exhausted reading all records, close the reader.
        }
      }
-
-      // Once we have processed all the delete events and seen all the distinct owids,
-      // we compress the owids into CompressedOwid data structure that records
-      // the fromIndex(inclusive) and toIndex(exclusive) for each unique owid.
-      this.compressedOwids = new CompressedOwid[distinctOwids];
-      lastSeenOwid = owids[0];
-      lastSeenBucketProperty = bucketProperties[0];
-      int fromIndex = 0, pos = 0;
-      for (int i = 1; i < owids.length; ++i) {
-        if (owids[i] != lastSeenOwid || lastSeenBucketProperty != bucketProperties[i]) {
-          compressedOwids[pos] =
-              new CompressedOwid(lastSeenOwid, lastSeenBucketProperty, fromIndex, i);
-          lastSeenOwid = owids[i];
-          lastSeenBucketProperty = bucketProperties[i];
-          fromIndex = i;
-          ++pos;
-        }
+      if (lastCo != null) {
+        lastCo.toIndex = index; // Finalize the last record.
+        lastCo = null;
+      }
+      if (rowIds.length > index) {
+        rowIds = Arrays.copyOf(rowIds, index);
      }
-      // account for the last distinct owid
-      compressedOwids[pos] =
-          new CompressedOwid(lastSeenOwid, lastSeenBucketProperty, fromIndex, owids.length);
+
+      this.compressedOwids = compressedOwids.toArray(new CompressedOwid[compressedOwids.size()]);
    }
+
    private boolean isDeleted(long owid, int bucketProperty, long rowId) {
      if (compressedOwids == null || rowIds == null) {
        return false;
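----------------------------------------------------------------------

For readers who want the idea without the diff noise: the patch replaces the old two-pass scheme (buffer every owid and bucketProperty in parallel arrays, then compress them afterwards) with a single pass that finalizes each CompressedOwid's toIndex as soon as the next (writeId, bucket) run begins, and it trims rowIds because the AcidStats-based count can over-estimate. Below is a minimal, self-contained sketch of that approach; the class and field names here (CompressedDeleteSketch, Range, the long[][] input) are simplified stand-ins and do not exist in Hive, where the real code is ColumnizedDeleteEventRegistry with CompressedOwid and a TreeMap-based sort-merger.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Toy model of the simplified load loop: events arrive sorted by (writeId, bucket, rowId);
 *  runs sharing (writeId, bucket) become [fromIndex, toIndex) ranges over one rowId array. */
public class CompressedDeleteSketch {

  static final class Range {
    final long writeId;
    final int bucket;
    final int fromIndex; // inclusive
    int toIndex;         // exclusive, filled in when the run ends

    Range(long writeId, int bucket, int fromIndex) {
      this.writeId = writeId;
      this.bucket = bucket;
      this.fromIndex = fromIndex;
      this.toIndex = -1;
    }
  }

  private final long[] rowIds;
  private final List<Range> ranges = new ArrayList<>();

  /** events: rows of {writeId, bucket, rowId}, already sorted (stand-in for the sort-merger);
   *  estimatedCount: upper bound on the event count (may over-estimate, must not under-estimate). */
  CompressedDeleteSketch(long[][] events, int estimatedCount) {
    long[] ids = new long[estimatedCount];
    int index = 0;
    Range last = null;
    for (long[] e : events) {
      ids[index] = e[2];
      if (last == null || last.writeId != e[0] || last.bucket != (int) e[1]) {
        if (last != null) {
          last.toIndex = index;   // finalize the previous run
        }
        last = new Range(e[0], (int) e[1], index);
        ranges.add(last);
      }
      ++index;
    }
    if (last != null) {
      last.toIndex = index;       // finalize the last run
    }
    // Trim if the estimate was high, mirroring the Arrays.copyOf call in the patch.
    this.rowIds = ids.length > index ? Arrays.copyOf(ids, index) : ids;
  }

  boolean isDeleted(long writeId, int bucket, long rowId) {
    for (Range r : ranges) {      // the real registry binary-searches its range array instead
      if (r.writeId == writeId && r.bucket == bucket) {
        return Arrays.binarySearch(rowIds, r.fromIndex, r.toIndex, rowId) >= 0;
      }
    }
    return false;
  }

  public static void main(String[] args) {
    long[][] events = { {5, 0, 3}, {5, 0, 7}, {6, 0, 1} };
    CompressedDeleteSketch reg = new CompressedDeleteSketch(events, 10); // deliberate over-estimate
    System.out.println(reg.isDeleted(5, 0, 7)); // true
    System.out.println(reg.isDeleted(6, 0, 2)); // false
  }
}

The design point mirrored here is why the patch drops final from CompressedOwid.toIndex: the range end is only known once the next run starts (or the merger is exhausted), so it is finalized lazily, and rowIds is right-sized with Arrays.copyOf since the per-file delete counts read from ORC metadata may exceed the events actually loaded.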