Add stats tracking for interval joins.
Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/a9729514 Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/a9729514 Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/a9729514 Branch: refs/heads/ecarm002/interval_join_merge Commit: a9729514d7d517a24df2d290d15e854af6d65a51 Parents: 1487f2b Author: Preston Carman <prest...@apache.org> Authored: Thu Aug 25 11:43:57 2016 -0700 Committer: Preston Carman <prest...@apache.org> Committed: Thu Aug 25 11:43:57 2016 -0700 ---------------------------------------------------------------------- .../intervalindex/IntervalIndexJoiner.java | 54 ++++++++++++-------- .../InMemoryIntervalPartitionJoin.java | 18 ++++++- .../IntervalPartitionJoiner.java | 44 ++++++++++++---- .../IPartitionedTupleBufferManager.java | 2 + .../VPartitionTupleBufferManager.java | 5 ++ .../dataflow/std/join/AbstractMergeJoiner.java | 10 ++-- .../hyracks/dataflow/std/join/MergeJoiner.java | 34 +++++++----- .../dataflow/std/join/NestedLoopJoin.java | 23 ++++++++- .../join/NestedLoopJoinOperatorDescriptor.java | 28 ++-------- .../dataflow/std/join/RunFileStream.java | 27 ++++++++-- 10 files changed, 160 insertions(+), 85 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a9729514/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalindex/IntervalIndexJoiner.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalindex/IntervalIndexJoiner.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalindex/IntervalIndexJoiner.java index 6f04cad..e4c4cbe 100644 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalindex/IntervalIndexJoiner.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalindex/IntervalIndexJoiner.java @@ -29,11 +29,9 @@ import org.apache.asterix.runtime.operators.joins.IIntervalMergeJoinCheckerFacto import org.apache.asterix.runtime.operators.joins.IntervalJoinUtil; import org.apache.hyracks.api.comm.IFrameTupleAccessor; import org.apache.hyracks.api.comm.IFrameWriter; -import org.apache.hyracks.api.comm.VSizeFrame; import org.apache.hyracks.api.context.IHyracksTaskContext; import org.apache.hyracks.api.dataflow.value.RecordDescriptor; import org.apache.hyracks.api.exceptions.HyracksDataException; -import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender; import org.apache.hyracks.dataflow.common.comm.util.FrameUtils; import org.apache.hyracks.dataflow.std.buffermanager.IPartitionedDeletableTupleBufferManager; import org.apache.hyracks.dataflow.std.buffermanager.ITupleAccessor; @@ -57,23 +55,27 @@ public class IntervalIndexJoiner extends AbstractMergeJoiner { private static final Logger LOGGER = Logger.getLogger(IntervalIndexJoiner.class.getName()); - private IPartitionedDeletableTupleBufferManager bufferManager; + private final IPartitionedDeletableTupleBufferManager bufferManager; - private ActiveSweepManager[] activeManager; - private ITuplePointerAccessor[] memoryAccessor; - private int[] streamIndex; - private RunFileStream[] runFileStream; + private final ActiveSweepManager[] activeManager; + private final ITuplePointerAccessor[] memoryAccessor; + private final int[] streamIndex; + private final RunFileStream[] runFileStream; - private LinkedList<TuplePointer> buffer = new LinkedList<>(); + private final LinkedList<TuplePointer> buffer = new LinkedList<>(); - private IIntervalMergeJoinChecker imjc; + private final IIntervalMergeJoinChecker imjc; - protected byte point; + private final byte point; - private MergeStatus status; + private final int leftKey; + private final int rightKey; - private int leftKey; - private int rightKey; + private long joinComparisonCount = 0; + private long joinResultCount = 0; + private long spillCount = 0; + private long spillReadCount = 0; + private long spillWriteCount = 0; public IntervalIndexJoiner(IHyracksTaskContext ctx, int memorySize, int partition, MergeStatus status, MergeJoinLocks locks, Comparator<EndPointIndexItem> endPointComparator, @@ -87,8 +89,6 @@ public class IntervalIndexJoiner extends AbstractMergeJoiner { this.leftKey = leftKeys[0]; this.rightKey = rightKeys[0]; - this.status = status; - RecordDescriptor[] recordDescriptors = new RecordDescriptor[JOIN_PARTITIONS]; recordDescriptors[LEFT_PARTITION] = leftRd; recordDescriptors[RIGHT_PARTITION] = rightRd; @@ -119,8 +119,6 @@ public class IntervalIndexJoiner extends AbstractMergeJoiner { runFileStream[LEFT_PARTITION] = new RunFileStream(ctx, "left", status.branch[LEFT_PARTITION]); runFileStream[RIGHT_PARTITION] = new RunFileStream(ctx, "right", status.branch[RIGHT_PARTITION]); - // Result - resultAppender = new FrameTupleAppender(new VSizeFrame(ctx)); if (LOGGER.isLoggable(Level.FINE)) { LOGGER.fine("IntervalIndexJoiner has started partition " + partition + " with " + memorySize + " frames of memory."); @@ -134,6 +132,7 @@ public class IntervalIndexJoiner extends AbstractMergeJoiner { } else { FrameUtils.appendConcatToWriter(writer, resultAppender, accessor1, index1, accessor2, index2); } + joinResultCount++; } @Override @@ -143,6 +142,11 @@ public class IntervalIndexJoiner extends AbstractMergeJoiner { activeManager[RIGHT_PARTITION].clear(); runFileStream[LEFT_PARTITION].close(); runFileStream[RIGHT_PARTITION].close(); + if (LOGGER.isLoggable(Level.WARNING)) { + LOGGER.warning("IntervalIndexJoiner statitics: " + joinComparisonCount + " comparisons, " + joinResultCount + + " results, " + spillCount + " spills, " + spillWriteCount + " spill frames written, " + + spillReadCount + " spill frames read."); + } } private void flushMemory(int partition) throws HyracksDataException { @@ -378,10 +382,12 @@ public class IntervalIndexJoiner extends AbstractMergeJoiner { outerAccessor.reset(outerTp); for (TuplePointer innerTp : inner) { innerAccessor.reset(innerTp); - if (imjc.checkToSaveInResult(outerAccessor, outerTp.getTupleIndex(), innerAccessor, innerTp.getTupleIndex(), - reversed)) { - addToResult(outerAccessor, outerTp.getTupleIndex(), innerAccessor, innerTp.getTupleIndex(), reversed, writer); + if (imjc.checkToSaveInResult(outerAccessor, outerTp.getTupleIndex(), innerAccessor, + innerTp.getTupleIndex(), reversed)) { + addToResult(outerAccessor, outerTp.getTupleIndex(), innerAccessor, innerTp.getTupleIndex(), + reversed, writer); } + joinComparisonCount++; } } if (LOGGER.isLoggable(Level.FINE)) { @@ -394,11 +400,12 @@ public class IntervalIndexJoiner extends AbstractMergeJoiner { ITupleAccessor tupleAccessor, boolean reversed, IFrameWriter writer) throws HyracksDataException { for (TuplePointer outerTp : outer) { outerAccessor.reset(outerTp); - if (imjc.checkToSaveInResult(outerAccessor, outerTp.getTupleIndex(), tupleAccessor, tupleAccessor.getTupleId(), - reversed)) { + if (imjc.checkToSaveInResult(outerAccessor, outerTp.getTupleIndex(), tupleAccessor, + tupleAccessor.getTupleId(), reversed)) { addToResult(outerAccessor, outerTp.getTupleIndex(), tupleAccessor, tupleAccessor.getTupleId(), reversed, writer); } + joinComparisonCount++; } } @@ -427,6 +434,9 @@ public class IntervalIndexJoiner extends AbstractMergeJoiner { if (LOGGER.isLoggable(Level.FINE)) { LOGGER.fine("Continue with stream (" + diskPartition + ")."); } + spillCount++; + spillReadCount += runFileStream[diskPartition].getReadCount(); + spillWriteCount += runFileStream[diskPartition].getWriteCount(); } private void unfreezeAndContinue(int frozenPartition, ITupleAccessor accessor, int flushPartition) http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a9729514/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/InMemoryIntervalPartitionJoin.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/InMemoryIntervalPartitionJoin.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/InMemoryIntervalPartitionJoin.java index b5256b5..88ff727 100644 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/InMemoryIntervalPartitionJoin.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/InMemoryIntervalPartitionJoin.java @@ -35,13 +35,16 @@ import org.apache.hyracks.dataflow.std.buffermanager.IFrameBufferManager; public class InMemoryIntervalPartitionJoin { + private static final Logger LOGGER = Logger.getLogger(InMemoryIntervalPartitionJoin.class.getName()); + private final FrameTupleAccessor accessorBuild; private final FrameTupleAppender appender; private final IFrameBufferManager fbm; private BufferInfo bufferInfo; private final IIntervalMergeJoinChecker imjc; - private static final Logger LOGGER = Logger.getLogger(InMemoryIntervalPartitionJoin.class.getName()); + private long joinComparisonCount = 0; + private long joinResultCount = 0; public InMemoryIntervalPartitionJoin(IHyracksTaskContext ctx, IFrameBufferManager fbm, IIntervalMergeJoinChecker imjc, RecordDescriptor buildRd, RecordDescriptor probeRd) @@ -55,6 +58,14 @@ public class InMemoryIntervalPartitionJoin { "InMemoryIntervalPartitionJoin has been created for Thread ID " + Thread.currentThread().getId() + "."); } + public long getComparisonCount() { + return joinComparisonCount; + } + + public long getResultCount() { + return joinResultCount; + } + public void join(IFrameTupleAccessor accessorProbe, int probeTupleIndex, IFrameWriter writer) throws HyracksDataException { if (fbm.getNumFrames() != 0) { @@ -62,9 +73,11 @@ public class InMemoryIntervalPartitionJoin { fbm.getFrame(frameIndex, bufferInfo); accessorBuild.reset(bufferInfo.getBuffer()); for (int buildTupleIndex = 0; buildTupleIndex < accessorBuild.getTupleCount(); ++buildTupleIndex) { - if (imjc.checkToSaveInResult(accessorBuild, buildTupleIndex, accessorProbe, probeTupleIndex, false)) { + if (imjc.checkToSaveInResult(accessorBuild, buildTupleIndex, accessorProbe, probeTupleIndex, + false)) { appendToResult(accessorBuild, buildTupleIndex, accessorProbe, probeTupleIndex, writer); } + joinComparisonCount++; } } } @@ -77,5 +90,6 @@ public class InMemoryIntervalPartitionJoin { private void appendToResult(IFrameTupleAccessor accessorBuild, int buildSidetIx, IFrameTupleAccessor accessorProbe, int probeSidetIx, IFrameWriter writer) throws HyracksDataException { FrameUtils.appendConcatToWriter(writer, appender, accessorBuild, buildSidetIx, accessorProbe, probeSidetIx); + joinResultCount++; } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a9729514/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionJoiner.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionJoiner.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionJoiner.java index fe49d2f..e943a48 100644 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionJoiner.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionJoiner.java @@ -52,14 +52,16 @@ import org.apache.hyracks.dataflow.std.structures.TuplePointer; */ public class IntervalPartitionJoiner { - // Used for special probe BigObject which can not be held into the Join memory - private FrameTupleAppender bigProbeFrameAppender; + private static final Logger LOGGER = Logger.getLogger(IntervalPartitionJoiner.class.getName()); - enum SIDE { + private enum SIDE { BUILD, PROBE } + // Used for special probe BigObject which can not be held into the Join memory + private FrameTupleAppender bigProbeFrameAppender; + private IHyracksTaskContext ctx; private final String buildRelName; @@ -85,8 +87,6 @@ public class IntervalPartitionJoiner { private final FrameTupleAccessor accessorBuild; private final FrameTupleAccessor accessorProbe; - private static final Logger LOGGER = Logger.getLogger(IntervalPartitionJoiner.class.getName()); - // stats information private IntervalPartitionJoinData ipjd; @@ -95,6 +95,12 @@ public class IntervalPartitionJoiner { private IIntervalMergeJoinChecker imjc; + private long joinComparisonCount = 0; + private long joinResultCount = 0; + private long spillCount = 0; + private long spillReadCount = 0; + private long spillWriteCount = 0; + public IntervalPartitionJoiner(IHyracksTaskContext ctx, int memForJoin, int k, int numOfPartitions, String buildRelName, String probeRelName, IIntervalMergeJoinChecker imjc, RecordDescriptor buildRd, RecordDescriptor probeRd, ITuplePartitionComputer buildHpc, ITuplePartitionComputer probeHpc) { @@ -220,6 +226,8 @@ public class IntervalPartitionJoiner { private void spillPartition(int pid) throws HyracksDataException { RunFileWriter writer = getSpillWriterOrCreateNewOneIfNotExist(pid, SIDE.BUILD); + spillCount++; + spillWriteCount += buildBufferManager.getNumFrames(pid); buildBufferManager.flushPartition(pid, writer); buildBufferManager.clearPartition(pid); ipjd.buildSpill(pid); @@ -261,7 +269,9 @@ public class IntervalPartitionJoiner { private void flushAndClearBuildSpilledPartition() throws HyracksDataException { for (int pid = ipjd.buildNextSpilled(0); pid >= 0; pid = ipjd.buildNextSpilled(pid + 1)) { if (buildBufferManager.getNumTuples(pid) > 0) { - buildBufferManager.flushPartition(pid, getSpillWriterOrCreateNewOneIfNotExist(pid, SIDE.BUILD)); + spillWriteCount += buildBufferManager.getNumFrames(pid); + RunFileWriter runFileWriter = getSpillWriterOrCreateNewOneIfNotExist(pid, SIDE.BUILD); + buildBufferManager.flushPartition(pid, runFileWriter); buildBufferManager.clearPartition(pid); buildRFWriters[pid].close(); } @@ -271,7 +281,9 @@ public class IntervalPartitionJoiner { private void flushAndClearProbeSpilledPartition() throws HyracksDataException { for (int pid = 0; pid < numOfPartitions; ++pid) { if (probeBufferManager.getNumTuples(pid) > 0) { - probeBufferManager.flushPartition(pid, getSpillWriterOrCreateNewOneIfNotExist(pid, SIDE.PROBE)); + spillWriteCount += probeBufferManager.getNumFrames(pid); + RunFileWriter runFileWriter = getSpillWriterOrCreateNewOneIfNotExist(pid, SIDE.PROBE); + probeBufferManager.flushPartition(pid, runFileWriter); probeBufferManager.clearPartition(pid); probeRFWriters[pid].close(); } @@ -310,6 +322,7 @@ public class IntervalPartitionJoiner { return false; } } + spillReadCount++; } r.close(); @@ -329,13 +342,15 @@ public class IntervalPartitionJoiner { } private void createInMemoryJoiner(int pid) throws HyracksDataException { - this.inMemJoiner[pid] = new InMemoryIntervalPartitionJoin(ctx, + inMemJoiner[pid] = new InMemoryIntervalPartitionJoin(ctx, buildBufferManager.getPartitionFrameBufferManager(pid), imjc, buildRd, probeRd); } private void closeInMemoryJoiner(int pid, IFrameWriter writer) throws HyracksDataException { - this.inMemJoiner[pid].closeJoin(writer); - this.inMemJoiner[pid] = null; + joinComparisonCount += inMemJoiner[pid].getComparisonCount(); + joinResultCount += inMemJoiner[pid].getResultCount(); + inMemJoiner[pid].closeJoin(writer); + inMemJoiner[pid] = null; } public void initProbe() throws HyracksDataException { @@ -374,6 +389,8 @@ public class IntervalPartitionJoiner { break; } RunFileWriter runFileWriter = getSpillWriterOrCreateNewOneIfNotExist(victim, SIDE.PROBE); + spillCount++; + spillWriteCount += probeBufferManager.getNumFrames(pid); probeBufferManager.flushPartition(victim, runFileWriter); probeBufferManager.clearPartition(victim); } @@ -582,7 +599,7 @@ public class IntervalPartitionJoiner { } public int buildNextInMemory(int pid) { - int nextPid = buildSpilledStatus.nextClearBit(pid); + int nextPid = buildSpilledStatus.nextClearBit(pid); if (nextPid >= numOfPartitions) { return -1; } @@ -644,6 +661,11 @@ public class IntervalPartitionJoiner { FileUtils.deleteQuietly(rfw.getFileReference().getFile()); } } + if (LOGGER.isLoggable(Level.WARNING)) { + LOGGER.warning("IntervalPartitionJoiner statitics: " + joinComparisonCount + " comparisons, " + + joinResultCount + " results, " + spillCount + " spills, " + spillWriteCount + + " spill frames written, " + spillReadCount + " spill frames read."); + } } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a9729514/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/IPartitionedTupleBufferManager.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/IPartitionedTupleBufferManager.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/IPartitionedTupleBufferManager.java index d844e7d..c0c96b3 100644 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/IPartitionedTupleBufferManager.java +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/IPartitionedTupleBufferManager.java @@ -31,6 +31,8 @@ public interface IPartitionedTupleBufferManager { int getNumTuples(int partition); + int getNumFrames(int partition); + int getPhysicalSize(int partition); /** http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a9729514/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/VPartitionTupleBufferManager.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/VPartitionTupleBufferManager.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/VPartitionTupleBufferManager.java index 104f1ce..4a4cb5d 100644 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/VPartitionTupleBufferManager.java +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/VPartitionTupleBufferManager.java @@ -90,6 +90,11 @@ public class VPartitionTupleBufferManager implements IPartitionedTupleBufferMana } @Override + public int getNumFrames(int partition) { + return partitionArray[partition].getNumFrames(); + } + + @Override public int getPhysicalSize(int partitionId) { int size = 0; IFrameBufferManager partition = partitionArray[partitionId]; http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a9729514/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/AbstractMergeJoiner.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/AbstractMergeJoiner.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/AbstractMergeJoiner.java index f8d328b..8006790 100644 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/AbstractMergeJoiner.java +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/AbstractMergeJoiner.java @@ -53,15 +53,13 @@ public abstract class AbstractMergeJoiner implements IMergeJoiner { protected static final int LEFT_PARTITION = 0; protected static final int RIGHT_PARTITION = 1; + protected final ByteBuffer[] inputBuffer; + protected final FrameTupleAppender resultAppender; protected final ITupleAccessor[] inputAccessor; - protected ByteBuffer[] inputBuffer; - - private MergeJoinLocks locks; - private MergeStatus status; + protected final MergeStatus status; private final int partition; - - protected FrameTupleAppender resultAppender; + private final MergeJoinLocks locks; public AbstractMergeJoiner(IHyracksTaskContext ctx, int partition, MergeStatus status, MergeJoinLocks locks, RecordDescriptor leftRd, RecordDescriptor rightRd) throws HyracksDataException { http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a9729514/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/MergeJoiner.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/MergeJoiner.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/MergeJoiner.java index d94a63e..03283d3 100644 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/MergeJoiner.java +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/MergeJoiner.java @@ -24,11 +24,9 @@ import java.util.logging.Logger; import org.apache.hyracks.api.comm.IFrameTupleAccessor; import org.apache.hyracks.api.comm.IFrameWriter; -import org.apache.hyracks.api.comm.VSizeFrame; import org.apache.hyracks.api.context.IHyracksTaskContext; import org.apache.hyracks.api.dataflow.value.RecordDescriptor; import org.apache.hyracks.api.exceptions.HyracksDataException; -import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender; import org.apache.hyracks.dataflow.common.comm.util.FrameUtils; import org.apache.hyracks.dataflow.std.buffermanager.DeallocatableFramePool; import org.apache.hyracks.dataflow.std.buffermanager.IDeallocatableFramePool; @@ -48,24 +46,27 @@ import org.apache.hyracks.dataflow.std.structures.TuplePointer; */ public class MergeJoiner extends AbstractMergeJoiner { - private MergeStatus status; + private static final Logger LOGGER = Logger.getLogger(MergeJoiner.class.getName()); private final IDeallocatableFramePool framePool; - private IDeletableTupleBufferManager bufferManager; - private ITuplePointerAccessor memoryAccessor; - private LinkedList<TuplePointer> memoryBuffer = new LinkedList<>(); + private final IDeletableTupleBufferManager bufferManager; + private final ITuplePointerAccessor memoryAccessor; + private final LinkedList<TuplePointer> memoryBuffer = new LinkedList<>(); private int leftStreamIndex; - private RunFileStream runFileStream; + private final RunFileStream runFileStream; private final IMergeJoinChecker mjc; - private static final Logger LOGGER = Logger.getLogger(MergeJoiner.class.getName()); + private long joinComparisonCount = 0; + private long joinResultCount = 0; + private long spillWriteCount = 0; + private long spillReadCount = 0; + private long spillCount = 0; public MergeJoiner(IHyracksTaskContext ctx, int memorySize, int partition, MergeStatus status, MergeJoinLocks locks, IMergeJoinChecker mjc, RecordDescriptor leftRd, RecordDescriptor rightRd) throws HyracksDataException { super(ctx, partition, status, locks, leftRd, rightRd); - this.status = status; this.mjc = mjc; // Memory (right buffer) @@ -81,8 +82,6 @@ public class MergeJoiner extends AbstractMergeJoiner { leftStreamIndex = TupleAccessor.UNSET; runFileStream = new RunFileStream(ctx, "left", status.branch[LEFT_PARTITION]); - // Result - resultAppender = new FrameTupleAppender(new VSizeFrame(ctx)); if (LOGGER.isLoggable(Level.FINE)) { LOGGER.fine( "MergeJoiner has started partition " + partition + " with " + memorySize + " frames of memory."); @@ -107,11 +106,17 @@ public class MergeJoiner extends AbstractMergeJoiner { int rightTupleIndex, IFrameWriter writer) throws HyracksDataException { FrameUtils.appendConcatToWriter(writer, resultAppender, accessorLeft, leftTupleIndex, accessorRight, rightTupleIndex); + joinResultCount++; } @Override public void closeResult(IFrameWriter writer) throws HyracksDataException { resultAppender.write(writer, true); + if (LOGGER.isLoggable(Level.WARNING)) { + LOGGER.warning("MergeJoiner statitics: " + joinComparisonCount + " comparisons, " + joinResultCount + + " results, " + spillCount + " spills, " + spillWriteCount + " spill frames written, " + + spillReadCount + " spill frames read."); + } } private void flushMemory() throws HyracksDataException { @@ -203,15 +208,13 @@ public class MergeJoiner extends AbstractMergeJoiner { if (memoryHasTuples()) { for (int i = memoryBuffer.size() - 1; i > -1; --i) { memoryAccessor.reset(memoryBuffer.get(i)); - // TuplePrinterUtil.printTuple(" --- A outer", inputAccessor[LEFT_PARTITION]); - // TuplePrinterUtil.printTuple(" --- A inner", memoryAccessor); if (mjc.checkToSaveInResult(inputAccessor[LEFT_PARTITION], inputAccessor[LEFT_PARTITION].getTupleId(), memoryAccessor, memoryBuffer.get(i).getTupleIndex(), false)) { // add to result - // System.err.println(" -- Matched --"); addToResult(inputAccessor[LEFT_PARTITION], inputAccessor[LEFT_PARTITION].getTupleId(), memoryAccessor, memoryBuffer.get(i).getTupleIndex(), writer); } + joinComparisonCount++; if (mjc.checkToRemoveInMemory(inputAccessor[LEFT_PARTITION], inputAccessor[LEFT_PARTITION].getTupleId(), memoryAccessor, memoryBuffer.get(i).getTupleIndex())) { // remove from memory @@ -249,6 +252,9 @@ public class MergeJoiner extends AbstractMergeJoiner { if (LOGGER.isLoggable(Level.FINE)) { LOGGER.fine("Continue with left stream."); } + spillCount++; + spillReadCount += runFileStream.getReadCount(); + spillWriteCount += runFileStream.getWriteCount(); } private void unfreezeAndContinue(ITupleAccessor accessor) throws HyracksDataException { http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a9729514/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NestedLoopJoin.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NestedLoopJoin.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NestedLoopJoin.java index 202aac6..3d99d6c 100644 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NestedLoopJoin.java +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NestedLoopJoin.java @@ -20,6 +20,8 @@ package org.apache.hyracks.dataflow.std.join; import java.io.DataOutput; import java.nio.ByteBuffer; +import java.util.logging.Level; +import java.util.logging.Logger; import org.apache.hyracks.api.comm.IFrame; import org.apache.hyracks.api.comm.IFrameWriter; @@ -43,6 +45,8 @@ import org.apache.hyracks.dataflow.std.buffermanager.VariableFrameMemoryManager; import org.apache.hyracks.dataflow.std.buffermanager.VariableFramePool; public class NestedLoopJoin { + private static final Logger LOGGER = Logger.getLogger(NestedLoopJoin.class.getName()); + private final FrameTupleAccessor accessorInner; private final FrameTupleAccessor accessorOuter; private final FrameTupleAppender appender; @@ -58,6 +62,11 @@ public class NestedLoopJoin { private boolean isReversed; //Added for handling correct calling for predicate-evaluator upon recursive calls (in OptimizedHybridHashJoin) that cause role-reversal private BufferInfo tempInfo = new BufferInfo(null, -1, -1); + private long joinComparisonCount = 0; + private long joinResultCount = 0; + private long spillWriteCount = 0; + private long spillReadCount = 0; + public NestedLoopJoin(IHyracksTaskContext ctx, FrameTupleAccessor accessorOuter, FrameTupleAccessor accessorInner, ITuplePairComparator comparatorsOuter2Inner, int memSize, IPredicateEvaluator predEval, boolean isLeftOuter, IMissingWriter[] missingWriters) throws HyracksDataException { @@ -99,6 +108,7 @@ public class NestedLoopJoin { public void cache(ByteBuffer buffer) throws HyracksDataException { runFileWriter.nextFrame(buffer); + spillWriteCount++; } public void join(ByteBuffer outerBuffer, IFrameWriter writer) throws HyracksDataException { @@ -109,6 +119,7 @@ public class NestedLoopJoin { for (int i = 0; i < outerBufferMngr.getNumFrames(); i++) { blockJoin(outerBufferMngr.getFrame(i, tempInfo), innerBuffer.getBuffer(), writer); } + spillReadCount++; } runFileReader.close(); outerBufferMngr.reset(); @@ -135,6 +146,7 @@ public class NestedLoopJoin { matchFound = true; appendToResults(i, j, writer); } + joinComparisonCount++; } if (!matchFound && isLeftOuter) { @@ -149,9 +161,9 @@ public class NestedLoopJoin { private boolean evaluatePredicate(int tIx1, int tIx2) { if (isReversed) { //Role Reversal Optimization is triggered - return ((predEvaluator == null) || predEvaluator.evaluate(accessorInner, tIx2, accessorOuter, tIx1)); + return (predEvaluator == null) || predEvaluator.evaluate(accessorInner, tIx2, accessorOuter, tIx1); } else { - return ((predEvaluator == null) || predEvaluator.evaluate(accessorOuter, tIx1, accessorInner, tIx2)); + return (predEvaluator == null) || predEvaluator.evaluate(accessorOuter, tIx1, accessorInner, tIx2); } } @@ -166,6 +178,7 @@ public class NestedLoopJoin { private void appendResultToFrame(FrameTupleAccessor accessor1, int tupleId1, FrameTupleAccessor accessor2, int tupleId2, IFrameWriter writer) throws HyracksDataException { FrameUtils.appendConcatToWriter(writer, appender, accessor1, tupleId1, accessor2, tupleId2); + joinResultCount++; } public void closeCache() throws HyracksDataException { @@ -181,11 +194,17 @@ public class NestedLoopJoin { for (int i = 0; i < outerBufferMngr.getNumFrames(); i++) { blockJoin(outerBufferMngr.getFrame(i, tempInfo), innerBuffer.getBuffer(), writer); } + spillReadCount++; } runFileReader.close(); outerBufferMngr.reset(); appender.write(writer, true); + + if (LOGGER.isLoggable(Level.WARNING)) { + LOGGER.warning("NestedLoopJoin statitics: " + joinComparisonCount + " comparisons, " + joinResultCount + + " results, " + spillWriteCount + " frames written, " + spillReadCount + " frames read."); + } } private int compare(FrameTupleAccessor accessor0, int tIndex0, FrameTupleAccessor accessor1, int tIndex1) http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a9729514/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NestedLoopJoinOperatorDescriptor.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NestedLoopJoinOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NestedLoopJoinOperatorDescriptor.java index 09207b9..4fa1498 100644 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NestedLoopJoinOperatorDescriptor.java +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NestedLoopJoinOperatorDescriptor.java @@ -19,9 +19,6 @@ package org.apache.hyracks.dataflow.std.join; -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; import java.nio.ByteBuffer; import org.apache.hyracks.api.context.IHyracksTaskContext; @@ -98,22 +95,9 @@ public class NestedLoopJoinOperatorDescriptor extends AbstractOperatorDescriptor public static class JoinCacheTaskState extends AbstractStateObject { private NestedLoopJoin joiner; - public JoinCacheTaskState() { - } - private JoinCacheTaskState(JobId jobId, TaskId taskId) { super(jobId, taskId); } - - @Override - public void toBytes(DataOutput out) throws IOException { - - } - - @Override - public void fromBytes(DataInput in) throws IOException { - - } } private class JoinCacheActivityNode extends AbstractActivityNode { @@ -132,8 +116,8 @@ public class NestedLoopJoinOperatorDescriptor extends AbstractOperatorDescriptor final RecordDescriptor rd0 = recordDescProvider.getInputRecordDescriptor(nljAid, 0); final RecordDescriptor rd1 = recordDescProvider.getInputRecordDescriptor(getActivityId(), 0); final ITuplePairComparator comparator = comparatorFactory.createTuplePairComparator(ctx); - final IPredicateEvaluator predEvaluator = ((predEvaluatorFactory != null) - ? predEvaluatorFactory.createPredicateEvaluator() : null); + final IPredicateEvaluator predEvaluator = (predEvaluatorFactory != null) + ? predEvaluatorFactory.createPredicateEvaluator() : null; final IMissingWriter[] nullWriters1 = isLeftOuter ? new IMissingWriter[nullWriterFactories1.length] : null; if (isLeftOuter) { @@ -142,7 +126,7 @@ public class NestedLoopJoinOperatorDescriptor extends AbstractOperatorDescriptor } } - IOperatorNodePushable op = new AbstractUnaryInputSinkOperatorNodePushable() { + return new AbstractUnaryInputSinkOperatorNodePushable() { private JoinCacheTaskState state; @Override @@ -170,9 +154,9 @@ public class NestedLoopJoinOperatorDescriptor extends AbstractOperatorDescriptor @Override public void fail() throws HyracksDataException { + // No variables to update. } }; - return op; } } @@ -186,8 +170,7 @@ public class NestedLoopJoinOperatorDescriptor extends AbstractOperatorDescriptor @Override public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx, IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions) { - - IOperatorNodePushable op = new AbstractUnaryInputUnaryOutputOperatorNodePushable() { + return new AbstractUnaryInputUnaryOutputOperatorNodePushable() { private JoinCacheTaskState state; @Override @@ -216,7 +199,6 @@ public class NestedLoopJoinOperatorDescriptor extends AbstractOperatorDescriptor writer.fail(); } }; - return op; } } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a9729514/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/RunFileStream.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/RunFileStream.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/RunFileStream.java index afeed07..aaaaaf4 100644 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/RunFileStream.java +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/RunFileStream.java @@ -37,7 +37,6 @@ public class RunFileStream { private static final Logger LOGGER = Logger.getLogger(RunFileStream.class.getName()); private final String key; - private int runFileCounter; private final IFrame runFileBuffer; private final IFrameTupleAppender runFileAppender; private RunFileWriter runFileWriter; @@ -46,26 +45,41 @@ public class RunFileStream { private final IHyracksTaskContext ctx; + private long runFileCounter = 0; + private long readCount = 0; + private long writeCount = 0; + public RunFileStream(IHyracksTaskContext ctx, String key, IRunFileStreamStatus status) throws HyracksDataException { this.ctx = ctx; this.key = key; this.status = status; - runFileCounter = 0; runFileBuffer = new VSizeFrame(ctx); runFileAppender = new FrameTupleAppender(new VSizeFrame(ctx)); } + public long getReadCount() { + return readCount; + } + + public long getWriteCount() { + return writeCount; + } + public void startRunFile() throws HyracksDataException { + readCount = 0; + writeCount = 0; + runFileCounter++; + status.setRunFileWriting(true); - String prefix = this.getClass().getSimpleName() + '-' + key + '-' + Integer.toString(runFileCounter) + '-' + String prefix = this.getClass().getSimpleName() + '-' + key + '-' + Long.toString(runFileCounter) + '-' + this.toString(); FileReference file = ctx.getJobletContext().createManagedWorkspaceFile(prefix); runFileWriter = new RunFileWriter(file, ctx.getIOManager()); runFileWriter.open(); - ++runFileCounter; if (LOGGER.isLoggable(Level.FINE)) { - LOGGER.fine("A new run file has been started (key: " + key + ", number: " + runFileCounter + ", file: " + file + ")."); + LOGGER.fine("A new run file has been started (key: " + key + ", number: " + runFileCounter + ", file: " + + file + ")."); } } @@ -73,6 +87,7 @@ public class RunFileStream { int idx = accessor.getTupleId(); if (!runFileAppender.append(accessor, idx)) { runFileAppender.write(runFileWriter, true); + writeCount++; runFileAppender.append(accessor, idx); } } @@ -104,6 +119,7 @@ public class RunFileStream { if (runFileReader.nextFrame(runFileBuffer)) { accessor.reset(runFileBuffer.getBuffer()); accessor.next(); + readCount++; return true; } return false; @@ -129,6 +145,7 @@ public class RunFileStream { // Flush buffer. if (runFileAppender.getTupleCount() > 0) { runFileAppender.write(runFileWriter, true); + writeCount++; } }