Switch the merge joiner's memory tracking method to a list of tuple pointers.
Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/4f9e6a82 Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/4f9e6a82 Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/4f9e6a82 Branch: refs/heads/ecarm002/interval_join_merge Commit: 4f9e6a82eaff790a50f86848e770e4b1dedf4069 Parents: 1f7ac98 Author: Preston Carman <prest...@apache.org> Authored: Thu Jul 14 09:11:57 2016 -0700 Committer: Preston Carman <prest...@apache.org> Committed: Thu Jul 14 09:11:57 2016 -0700 ---------------------------------------------------------------------- .../joins/AbstractIntervalMergeJoinChecker.java | 14 +++++++ .../IntervalPartitionJoiner.java | 2 +- .../dataflow/std/join/IMergeJoinChecker.java | 3 ++ .../hyracks/dataflow/std/join/MergeJoiner.java | 41 +++++++++++--------- 4 files changed, 41 insertions(+), 19 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/asterixdb/blob/4f9e6a82/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/AbstractIntervalMergeJoinChecker.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/AbstractIntervalMergeJoinChecker.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/AbstractIntervalMergeJoinChecker.java index 0a25c25..cf0bf6a 100644 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/AbstractIntervalMergeJoinChecker.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/AbstractIntervalMergeJoinChecker.java @@ -105,6 +105,20 @@ public abstract class AbstractIntervalMergeJoinChecker implements IIntervalMerge } @Override + public boolean checkToRemoveInMemory(IFrameTupleAccessor accessorLeft, int leftTupleIndex, + 
IFrameTupleAccessor accessorRight, int rightTupleIndex) throws HyracksDataException { + try { + IntervalJoinUtil.getIntervalPointable(accessorLeft, leftTupleIndex, idLeft, tvp, ipLeft); + IntervalJoinUtil.getIntervalPointable(accessorRight, rightTupleIndex, idRight, tvp, ipRight); + ipLeft.getStart(startLeft); + ipRight.getEnd(endRight); + return !(ch.compare(ipLeft.getTypeTag(), ipRight.getTypeTag(), startLeft, endRight) <= 0); + } catch (AsterixException e) { + throw new HyracksDataException(e); + } + } + + @Override public boolean checkToSaveInResult(IFrameTupleAccessor accessorLeft, int leftTupleIndex, IFrameTupleAccessor accessorRight, int rightTupleIndex, boolean reversed) throws HyracksDataException { try { http://git-wip-us.apache.org/repos/asf/asterixdb/blob/4f9e6a82/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionJoiner.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionJoiner.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionJoiner.java index 5df7b0a..fe49d2f 100644 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionJoiner.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionJoiner.java @@ -320,7 +320,7 @@ public class IntervalPartitionJoiner { private int selectPartitionsToReload(int freeSpace, int pid) { for (int id = ipjd.buildNextSpilled(0); id >= 0; id = ipjd.buildNextSpilled(id + 1)) { - assert buildRFWriters[id].getFileSize() > 0 : "How comes a spilled partition have size 0?"; + assert buildRFWriters[id].getFileSize() > 0 : "How come a spilled partition have size 0?"; if (freeSpace >= 
buildRFWriters[id].getFileSize()) { return id; } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/4f9e6a82/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/IMergeJoinChecker.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/IMergeJoinChecker.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/IMergeJoinChecker.java index ddf04f3..49a3763 100644 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/IMergeJoinChecker.java +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/IMergeJoinChecker.java @@ -51,6 +51,9 @@ public interface IMergeJoinChecker extends Serializable { boolean checkToRemoveInMemory(ITupleAccessor accessorLeft, ITupleAccessor accessorRight) throws HyracksDataException; + boolean checkToRemoveInMemory(IFrameTupleAccessor accessorLeft, int leftTupleIndex, + IFrameTupleAccessor accessorRight, int rightTupleIndex) throws HyracksDataException; + /** * Check to see if the next right tuple should be loaded during the merge join. * The check is true if the left tuple could match with the next right tuple. 
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/4f9e6a82/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/MergeJoiner.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/MergeJoiner.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/MergeJoiner.java index 625a24c..d94a63e 100644 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/MergeJoiner.java +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/MergeJoiner.java @@ -18,9 +18,11 @@ */ package org.apache.hyracks.dataflow.std.join; +import java.util.LinkedList; import java.util.logging.Level; import java.util.logging.Logger; +import org.apache.hyracks.api.comm.IFrameTupleAccessor; import org.apache.hyracks.api.comm.IFrameWriter; import org.apache.hyracks.api.comm.VSizeFrame; import org.apache.hyracks.api.context.IHyracksTaskContext; @@ -32,6 +34,7 @@ import org.apache.hyracks.dataflow.std.buffermanager.DeallocatableFramePool; import org.apache.hyracks.dataflow.std.buffermanager.IDeallocatableFramePool; import org.apache.hyracks.dataflow.std.buffermanager.IDeletableTupleBufferManager; import org.apache.hyracks.dataflow.std.buffermanager.ITupleAccessor; +import org.apache.hyracks.dataflow.std.buffermanager.ITuplePointerAccessor; import org.apache.hyracks.dataflow.std.buffermanager.TupleAccessor; import org.apache.hyracks.dataflow.std.buffermanager.VariableDeletableTupleMemoryManager; import org.apache.hyracks.dataflow.std.structures.TuplePointer; @@ -47,10 +50,10 @@ public class MergeJoiner extends AbstractMergeJoiner { private MergeStatus status; - private final TuplePointer tp; private final IDeallocatableFramePool framePool; private IDeletableTupleBufferManager 
bufferManager; - private ITupleAccessor memoryAccessor; + private ITuplePointerAccessor memoryAccessor; + private LinkedList<TuplePointer> memoryBuffer = new LinkedList<>(); private int leftStreamIndex; private RunFileStream runFileStream; @@ -71,9 +74,8 @@ public class MergeJoiner extends AbstractMergeJoiner { "MergeJoiner does not have enough memory (needs > 0, got " + memorySize + ")."); } framePool = new DeallocatableFramePool(ctx, (memorySize) * ctx.getInitialFrameSize()); - tp = new TuplePointer(); bufferManager = new VariableDeletableTupleMemoryManager(framePool, rightRd); - memoryAccessor = bufferManager.createTupleAccessor(); + memoryAccessor = bufferManager.createTuplePointerAccessor(); // Run File and frame cache (left buffer) leftStreamIndex = TupleAccessor.UNSET; @@ -88,21 +90,23 @@ public class MergeJoiner extends AbstractMergeJoiner { } private boolean addToMemory(ITupleAccessor accessor) throws HyracksDataException { + TuplePointer tp = new TuplePointer(); if (bufferManager.insertTuple(accessor, accessor.getTupleId(), tp)) { + memoryBuffer.add(tp); return true; } return false; } - private void removeFromMemory() throws HyracksDataException { - memoryAccessor.getTuplePointer(tp); + private void removeFromMemory(TuplePointer tp) throws HyracksDataException { + memoryBuffer.remove(tp); bufferManager.deleteTuple(tp); } - private void addToResult(ITupleAccessor accessor1, ITupleAccessor accessor2, IFrameWriter writer) - throws HyracksDataException { - FrameUtils.appendConcatToWriter(writer, resultAppender, accessor1, accessor1.getTupleId(), accessor2, - accessor2.getTupleId()); + private void addToResult(IFrameTupleAccessor accessorLeft, int leftTupleIndex, IFrameTupleAccessor accessorRight, + int rightTupleIndex, IFrameWriter writer) throws HyracksDataException { + FrameUtils.appendConcatToWriter(writer, resultAppender, accessorLeft, leftTupleIndex, accessorRight, + rightTupleIndex); } @Override @@ -197,21 +201,22 @@ public class MergeJoiner extends 
AbstractMergeJoiner { private void processLeftTuple(IFrameWriter writer) throws HyracksDataException { // Check against memory (right) if (memoryHasTuples()) { - memoryAccessor.reset(); - memoryAccessor.next(); - while (memoryAccessor.exists()) { + for (int i = memoryBuffer.size() - 1; i > -1; --i) { + memoryAccessor.reset(memoryBuffer.get(i)); // TuplePrinterUtil.printTuple(" --- A outer", inputAccessor[LEFT_PARTITION]); // TuplePrinterUtil.printTuple(" --- A inner", memoryAccessor); - if (mjc.checkToSaveInResult(inputAccessor[LEFT_PARTITION], memoryAccessor)) { + if (mjc.checkToSaveInResult(inputAccessor[LEFT_PARTITION], inputAccessor[LEFT_PARTITION].getTupleId(), + memoryAccessor, memoryBuffer.get(i).getTupleIndex(), false)) { // add to result // System.err.println(" -- Matched --"); - addToResult(inputAccessor[LEFT_PARTITION], memoryAccessor, writer); + addToResult(inputAccessor[LEFT_PARTITION], inputAccessor[LEFT_PARTITION].getTupleId(), + memoryAccessor, memoryBuffer.get(i).getTupleIndex(), writer); } - if (mjc.checkToRemoveInMemory(inputAccessor[LEFT_PARTITION], memoryAccessor)) { + if (mjc.checkToRemoveInMemory(inputAccessor[LEFT_PARTITION], inputAccessor[LEFT_PARTITION].getTupleId(), + memoryAccessor, memoryBuffer.get(i).getTupleIndex())) { // remove from memory - removeFromMemory(); + removeFromMemory(memoryBuffer.get(i)); } - memoryAccessor.next(); } } inputAccessor[LEFT_PARTITION].next();