switch merge memory tracking method.

Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/4f9e6a82
Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/4f9e6a82
Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/4f9e6a82

Branch: refs/heads/ecarm002/interval_join_merge
Commit: 4f9e6a82eaff790a50f86848e770e4b1dedf4069
Parents: 1f7ac98
Author: Preston Carman <prest...@apache.org>
Authored: Thu Jul 14 09:11:57 2016 -0700
Committer: Preston Carman <prest...@apache.org>
Committed: Thu Jul 14 09:11:57 2016 -0700

----------------------------------------------------------------------
 .../joins/AbstractIntervalMergeJoinChecker.java | 14 +++++++
 .../IntervalPartitionJoiner.java                |  2 +-
 .../dataflow/std/join/IMergeJoinChecker.java    |  3 ++
 .../hyracks/dataflow/std/join/MergeJoiner.java  | 41 +++++++++++---------
 4 files changed, 41 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/asterixdb/blob/4f9e6a82/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/AbstractIntervalMergeJoinChecker.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/AbstractIntervalMergeJoinChecker.java
 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/AbstractIntervalMergeJoinChecker.java
index 0a25c25..cf0bf6a 100644
--- 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/AbstractIntervalMergeJoinChecker.java
+++ 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/AbstractIntervalMergeJoinChecker.java
@@ -105,6 +105,20 @@ public abstract class AbstractIntervalMergeJoinChecker 
implements IIntervalMerge
     }
 
     @Override
+    public boolean checkToRemoveInMemory(IFrameTupleAccessor accessorLeft, int 
leftTupleIndex,
+            IFrameTupleAccessor accessorRight, int rightTupleIndex) throws 
HyracksDataException {
+        try {
+            IntervalJoinUtil.getIntervalPointable(accessorLeft, 
leftTupleIndex, idLeft, tvp, ipLeft);
+            IntervalJoinUtil.getIntervalPointable(accessorRight, 
rightTupleIndex, idRight, tvp, ipRight);
+            ipLeft.getStart(startLeft);
+            ipRight.getEnd(endRight);
+            return !(ch.compare(ipLeft.getTypeTag(), ipRight.getTypeTag(), 
startLeft, endRight) <= 0);
+        } catch (AsterixException e) {
+            throw new HyracksDataException(e);
+        }
+    }
+
+    @Override
     public boolean checkToSaveInResult(IFrameTupleAccessor accessorLeft, int 
leftTupleIndex,
             IFrameTupleAccessor accessorRight, int rightTupleIndex, boolean 
reversed) throws HyracksDataException {
         try {

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/4f9e6a82/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionJoiner.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionJoiner.java
 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionJoiner.java
index 5df7b0a..fe49d2f 100644
--- 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionJoiner.java
+++ 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionJoiner.java
@@ -320,7 +320,7 @@ public class IntervalPartitionJoiner {
 
     private int selectPartitionsToReload(int freeSpace, int pid) {
         for (int id = ipjd.buildNextSpilled(0); id >= 0; id = 
ipjd.buildNextSpilled(id + 1)) {
-            assert buildRFWriters[id].getFileSize() > 0 : "How comes a spilled 
partition have size 0?";
+            assert buildRFWriters[id].getFileSize() > 0 : "How come a spilled 
partition have size 0?";
             if (freeSpace >= buildRFWriters[id].getFileSize()) {
                 return id;
             }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/4f9e6a82/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/IMergeJoinChecker.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/IMergeJoinChecker.java
 
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/IMergeJoinChecker.java
index ddf04f3..49a3763 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/IMergeJoinChecker.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/IMergeJoinChecker.java
@@ -51,6 +51,9 @@ public interface IMergeJoinChecker extends Serializable {
     boolean checkToRemoveInMemory(ITupleAccessor accessorLeft, ITupleAccessor 
accessorRight)
             throws HyracksDataException;
 
+    boolean checkToRemoveInMemory(IFrameTupleAccessor accessorLeft, int 
leftTupleIndex,
+            IFrameTupleAccessor accessorRight, int rightTupleIndex) throws 
HyracksDataException;
+
     /**
      * Check to see if the next right tuple should be loaded during the merge 
join.
      * The check is true if the left tuple could match with the next right 
tuple.

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/4f9e6a82/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/MergeJoiner.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/MergeJoiner.java
 
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/MergeJoiner.java
index 625a24c..d94a63e 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/MergeJoiner.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/MergeJoiner.java
@@ -18,9 +18,11 @@
  */
 package org.apache.hyracks.dataflow.std.join;
 
+import java.util.LinkedList;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
+import org.apache.hyracks.api.comm.IFrameTupleAccessor;
 import org.apache.hyracks.api.comm.IFrameWriter;
 import org.apache.hyracks.api.comm.VSizeFrame;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
@@ -32,6 +34,7 @@ import 
org.apache.hyracks.dataflow.std.buffermanager.DeallocatableFramePool;
 import org.apache.hyracks.dataflow.std.buffermanager.IDeallocatableFramePool;
 import 
org.apache.hyracks.dataflow.std.buffermanager.IDeletableTupleBufferManager;
 import org.apache.hyracks.dataflow.std.buffermanager.ITupleAccessor;
+import org.apache.hyracks.dataflow.std.buffermanager.ITuplePointerAccessor;
 import org.apache.hyracks.dataflow.std.buffermanager.TupleAccessor;
 import 
org.apache.hyracks.dataflow.std.buffermanager.VariableDeletableTupleMemoryManager;
 import org.apache.hyracks.dataflow.std.structures.TuplePointer;
@@ -47,10 +50,10 @@ public class MergeJoiner extends AbstractMergeJoiner {
 
     private MergeStatus status;
 
-    private final TuplePointer tp;
     private final IDeallocatableFramePool framePool;
     private IDeletableTupleBufferManager bufferManager;
-    private ITupleAccessor memoryAccessor;
+    private ITuplePointerAccessor memoryAccessor;
+    private LinkedList<TuplePointer> memoryBuffer = new LinkedList<>();
 
     private int leftStreamIndex;
     private RunFileStream runFileStream;
@@ -71,9 +74,8 @@ public class MergeJoiner extends AbstractMergeJoiner {
                     "MergeJoiner does not have enough memory (needs > 0, got " 
+ memorySize + ").");
         }
         framePool = new DeallocatableFramePool(ctx, (memorySize) * 
ctx.getInitialFrameSize());
-        tp = new TuplePointer();
         bufferManager = new VariableDeletableTupleMemoryManager(framePool, 
rightRd);
-        memoryAccessor = bufferManager.createTupleAccessor();
+        memoryAccessor = bufferManager.createTuplePointerAccessor();
 
         // Run File and frame cache (left buffer)
         leftStreamIndex = TupleAccessor.UNSET;
@@ -88,21 +90,23 @@ public class MergeJoiner extends AbstractMergeJoiner {
     }
 
     private boolean addToMemory(ITupleAccessor accessor) throws 
HyracksDataException {
+        TuplePointer tp = new TuplePointer();
         if (bufferManager.insertTuple(accessor, accessor.getTupleId(), tp)) {
+            memoryBuffer.add(tp);
             return true;
         }
         return false;
     }
 
-    private void removeFromMemory() throws HyracksDataException {
-        memoryAccessor.getTuplePointer(tp);
+    private void removeFromMemory(TuplePointer tp) throws HyracksDataException 
{
+        memoryBuffer.remove(tp);
         bufferManager.deleteTuple(tp);
     }
 
-    private void addToResult(ITupleAccessor accessor1, ITupleAccessor 
accessor2, IFrameWriter writer)
-            throws HyracksDataException {
-        FrameUtils.appendConcatToWriter(writer, resultAppender, accessor1, 
accessor1.getTupleId(), accessor2,
-                accessor2.getTupleId());
+    private void addToResult(IFrameTupleAccessor accessorLeft, int 
leftTupleIndex, IFrameTupleAccessor accessorRight,
+            int rightTupleIndex, IFrameWriter writer) throws 
HyracksDataException {
+        FrameUtils.appendConcatToWriter(writer, resultAppender, accessorLeft, 
leftTupleIndex, accessorRight,
+                rightTupleIndex);
     }
 
     @Override
@@ -197,21 +201,22 @@ public class MergeJoiner extends AbstractMergeJoiner {
     private void processLeftTuple(IFrameWriter writer) throws 
HyracksDataException {
         // Check against memory (right)
         if (memoryHasTuples()) {
-            memoryAccessor.reset();
-            memoryAccessor.next();
-            while (memoryAccessor.exists()) {
+            for (int i = memoryBuffer.size() - 1; i > -1; --i) {
+                memoryAccessor.reset(memoryBuffer.get(i));
                 //                TuplePrinterUtil.printTuple("     --- A 
outer", inputAccessor[LEFT_PARTITION]);
                 //                TuplePrinterUtil.printTuple("     --- A 
inner", memoryAccessor);
-                if (mjc.checkToSaveInResult(inputAccessor[LEFT_PARTITION], 
memoryAccessor)) {
+                if (mjc.checkToSaveInResult(inputAccessor[LEFT_PARTITION], 
inputAccessor[LEFT_PARTITION].getTupleId(),
+                        memoryAccessor, memoryBuffer.get(i).getTupleIndex(), 
false)) {
                     // add to result
                     //                    System.err.println("  -- Matched 
--");
-                    addToResult(inputAccessor[LEFT_PARTITION], memoryAccessor, 
writer);
+                    addToResult(inputAccessor[LEFT_PARTITION], 
inputAccessor[LEFT_PARTITION].getTupleId(),
+                            memoryAccessor, 
memoryBuffer.get(i).getTupleIndex(), writer);
                 }
-                if (mjc.checkToRemoveInMemory(inputAccessor[LEFT_PARTITION], 
memoryAccessor)) {
+                if (mjc.checkToRemoveInMemory(inputAccessor[LEFT_PARTITION], 
inputAccessor[LEFT_PARTITION].getTupleId(),
+                        memoryAccessor, memoryBuffer.get(i).getTupleIndex())) {
                     // remove from memory
-                    removeFromMemory();
+                    removeFromMemory(memoryBuffer.get(i));
                 }
-                memoryAccessor.next();
             }
         }
         inputAccessor[LEFT_PARTITION].next();

Reply via email to