Deletable frame tuple appender with reuse of tuple index slots
Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/fd514a0e Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/fd514a0e Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/fd514a0e Branch: refs/heads/ecarm002/interval_join_merge Commit: fd514a0ed598219faf0ecdfa19e3c894f3b706f1 Parents: 6c31214 Author: Preston Carman <prest...@apache.org> Authored: Mon Sep 19 17:59:17 2016 -0700 Committer: Preston Carman <prest...@apache.org> Committed: Mon Sep 19 17:59:17 2016 -0700 ---------------------------------------------------------------------- .../common/comm/io/FrameTupleAccessor.java | 5 +- .../sort/util/DeletableFrameTupleAppender.java | 259 ++++++++++++++----- 2 files changed, 204 insertions(+), 60 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fd514a0e/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FrameTupleAccessor.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FrameTupleAccessor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FrameTupleAccessor.java index d99e2f2..1c702be 100644 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FrameTupleAccessor.java +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FrameTupleAccessor.java @@ -35,7 +35,8 @@ import org.apache.hyracks.util.IntSerDeUtils; /** * FrameTupleCursor is used to navigate over tuples in a Frame. A frame is * formatted with tuple data concatenated starting at offset 1, one tuple after - * another. 
The first byte is used to notify how big the frame is, so the maximum frame size is 255 * initialFrameSetting. + * another. The first byte is used to notify how big the frame is, + * so the maximum frame size is 255 * initialFrameSetting. * Offset FS - 4 holds an int indicating the number of tuples (N) in * the frame. FS - ((i + 1) * 4) for i from 0 to N - 1 holds an int indicating * the offset of the (i + 1)^th tuple. Every tuple is organized as a sequence of @@ -184,6 +185,7 @@ public class FrameTupleAccessor implements IFrameTupleAccessor { prettyPrint(i, bbis, dis, sb, recordFields); } System.err.println(sb.toString()); + bbis.close(); } public void prettyPrint(int tIdx, int[] recordFields) throws IOException { @@ -192,6 +194,7 @@ public class FrameTupleAccessor implements IFrameTupleAccessor { StringBuilder sb = new StringBuilder(); prettyPrint(tIdx, bbis, dis, sb, recordFields); System.err.println(sb.toString()); + bbis.close(); } public void prettyPrint(ITupleReference tuple, int fieldsIdx, int descIdx) throws HyracksDataException { http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fd514a0e/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/util/DeletableFrameTupleAppender.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/util/DeletableFrameTupleAppender.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/util/DeletableFrameTupleAppender.java index 79aba3e..8cae721 100644 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/util/DeletableFrameTupleAppender.java +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/util/DeletableFrameTupleAppender.java @@ -21,8 +21,11 @@ package org.apache.hyracks.dataflow.std.sort.util; 
import java.io.PrintStream; import java.nio.ByteBuffer; +import java.util.Comparator; +import java.util.PriorityQueue; -import org.apache.hyracks.api.comm.FrameHelper; +import org.apache.commons.lang3.tuple.ImmutablePair; +import org.apache.commons.lang3.tuple.Pair; import org.apache.hyracks.api.comm.IFrameTupleAccessor; import org.apache.hyracks.api.dataflow.value.RecordDescriptor; import org.apache.hyracks.api.exceptions.HyracksDataException; @@ -32,59 +35,153 @@ import org.apache.hyracks.util.IntSerDeUtils; * This is a special frame which is used in TupleMemoryBuffer. * This frame has a special structure to organize the deleted spaces. * Specifically, the endOffset of the deleted tuple will be set as negative number. - * And we add a special <code>deleted_space</code> field at the last 4 bytes to remember how many bytes has been deleted. + * And we add a special <code>deleted_space</code> field near the end of the frame to + * remember how many bytes have been deleted. + * The offsets also store both the start and end values because tuples may be out of order + * after several add, remove and reorganize operations. + * A frame is formatted with tuple data concatenated starting at offset 0, + * one tuple after another. + * Offset FS - 8 holds an int indicating the amount of <code>deleted_space</code> in the frame. + * Offset FS - 4 holds an int indicating the number of tuples (N) in the frame. + * FS - ((i + 1) * (4 + 4)) for i from 0 to N - 1 holds two ints indicating + * the offset and length of the (i + 1)^th tuple. + * The tuple references are organized as a sequence of ints indicating the start of the field + * followed by the length of each tuple. + * The offset is measured from the start of the frame. + * The tuple has been deleted if the length is 0. + * + * <pre> + * [ *tuple_1_bytes*, + * *tuple_2_bytes*, + * ...
+ * int length, int offset, # tuple 2 + * int length, int offset, # tuple 1 + * int tuple_append, + * int next_index, + * int deleted_space, + * int index_count, + * ] + * </pre> + * + * <pre> + * [ *tuple_1_bytes*, + * *tuple_2_bytes*, + * ... + * int end_offset, + * int offset, # tuple 2 + * int offset, # tuple 1 + * int deleted_space, + * int tuple_count, + * ] + * </pre> */ public class DeletableFrameTupleAppender implements IAppendDeletableFrameTupleAccessor { + public static final Comparator<Pair<Integer, Integer>> INDEX_OFFSET_ASC_COMPARATOR = new Comparator<Pair<Integer, Integer>>() { + @Override + public int compare(Pair<Integer, Integer> p1, Pair<Integer, Integer> p2) { + return p1.getValue() - p2.getValue(); + } + + }; + + private static final int SIZE_INDEX_COUNT = 4; private static final int SIZE_DELETED_SPACE = 4; + private static final int SIZE_NEXT_INDEX = 4; + private static final int SIZE_TUPLE_APPEND = 4; + + private static final int SIZE_START_OFFSET = 4; + private static final int SIZE_END_OFFSET = 4; + private static final int SIZE_OFFSET_GROUP = SIZE_END_OFFSET + SIZE_START_OFFSET; + private final RecordDescriptor recordDescriptor; private ByteBuffer buffer; - private int tupleCountOffset; - private int tupleCount; - private int freeDataEndOffset; + private int indexSlotsOffset; + private int indexCount; + private int tupleAppend; private int deletedSpace; + private int nextIndex; private byte[] array; // to speed up the array visit a little + private final PriorityQueue<Pair<Integer, Integer>> reorganizeQueue; + public DeletableFrameTupleAppender(RecordDescriptor recordDescriptor) { this.recordDescriptor = recordDescriptor; + reorganizeQueue = new PriorityQueue<>(16, INDEX_OFFSET_ASC_COMPARATOR); } - private int getTupleCountOffset() { - return FrameHelper.getTupleCountOffset(buffer.capacity()) - SIZE_DELETED_SPACE; + private int getIndexCount() { + return IntSerDeUtils.getInt(array, getIndexCountOffset()); } - private int 
getFreeDataEndOffset() { - return tupleCount == 0 ? 0 : Math.abs(IntSerDeUtils.getInt(array, tupleCountOffset - tupleCount * 4)); + private void setIndexCount(int count) { + IntSerDeUtils.putInt(array, getIndexCountOffset(), count); } - private void setFreeDataEndOffset(int offset) { - assert (offset >= 0); - IntSerDeUtils.putInt(array, tupleCountOffset - tupleCount * 4, offset); + private int getIndexCountOffset() { + return buffer.capacity() - SIZE_INDEX_COUNT; } - private void setTupleCount(int count) { - IntSerDeUtils.putInt(array, tupleCountOffset, count); + private int getDeletedSpace() { + return IntSerDeUtils.getInt(array, getDeletedSpaceOffset()); } - private void setDeleteSpace(int count) { - IntSerDeUtils.putInt(array, buffer.capacity() - SIZE_DELETED_SPACE, count); + private void setDeletedSpace(int space) { + IntSerDeUtils.putInt(array, getDeletedSpaceOffset(), space); } - private int getPhysicalTupleCount() { - return IntSerDeUtils.getInt(array, tupleCountOffset); + private int getDeletedSpaceOffset() { + return getIndexCountOffset() - SIZE_DELETED_SPACE; } - private int getDeletedSpace() { - return IntSerDeUtils.getInt(array, buffer.capacity() - SIZE_DELETED_SPACE); + private int getNextIndex() { + return IntSerDeUtils.getInt(array, getNextIndexOffset()); + } + + private void setNextIndex(int index) { + IntSerDeUtils.putInt(array, getNextIndexOffset(), index); + } + + private int getNextIndexOffset() { + return getDeletedSpaceOffset() - SIZE_NEXT_INDEX; + } + + private int getAndUpdateNextIndex() { + int index = nextIndex; + nextIndex = index + 1; + while (nextIndex < indexCount) { + if (getTupleEndOffset(nextIndex) <= 0) { + break; + } + nextIndex++; + } + setNextIndex(nextIndex); + return index; + } + + private int getTupleAppend() { + return IntSerDeUtils.getInt(array, getTupleAppendOffset()); + } + + private void setTupleAppend(int offset) { + IntSerDeUtils.putInt(array, getTupleAppendOffset(), offset); + } + + private int getTupleAppendOffset() 
{ + return getNextIndexOffset() - SIZE_TUPLE_APPEND; + } + + private int getIndexSlotOffset() { + return getTupleAppendOffset(); } @Override public void clear(ByteBuffer buffer) throws HyracksDataException { this.buffer = buffer; this.array = buffer.array(); - tupleCountOffset = getTupleCountOffset(); - setTupleCount(0); - setDeleteSpace(0); + setIndexCount(0); + setDeletedSpace(0); + setTupleAppend(0); resetCounts(); } @@ -92,14 +189,15 @@ public class DeletableFrameTupleAppender implements IAppendDeletableFrameTupleAc public void reset(ByteBuffer buffer) { this.buffer = buffer; this.array = buffer.array(); - tupleCountOffset = getTupleCountOffset(); resetCounts(); } private void resetCounts() { + indexSlotsOffset = getIndexSlotOffset(); deletedSpace = getDeletedSpace(); - tupleCount = getPhysicalTupleCount(); - freeDataEndOffset = getFreeDataEndOffset(); + indexCount = getIndexCount(); + tupleAppend = getTupleAppend(); + nextIndex = getNextIndex(); } /** @@ -116,11 +214,18 @@ public class DeletableFrameTupleAppender implements IAppendDeletableFrameTupleAc byte[] src = tupleAccessor.getBuffer().array(); int tStartOffset = tupleAccessor.getTupleStartOffset(tIndex); int length = tupleAccessor.getTupleLength(tIndex); - System.arraycopy(src, tStartOffset, array, freeDataEndOffset, length); - setTupleCount(++tupleCount); - freeDataEndOffset += length; - setFreeDataEndOffset(freeDataEndOffset); - return tupleCount - 1; + System.arraycopy(src, tStartOffset, array, tupleAppend, length); + int index = getAndUpdateNextIndex(); + if (index < indexCount) { + // Don't change index count + } else { + // Increment count + setIndexCount(++indexCount); + } + setTupleOffsets(index, tupleAppend, length); + tupleAppend += length; + setTupleAppend(tupleAppend); + return index; } @Override @@ -129,7 +234,11 @@ public class DeletableFrameTupleAppender implements IAppendDeletableFrameTupleAc if (endOffset > 0) { setTupleEndOffset(tupleIndex, -endOffset); deletedSpace += endOffset - 
getTupleStartOffset(tupleIndex); - setDeleteSpace(deletedSpace); + setDeletedSpace(deletedSpace); + if (nextIndex > tupleIndex) { + nextIndex = tupleIndex; + setNextIndex(nextIndex); + } } } @@ -140,36 +249,53 @@ public class DeletableFrameTupleAppender implements IAppendDeletableFrameTupleAc } reclaimDeletedEnding(); - freeDataEndOffset = 0; - int endOffset = 0; - for (int i = 0; i < tupleCount; i++) { - int startOffset = Math.abs(endOffset); + // Build reorganize queue + int endOffset; + int startOffset; + for (int i = 0; i < indexCount; i++) { endOffset = getTupleEndOffset(i); + if (endOffset > 0) { + reorganizeQueue.add(new ImmutablePair<Integer, Integer>(i, getTupleStartOffset(i))); + } + } + + int index; + tupleAppend = 0; + while (!reorganizeQueue.isEmpty()) { + index = reorganizeQueue.remove().getKey(); + startOffset = getTupleStartOffset(index); + endOffset = getTupleEndOffset(index); if (endOffset >= 0) { int length = endOffset - startOffset; assert length >= 0; - if (freeDataEndOffset != startOffset) { - System.arraycopy(array, startOffset, array, freeDataEndOffset, length); + if (tupleAppend != startOffset) { + System.arraycopy(array, startOffset, array, tupleAppend, length); } - freeDataEndOffset += length; + setTupleOffsets(index, tupleAppend, length); + tupleAppend += length; } - setTupleEndOffset(i, freeDataEndOffset); } - setFreeDataEndOffset(freeDataEndOffset); + setTupleAppend(tupleAppend); deletedSpace = 0; - setDeleteSpace(0); + setDeletedSpace(0); + + // Clean up + reorganizeQueue.clear(); } private void reclaimDeletedEnding() { - for (int i = tupleCount - 1; i >= 0; i--) { + for (int i = indexCount - 1; i >= 0; i--) { int endOffset = getTupleEndOffset(i); if (endOffset <= 0) { - tupleCount--; + indexCount--; } else { break; } } - setTupleCount(tupleCount); + setIndexCount(indexCount); + if (nextIndex > indexCount) { + setNextIndex(indexCount); + } } @Override @@ -179,7 +305,8 @@ public class DeletableFrameTupleAppender implements 
IAppendDeletableFrameTupleAc @Override public int getContiguousFreeSpace() { - return getTupleCountOffset() - tupleCount * 4 - freeDataEndOffset; + int slotSpace = indexCount * SIZE_OFFSET_GROUP; + return indexSlotsOffset - tupleAppend - slotSpace; } @Override @@ -203,6 +330,11 @@ public class DeletableFrameTupleAppender implements IAppendDeletableFrameTupleAc } @Override + public int getAbsoluteFieldStartOffset(int tupleIndex, int fIdx) { + return getTupleStartOffset(tupleIndex) + getFieldSlotsLength() + getFieldStartOffset(tupleIndex, fIdx); + } + + @Override public int getFieldLength(int tupleIndex, int fIdx) { return getFieldEndOffset(tupleIndex, fIdx) - getFieldStartOffset(tupleIndex, fIdx); } @@ -210,40 +342,49 @@ public class DeletableFrameTupleAppender implements IAppendDeletableFrameTupleAc @Override public int getTupleLength(int tupleIndex) { int endOffset = getTupleEndOffset(tupleIndex); - if (endOffset < 0) { - return endOffset + getTupleStartOffset(tupleIndex); - } + assert endOffset > 0; return endOffset - getTupleStartOffset(tupleIndex); } + private void setTupleOffsets(int tupleIndex, int start, int length) { + setTupleStartOffset(tupleIndex, start); + setTupleEndOffset(tupleIndex, start + length); + } + @Override public int getTupleEndOffset(int tupleIndex) { - return IntSerDeUtils.getInt(array, tupleCountOffset - 4 * (tupleIndex + 1)); + return IntSerDeUtils.getInt(array, getTupleEndSlotOffset(tupleIndex)); } private void setTupleEndOffset(int tupleIndex, int offset) { - IntSerDeUtils.putInt(array, tupleCountOffset - 4 * (tupleIndex + 1), offset); + IntSerDeUtils.putInt(array, getTupleEndSlotOffset(tupleIndex), offset); } @Override public int getTupleStartOffset(int tupleIndex) { - int offset = tupleIndex == 0 ? 
0 : IntSerDeUtils.getInt(array, tupleCountOffset - 4 * tupleIndex); - return Math.abs(offset); + return IntSerDeUtils.getInt(array, getTupleStartSlotOffset(tupleIndex)); } - @Override - public int getAbsoluteFieldStartOffset(int tupleIndex, int fIdx) { - return getTupleStartOffset(tupleIndex) + getFieldSlotsLength() + getFieldStartOffset(tupleIndex, fIdx); + public void setTupleStartOffset(int tupleIndex, int offset) { + IntSerDeUtils.putInt(array, getTupleStartSlotOffset(tupleIndex), offset); + } + + public int getTupleStartSlotOffset(int tupleIndex) { + return indexSlotsOffset - SIZE_OFFSET_GROUP * tupleIndex - SIZE_START_OFFSET; + } + + public int getTupleEndSlotOffset(int tupleIndex) { + return getTupleStartSlotOffset(tupleIndex) - SIZE_END_OFFSET; } @Override public int getTupleCount() { - return tupleCount; + return indexCount; } private int getLiveTupleCount() { int live = 0; - for (int i = tupleCount - 1; i >= 0; i--) { + for (int i = 0; i < indexCount; ++i) { int endOffset = getTupleEndOffset(i); if (endOffset > 0) { live++; @@ -262,7 +403,7 @@ public class DeletableFrameTupleAppender implements IAppendDeletableFrameTupleAc if (getLiveTupleCount() == 0) { ps.print(""); } - ps.printf("(%d, %d)", getLiveTupleCount(), getPhysicalTupleCount()); + ps.printf("(%d, %d)", getLiveTupleCount(), getIndexCount()); } }