Deletable frame tuple appender with reuse of tuple index slots

Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/fd514a0e
Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/fd514a0e
Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/fd514a0e

Branch: refs/heads/ecarm002/interval_join_merge
Commit: fd514a0ed598219faf0ecdfa19e3c894f3b706f1
Parents: 6c31214
Author: Preston Carman <prest...@apache.org>
Authored: Mon Sep 19 17:59:17 2016 -0700
Committer: Preston Carman <prest...@apache.org>
Committed: Mon Sep 19 17:59:17 2016 -0700

----------------------------------------------------------------------
 .../common/comm/io/FrameTupleAccessor.java      |   5 +-
 .../sort/util/DeletableFrameTupleAppender.java  | 259 ++++++++++++++-----
 2 files changed, 204 insertions(+), 60 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fd514a0e/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FrameTupleAccessor.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FrameTupleAccessor.java
 
b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FrameTupleAccessor.java
index d99e2f2..1c702be 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FrameTupleAccessor.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FrameTupleAccessor.java
@@ -35,7 +35,8 @@ import org.apache.hyracks.util.IntSerDeUtils;
 /**
  * FrameTupleCursor is used to navigate over tuples in a Frame. A frame is
  * formatted with tuple data concatenated starting at offset 1, one tuple after
- * another. The first byte is used to notify how big the frame is, so the 
maximum frame size is 255 * initialFrameSetting.
+ * another. The first byte is used to notify how big the frame is,
+ * so the maximum frame size is 255 * initialFrameSetting.
  * Offset FS - 4 holds an int indicating the number of tuples (N) in
  * the frame. FS - ((i + 1) * 4) for i from 0 to N - 1 holds an int indicating
  * the offset of the (i + 1)^th tuple. Every tuple is organized as a sequence 
of
@@ -184,6 +185,7 @@ public class FrameTupleAccessor implements 
IFrameTupleAccessor {
             prettyPrint(i, bbis, dis, sb, recordFields);
         }
         System.err.println(sb.toString());
+        bbis.close();
     }
 
     public void prettyPrint(int tIdx, int[] recordFields) throws IOException {
@@ -192,6 +194,7 @@ public class FrameTupleAccessor implements 
IFrameTupleAccessor {
         StringBuilder sb = new StringBuilder();
         prettyPrint(tIdx, bbis, dis, sb, recordFields);
         System.err.println(sb.toString());
+        bbis.close();
     }
 
     public void prettyPrint(ITupleReference tuple, int fieldsIdx, int descIdx) 
throws HyracksDataException {

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fd514a0e/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/util/DeletableFrameTupleAppender.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/util/DeletableFrameTupleAppender.java
 
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/util/DeletableFrameTupleAppender.java
index 79aba3e..8cae721 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/util/DeletableFrameTupleAppender.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/util/DeletableFrameTupleAppender.java
@@ -21,8 +21,11 @@ package org.apache.hyracks.dataflow.std.sort.util;
 
 import java.io.PrintStream;
 import java.nio.ByteBuffer;
+import java.util.Comparator;
+import java.util.PriorityQueue;
 
-import org.apache.hyracks.api.comm.FrameHelper;
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hyracks.api.comm.IFrameTupleAccessor;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -32,59 +35,153 @@ import org.apache.hyracks.util.IntSerDeUtils;
  * This is a special frame which is used in TupleMemoryBuffer.
  * This frame has a special structure to organize the deleted spaces.
  * Specifically, the endOffset of the deleted tuple will be set as negative 
number.
- * And we add a special <code>deleted_space</code> field at the last 4 bytes 
to remember how many bytes has been deleted.
+ * And we add a special <code>deleted_space</code> field near the end of the frame to
+ * remember how many bytes have been deleted.
+ * The index slots store both the start and end offsets because tuples may be out of order
+ * after several add, remove and reorganize operations.
+ * A frame is formatted with tuple data concatenated starting at offset 0,
+ * one tuple after another.
+ * Offset FS - 4 holds an int indicating the number of index slots (N), including deleted ones.
+ * Offset FS - 8 holds an int indicating the amount of <code>deleted_space</code> in the frame.
+ * Offset FS - 12 holds <code>next_index</code>, the slot the next appended tuple will use.
+ * Offset FS - 16 holds <code>tuple_append</code>, the offset where the next tuple is copied.
+ * FS - 16 - ((i + 1) * (4 + 4)) for i from 0 to N - 1 holds two ints indicating
+ * the end offset and the start offset of the (i + 1)^th tuple.
+ * Both offsets are relative to the start of the frame; the end offset is exclusive.
+ * A tuple has been deleted (and its slot may be reused) if its end offset is not positive.
+ *
+ * <pre>
+ * [ *tuple_1_bytes*,
+ *   *tuple_2_bytes*,
+ *   ...
+ *   int end_offset, int start_offset, # tuple 2
+ *   int end_offset, int start_offset, # tuple 1
+ *   int tuple_append,
+ *   int next_index,
+ *   int deleted_space,
+ *   int index_count,
+ * ]
+ * </pre>
+ *
+ * Layout before index slot reuse was added (end offsets only), kept for comparison:
+ * <pre>
+ * [ *tuple_1_bytes*,
+ *   *tuple_2_bytes*,
+ *   ...
+ *   int end_offset, # tuple 2
+ *   int end_offset, # tuple 1
+ *   int tuple_count,
+ *   int deleted_space,
+ * ]
+ * </pre>
  */
 public class DeletableFrameTupleAppender implements 
IAppendDeletableFrameTupleAccessor {
 
+    public static final Comparator<Pair<Integer, Integer>> 
INDEX_OFFSET_ASC_COMPARATOR = new Comparator<Pair<Integer, Integer>>() {
+        @Override
+        public int compare(Pair<Integer, Integer> p1, Pair<Integer, Integer> 
p2) {
+            return p1.getValue() - p2.getValue();
+        }
+
+    };
+
+    private static final int SIZE_INDEX_COUNT = 4;
     private static final int SIZE_DELETED_SPACE = 4;
+    private static final int SIZE_NEXT_INDEX = 4;
+    private static final int SIZE_TUPLE_APPEND = 4;
+
+    private static final int SIZE_START_OFFSET = 4;
+    private static final int SIZE_END_OFFSET = 4;
+    private static final int SIZE_OFFSET_GROUP = SIZE_END_OFFSET + 
SIZE_START_OFFSET;
+
     private final RecordDescriptor recordDescriptor;
     private ByteBuffer buffer;
-    private int tupleCountOffset;
-    private int tupleCount;
-    private int freeDataEndOffset;
+    private int indexSlotsOffset;
+    private int indexCount;
+    private int tupleAppend;
     private int deletedSpace;
+    private int nextIndex;
     private byte[] array; // to speed up the array visit a little
 
+    private final PriorityQueue<Pair<Integer, Integer>> reorganizeQueue;
+
     public DeletableFrameTupleAppender(RecordDescriptor recordDescriptor) {
         this.recordDescriptor = recordDescriptor;
+        reorganizeQueue = new PriorityQueue<>(16, INDEX_OFFSET_ASC_COMPARATOR);
     }
 
-    private int getTupleCountOffset() {
-        return FrameHelper.getTupleCountOffset(buffer.capacity()) - 
SIZE_DELETED_SPACE;
+    private int getIndexCount() {
+        return IntSerDeUtils.getInt(array, getIndexCountOffset());
     }
 
-    private int getFreeDataEndOffset() {
-        return tupleCount == 0 ? 0 : Math.abs(IntSerDeUtils.getInt(array, 
tupleCountOffset - tupleCount * 4));
+    private void setIndexCount(int count) {
+        IntSerDeUtils.putInt(array, getIndexCountOffset(), count);
     }
 
-    private void setFreeDataEndOffset(int offset) {
-        assert (offset >= 0);
-        IntSerDeUtils.putInt(array, tupleCountOffset - tupleCount * 4, offset);
+    private int getIndexCountOffset() {
+        return buffer.capacity() - SIZE_INDEX_COUNT;
     }
 
-    private void setTupleCount(int count) {
-        IntSerDeUtils.putInt(array, tupleCountOffset, count);
+    private int getDeletedSpace() {
+        return IntSerDeUtils.getInt(array, getDeletedSpaceOffset());
     }
 
-    private void setDeleteSpace(int count) {
-        IntSerDeUtils.putInt(array, buffer.capacity() - SIZE_DELETED_SPACE, 
count);
+    private void setDeletedSpace(int space) {
+        IntSerDeUtils.putInt(array, getDeletedSpaceOffset(), space);
     }
 
-    private int getPhysicalTupleCount() {
-        return IntSerDeUtils.getInt(array, tupleCountOffset);
+    private int getDeletedSpaceOffset() {
+        return getIndexCountOffset() - SIZE_DELETED_SPACE;
     }
 
-    private int getDeletedSpace() {
-        return IntSerDeUtils.getInt(array, buffer.capacity() - 
SIZE_DELETED_SPACE);
+    private int getNextIndex() {
+        return IntSerDeUtils.getInt(array, getNextIndexOffset());
+    }
+
+    private void setNextIndex(int index) {
+        IntSerDeUtils.putInt(array, getNextIndexOffset(), index);
+    }
+
+    private int getNextIndexOffset() {
+        return getDeletedSpaceOffset() - SIZE_NEXT_INDEX;
+    }
+
+    private int getAndUpdateNextIndex() {
+        int index = nextIndex;
+        nextIndex = index + 1;
+        while (nextIndex < indexCount) {
+            if (getTupleEndOffset(nextIndex) <= 0) {
+                break;
+            }
+            nextIndex++;
+        }
+        setNextIndex(nextIndex);
+        return index;
+    }
+
+    private int getTupleAppend() {
+        return IntSerDeUtils.getInt(array, getTupleAppendOffset());
+    }
+
+    private void setTupleAppend(int offset) {
+        IntSerDeUtils.putInt(array, getTupleAppendOffset(), offset);
+    }
+
+    private int getTupleAppendOffset() {
+        return getNextIndexOffset() - SIZE_TUPLE_APPEND;
+    }
+
+    private int getIndexSlotOffset() {
+        return getTupleAppendOffset();
     }
 
     @Override
     public void clear(ByteBuffer buffer) throws HyracksDataException {
         this.buffer = buffer;
         this.array = buffer.array();
-        tupleCountOffset = getTupleCountOffset();
-        setTupleCount(0);
-        setDeleteSpace(0);
+        setIndexCount(0);
+        setDeletedSpace(0);
+        setTupleAppend(0);
         resetCounts();
     }
 
@@ -92,14 +189,15 @@ public class DeletableFrameTupleAppender implements 
IAppendDeletableFrameTupleAc
     public void reset(ByteBuffer buffer) {
         this.buffer = buffer;
         this.array = buffer.array();
-        tupleCountOffset = getTupleCountOffset();
         resetCounts();
     }
 
     private void resetCounts() {
+        indexSlotsOffset = getIndexSlotOffset();
         deletedSpace = getDeletedSpace();
-        tupleCount = getPhysicalTupleCount();
-        freeDataEndOffset = getFreeDataEndOffset();
+        indexCount = getIndexCount();
+        tupleAppend = getTupleAppend();
+        nextIndex = getNextIndex();
     }
 
     /**
@@ -116,11 +214,18 @@ public class DeletableFrameTupleAppender implements 
IAppendDeletableFrameTupleAc
         byte[] src = tupleAccessor.getBuffer().array();
         int tStartOffset = tupleAccessor.getTupleStartOffset(tIndex);
         int length = tupleAccessor.getTupleLength(tIndex);
-        System.arraycopy(src, tStartOffset, array, freeDataEndOffset, length);
-        setTupleCount(++tupleCount);
-        freeDataEndOffset += length;
-        setFreeDataEndOffset(freeDataEndOffset);
-        return tupleCount - 1;
+        System.arraycopy(src, tStartOffset, array, tupleAppend, length);
+        int index = getAndUpdateNextIndex();
+        if (index < indexCount) {
+            // Don't change index count
+        } else {
+            // Increment count
+            setIndexCount(++indexCount);
+        }
+        setTupleOffsets(index, tupleAppend, length);
+        tupleAppend += length;
+        setTupleAppend(tupleAppend);
+        return index;
     }
 
     @Override
@@ -129,7 +234,11 @@ public class DeletableFrameTupleAppender implements 
IAppendDeletableFrameTupleAc
         if (endOffset > 0) {
             setTupleEndOffset(tupleIndex, -endOffset);
             deletedSpace += endOffset - getTupleStartOffset(tupleIndex);
-            setDeleteSpace(deletedSpace);
+            setDeletedSpace(deletedSpace);
+            if (nextIndex > tupleIndex) {
+                nextIndex = tupleIndex;
+                setNextIndex(nextIndex);
+            }
         }
     }
 
@@ -140,36 +249,53 @@ public class DeletableFrameTupleAppender implements 
IAppendDeletableFrameTupleAc
         }
         reclaimDeletedEnding();
 
-        freeDataEndOffset = 0;
-        int endOffset = 0;
-        for (int i = 0; i < tupleCount; i++) {
-            int startOffset = Math.abs(endOffset);
+        // Build reorganize queue
+        int endOffset;
+        int startOffset;
+        for (int i = 0; i < indexCount; i++) {
             endOffset = getTupleEndOffset(i);
+            if (endOffset > 0) {
+                reorganizeQueue.add(new ImmutablePair<Integer, Integer>(i, 
getTupleStartOffset(i)));
+            }
+        }
+
+        int index;
+        tupleAppend = 0;
+        while (!reorganizeQueue.isEmpty()) {
+            index = reorganizeQueue.remove().getKey();
+            startOffset = getTupleStartOffset(index);
+            endOffset = getTupleEndOffset(index);
             if (endOffset >= 0) {
                 int length = endOffset - startOffset;
                 assert length >= 0;
-                if (freeDataEndOffset != startOffset) {
-                    System.arraycopy(array, startOffset, array, 
freeDataEndOffset, length);
+                if (tupleAppend != startOffset) {
+                    System.arraycopy(array, startOffset, array, tupleAppend, 
length);
                 }
-                freeDataEndOffset += length;
+                setTupleOffsets(index, tupleAppend, length);
+                tupleAppend += length;
             }
-            setTupleEndOffset(i, freeDataEndOffset);
         }
-        setFreeDataEndOffset(freeDataEndOffset);
+        setTupleAppend(tupleAppend);
         deletedSpace = 0;
-        setDeleteSpace(0);
+        setDeletedSpace(0);
+
+        // Clean up
+        reorganizeQueue.clear();
     }
 
     private void reclaimDeletedEnding() {
-        for (int i = tupleCount - 1; i >= 0; i--) {
+        for (int i = indexCount - 1; i >= 0; i--) {
             int endOffset = getTupleEndOffset(i);
             if (endOffset <= 0) {
-                tupleCount--;
+                indexCount--;
             } else {
                 break;
             }
         }
-        setTupleCount(tupleCount);
+        setIndexCount(indexCount);
+        if (nextIndex > indexCount) {
+            setNextIndex(indexCount);
+        }
     }
 
     @Override
@@ -179,7 +305,8 @@ public class DeletableFrameTupleAppender implements 
IAppendDeletableFrameTupleAc
 
     @Override
     public int getContiguousFreeSpace() {
-        return getTupleCountOffset() - tupleCount * 4 - freeDataEndOffset;
+        int slotSpace = indexCount * SIZE_OFFSET_GROUP;
+        return indexSlotsOffset - tupleAppend - slotSpace;
     }
 
     @Override
@@ -203,6 +330,11 @@ public class DeletableFrameTupleAppender implements 
IAppendDeletableFrameTupleAc
     }
 
     @Override
+    public int getAbsoluteFieldStartOffset(int tupleIndex, int fIdx) {
+        return getTupleStartOffset(tupleIndex) + getFieldSlotsLength() + 
getFieldStartOffset(tupleIndex, fIdx);
+    }
+
+    @Override
     public int getFieldLength(int tupleIndex, int fIdx) {
         return getFieldEndOffset(tupleIndex, fIdx) - 
getFieldStartOffset(tupleIndex, fIdx);
     }
@@ -210,40 +342,49 @@ public class DeletableFrameTupleAppender implements 
IAppendDeletableFrameTupleAc
     @Override
     public int getTupleLength(int tupleIndex) {
         int endOffset = getTupleEndOffset(tupleIndex);
-        if (endOffset < 0) {
-            return endOffset + getTupleStartOffset(tupleIndex);
-        }
+        assert endOffset > 0;
         return endOffset - getTupleStartOffset(tupleIndex);
     }
 
+    private void setTupleOffsets(int tupleIndex, int start, int length) {
+        setTupleStartOffset(tupleIndex, start);
+        setTupleEndOffset(tupleIndex, start + length);
+    }
+
     @Override
     public int getTupleEndOffset(int tupleIndex) {
-        return IntSerDeUtils.getInt(array, tupleCountOffset - 4 * (tupleIndex 
+ 1));
+        return IntSerDeUtils.getInt(array, getTupleEndSlotOffset(tupleIndex));
     }
 
     private void setTupleEndOffset(int tupleIndex, int offset) {
-        IntSerDeUtils.putInt(array, tupleCountOffset - 4 * (tupleIndex + 1), 
offset);
+        IntSerDeUtils.putInt(array, getTupleEndSlotOffset(tupleIndex), offset);
     }
 
     @Override
     public int getTupleStartOffset(int tupleIndex) {
-        int offset = tupleIndex == 0 ? 0 : IntSerDeUtils.getInt(array, 
tupleCountOffset - 4 * tupleIndex);
-        return Math.abs(offset);
+        return IntSerDeUtils.getInt(array, 
getTupleStartSlotOffset(tupleIndex));
     }
 
-    @Override
-    public int getAbsoluteFieldStartOffset(int tupleIndex, int fIdx) {
-        return getTupleStartOffset(tupleIndex) + getFieldSlotsLength() + 
getFieldStartOffset(tupleIndex, fIdx);
+    public void setTupleStartOffset(int tupleIndex, int offset) {
+        IntSerDeUtils.putInt(array, getTupleStartSlotOffset(tupleIndex), 
offset);
+    }
+
+    public int getTupleStartSlotOffset(int tupleIndex) {
+        return indexSlotsOffset - SIZE_OFFSET_GROUP * tupleIndex - 
SIZE_START_OFFSET;
+    }
+
+    public int getTupleEndSlotOffset(int tupleIndex) {
+        return getTupleStartSlotOffset(tupleIndex) - SIZE_END_OFFSET;
     }
 
     @Override
     public int getTupleCount() {
-        return tupleCount;
+        return indexCount;
     }
 
     private int getLiveTupleCount() {
         int live = 0;
-        for (int i = tupleCount - 1; i >= 0; i--) {
+        for (int i = 0; i < indexCount; ++i) {
             int endOffset = getTupleEndOffset(i);
             if (endOffset > 0) {
                 live++;
@@ -262,7 +403,7 @@ public class DeletableFrameTupleAppender implements 
IAppendDeletableFrameTupleAc
         if (getLiveTupleCount() == 0) {
             ps.print("");
         }
-        ps.printf("(%d, %d)", getLiveTupleCount(), getPhysicalTupleCount());
+        ps.printf("(%d, %d)", getLiveTupleCount(), getIndexCount());
     }
 
 }

Reply via email to