snapshot - preparing for a write-once-read-many process in the sort-merge join
Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/b4a3fd56 Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/b4a3fd56 Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/b4a3fd56 Branch: refs/heads/ecarm002/interval_join_merge Commit: b4a3fd56b637b20ec5be667261002772d8d76603 Parents: e256e63 Author: Preston Carman <prest...@apache.org> Authored: Fri Oct 14 17:50:23 2016 -0700 Committer: Preston Carman <prest...@apache.org> Committed: Fri Oct 14 17:50:23 2016 -0700 ---------------------------------------------------------------------- .../intervalindex/IntervalIndexJoiner.java | 8 +-- .../IntervalPartitionJoiner.java | 20 +++++++- .../hyracks/dataflow/std/join/MergeJoiner.java | 49 +++++++++++++----- .../dataflow/std/join/RunFileStream.java | 52 ++++++++++++++------ 4 files changed, 96 insertions(+), 33 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/asterixdb/blob/b4a3fd56/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalindex/IntervalIndexJoiner.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalindex/IntervalIndexJoiner.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalindex/IntervalIndexJoiner.java index 8b7a12e..d3303b6 100644 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalindex/IntervalIndexJoiner.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalindex/IntervalIndexJoiner.java @@ -435,7 +435,7 @@ public class IntervalIndexJoiner extends AbstractMergeJoiner { + bufferManager.getNumTuples(RIGHT_PARTITION) + " memory]."); } if (bufferManager.getNumTuples(LEFT_PARTITION) > 
bufferManager.getNumTuples(RIGHT_PARTITION)) { - runFileStream[RIGHT_PARTITION].startRunFile(); + runFileStream[RIGHT_PARTITION].startRunFileWriting(); if (LOGGER.isLoggable(Level.FINE)) { LOGGER.fine("Memory is full. Freezing the left branch. (Left memory tuples: " + bufferManager.getNumTuples(LEFT_PARTITION) + ", Right memory tuples: " @@ -444,7 +444,7 @@ public class IntervalIndexJoiner extends AbstractMergeJoiner { bufferManager.printStats("memory details"); rightSpillCount++; } else { - runFileStream[LEFT_PARTITION].startRunFile(); + runFileStream[LEFT_PARTITION].startRunFileWriting(); if (LOGGER.isLoggable(Level.FINE)) { LOGGER.fine("Memory is full. Freezing the right branch. (Left memory tuples: " + bufferManager.getNumTuples(LEFT_PARTITION) + ", Right memory tuples: " @@ -456,7 +456,7 @@ public class IntervalIndexJoiner extends AbstractMergeJoiner { } private void continueStream(int diskPartition, ITupleAccessor accessor) throws HyracksDataException { - runFileStream[diskPartition].closeRunFile(); + runFileStream[diskPartition].closeRunFileReading(); accessor.reset(inputBuffer[diskPartition]); accessor.setTupleId(streamIndex[diskPartition]); if (LOGGER.isLoggable(Level.FINE)) { @@ -490,7 +490,7 @@ public class IntervalIndexJoiner extends AbstractMergeJoiner { || (RIGHT_PARTITION == frozenPartition && !status.branch[RIGHT_PARTITION].isRunFileReading())) { streamIndex[frozenPartition] = accessor.getTupleId(); } - runFileStream[frozenPartition].openRunFile(accessor); + runFileStream[frozenPartition].startReadingRunFile(accessor); if (LOGGER.isLoggable(Level.FINE)) { LOGGER.fine("Unfreezing (" + frozenPartition + ")."); } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/b4a3fd56/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionJoiner.java ---------------------------------------------------------------------- diff --git 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionJoiner.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionJoiner.java index 984db20..f57d205 100644 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionJoiner.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionJoiner.java @@ -64,6 +64,8 @@ public class IntervalPartitionJoiner extends AbstractMergeJoiner { private final int numOfPartitions; private long buildSize = 0; private long probeSize = 0; + private long[] buildPartitionSizes; + private long[] probePartitionSizes; private final TreeMap<RunFilePointer, Integer> probeRunFilePointers; private final VPartitionTupleBufferManager buildBufferManager; @@ -92,8 +94,10 @@ public class IntervalPartitionJoiner extends AbstractMergeJoiner { this.accessorProbe = new FrameTupleAccessor(leftRd); reloadBuffer = new VSizeFrame(ctx); - this.numOfPartitions = IntervalPartitionUtil.getMaxPartitions(k);; + this.numOfPartitions = IntervalPartitionUtil.getMaxPartitions(k); this.imjc = imjc; + buildPartitionSizes = new long[numOfPartitions]; + probePartitionSizes = new long[numOfPartitions]; // TODO fix available memory size this.buildMemory = memorySize; @@ -135,6 +139,7 @@ public class IntervalPartitionJoiner extends AbstractMergeJoiner { } inputAccessor[LEFT_PARTITION].next(); probeSize++; + probePartitionSizes[pid]++; } inputBuffer[LEFT_PARTITION].rewind(); probeRunFileWriter.nextFrame(inputBuffer[LEFT_PARTITION]); @@ -151,6 +156,16 @@ public class IntervalPartitionJoiner extends AbstractMergeJoiner { LOGGER.warning("IntervalPartitionJoiner statitics: " + k + " k, " + joinComparisonCount + " comparisons, " + joinResultCount + " results, " + spillWriteCount + " written, " + spillReadCount + " 
read."); } + System.err.print("build: ["); + for (int i = 0; i < buildPartitionSizes.length; i++) { + System.err.print(buildPartitionSizes[i] + ", "); + } + System.err.println("]"); + System.err.print("probe: ["); + for (int i = 0; i < probePartitionSizes.length; i++) { + System.err.print(probePartitionSizes[i] + ", "); + } + System.err.println("]"); } private void joinLoopOnMemory(IFrameWriter writer) throws HyracksDataException { @@ -181,6 +196,8 @@ public class IntervalPartitionJoiner extends AbstractMergeJoiner { Pair<Integer, Integer> build = IntervalPartitionUtil.getIntervalPartition(buildId, k); if (imjc.compareIntervalPartition(probe.first, probe.second, build.first, build.second)) { fbms.add(buildBufferManager.getPartitionFrameBufferManager(buildId)); + System.err.println("join " + probe + "(" + probePartitionSizes[probeRunFilePointers.get(probeId)] + + ") with " + build + "(" + buildPartitionSizes[buildId] + ")"); } } if (!fbms.isEmpty()) { @@ -262,6 +279,7 @@ public class IntervalPartitionJoiner extends AbstractMergeJoiner { inputAccessor[RIGHT_PARTITION].getTupleId(), tempPtr)) { return; } + buildPartitionSizes[pid]++; if (buildPid != pid) { // Track new partitions in memory. 
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/b4a3fd56/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/MergeJoiner.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/MergeJoiner.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/MergeJoiner.java index d4195ec..efb3756 100644 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/MergeJoiner.java +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/MergeJoiner.java @@ -35,6 +35,7 @@ import org.apache.hyracks.dataflow.std.buffermanager.ITupleAccessor; import org.apache.hyracks.dataflow.std.buffermanager.ITuplePointerAccessor; import org.apache.hyracks.dataflow.std.buffermanager.TupleAccessor; import org.apache.hyracks.dataflow.std.buffermanager.VariableDeletableTupleMemoryManager; +import org.apache.hyracks.dataflow.std.structures.RunFilePointer; import org.apache.hyracks.dataflow.std.structures.TuplePointer; /** @@ -54,7 +55,10 @@ public class MergeJoiner extends AbstractMergeJoiner { private final LinkedList<TuplePointer> memoryBuffer = new LinkedList<>(); private int leftStreamIndex; + private final RunFileStream runFileStreamOld; private final RunFileStream runFileStream; + private ITupleAccessor tmpAccessor; + private final RunFilePointer runFilePointer; private final IMergeJoinChecker mjc; @@ -69,6 +73,8 @@ public class MergeJoiner extends AbstractMergeJoiner { IMergeJoinChecker mjc, RecordDescriptor leftRd, RecordDescriptor rightRd) throws HyracksDataException { super(ctx, partition, status, locks, leftRd, rightRd); this.mjc = mjc; + tmpAccessor = new TupleAccessor(leftRd); + runFilePointer = new RunFilePointer(); // Memory (right buffer) if (memorySize < 1) { @@ -81,6 
+87,7 @@ public class MergeJoiner extends AbstractMergeJoiner { // Run File and frame cache (left buffer) leftStreamIndex = TupleAccessor.UNSET; + runFileStreamOld = new RunFileStream(ctx, "left", status.branch[LEFT_PARTITION]); runFileStream = new RunFileStream(ctx, "left", status.branch[LEFT_PARTITION]); if (LOGGER.isLoggable(Level.FINE)) { @@ -158,7 +165,8 @@ public class MergeJoiner extends AbstractMergeJoiner { private TupleStatus loadSpilledTuple(int partition) throws HyracksDataException { if (!inputAccessor[partition].exists()) { - if (!runFileStream.loadNextBuffer(inputAccessor[partition])) { + runFileStream.loadNextBuffer(tmpAccessor); + if (!runFileStreamOld.loadNextBuffer(inputAccessor[partition])) { return TupleStatus.EMPTY; } } @@ -200,8 +208,8 @@ public class MergeJoiner extends AbstractMergeJoiner { resultAppender.write(writer, true); if (LOGGER.isLoggable(Level.WARNING)) { LOGGER.warning("MergeJoiner statitics: " + joinComparisonCount + " comparisons, " + joinResultCount - + " results, " + spillCount + " spills, " + runFileStream.getFileCount() + " files, " - + runFileStream.getWriteCount() + " spill frames written, " + runFileStream.getReadCount() + + " results, " + spillCount + " spills, " + runFileStreamOld.getFileCount() + " files, " + + runFileStreamOld.getWriteCount() + " spill frames written, " + runFileStreamOld.getReadCount() + " spill frames read."); } } @@ -209,7 +217,11 @@ public class MergeJoiner extends AbstractMergeJoiner { private TupleStatus processLeftTupleSpill(IFrameWriter writer) throws HyracksDataException { // System.err.print("Spill "); - runFileStream.addToRunFile(inputAccessor[LEFT_PARTITION]); + runFileStreamOld.addToRunFile(inputAccessor[LEFT_PARTITION]); + if (true) { + runFileStream.addToRunFile(inputAccessor[LEFT_PARTITION]); + } + processLeftTuple(writer); // Memory is empty and we can start processing the run file. 
@@ -264,7 +276,13 @@ public class MergeJoiner extends AbstractMergeJoiner { + bufferManager.getNumTuples() + " tuples memory]."); } - runFileStream.startRunFile(); + if (runFilePointer.getFileOffset() > 0) { + + } else { + runFilePointer.reset(0, 0); + runFileStream.startRunFileWriting(); + } + runFileStreamOld.startRunFileWriting(); if (LOGGER.isLoggable(Level.FINE)) { LOGGER.fine( "Memory is full. Freezing the right branch. (memory tuples: " + bufferManager.getNumTuples() + ")"); @@ -275,7 +293,7 @@ public class MergeJoiner extends AbstractMergeJoiner { private void continueStream(ITupleAccessor accessor) throws HyracksDataException { // System.err.println("continueStream"); - runFileStream.closeRunFile(); + runFileStreamOld.closeRunFileReading(); accessor.reset(inputBuffer[LEFT_PARTITION]); accessor.setTupleId(leftStreamIndex); if (LOGGER.isLoggable(Level.FINE)) { @@ -289,20 +307,25 @@ public class MergeJoiner extends AbstractMergeJoiner { LOGGER.warning("snapshot: " + frameCounts[RIGHT_PARTITION] + " right, " + frameCounts[LEFT_PARTITION] + " left, " + joinComparisonCount + " comparisons, " + joinResultCount + " results, [" + bufferManager.getNumTuples() + " tuples memory, " + spillCount + " spills, " - + (runFileStream.getFileCount() - spillFileCount) + " files, " - + (runFileStream.getWriteCount() - spillWriteCount) + " written, " - + (runFileStream.getReadCount() - spillReadCount) + " read]."); - spillFileCount = runFileStream.getFileCount(); - spillReadCount = runFileStream.getReadCount(); - spillWriteCount = runFileStream.getWriteCount(); + + (runFileStreamOld.getFileCount() - spillFileCount) + " files, " + + (runFileStreamOld.getWriteCount() - spillWriteCount) + " written, " + + (runFileStreamOld.getReadCount() - spillReadCount) + " read]."); + spillFileCount = runFileStreamOld.getFileCount(); + spillReadCount = runFileStreamOld.getReadCount(); + spillWriteCount = runFileStreamOld.getWriteCount(); } + runFileStreamOld.flushAndStopRunFile(accessor); 
runFileStream.flushAndStopRunFile(accessor); flushMemory(); if (!status.branch[LEFT_PARTITION].isRunFileReading()) { leftStreamIndex = accessor.getTupleId(); } - runFileStream.openRunFile(accessor); + runFileStreamOld.startReadingRunFile(accessor); + + runFileStream.resetReadPointer(runFilePointer.getFileOffset()); + accessor.setTupleId(runFilePointer.getTupleIndex()); + runFileStream.startReadingRunFile(accessor); if (LOGGER.isLoggable(Level.FINE)) { LOGGER.fine("Unfreezing right partition."); } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/b4a3fd56/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/RunFileStream.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/RunFileStream.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/RunFileStream.java index 7e8a8d1..f50c34a 100644 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/RunFileStream.java +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/RunFileStream.java @@ -31,6 +31,7 @@ import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender; import org.apache.hyracks.dataflow.common.io.RunFileReader; import org.apache.hyracks.dataflow.common.io.RunFileWriter; import org.apache.hyracks.dataflow.std.buffermanager.ITupleAccessor; +import org.apache.hyracks.dataflow.std.structures.RunFilePointer; public class RunFileStream { @@ -79,7 +80,7 @@ public class RunFileStream { return writeCount; } - public void startRunFile() throws HyracksDataException { + public void startRunFileWriting() throws HyracksDataException { runFileCounter++; status.setRunFileWriting(true); @@ -94,6 +95,19 @@ public class RunFileStream { } } + public void resumeRunFileWriting() throws 
HyracksDataException { + status.setRunFileWriting(true); + String prefix = this.getClass().getSimpleName() + '-' + key + '-' + Long.toString(runFileCounter) + '-' + + this.toString(); + FileReference file = ctx.getJobletContext().createManagedWorkspaceFile(prefix); + runFileWriter = new RunFileWriter(file, ctx.getIOManager()); + runFileWriter.open(); + if (LOGGER.isLoggable(Level.FINE)) { + LOGGER.fine("A new run file has been started (key: " + key + ", number: " + runFileCounter + ", file: " + + file + ")."); + } + } + public void addToRunFile(ITupleAccessor accessor) throws HyracksDataException { int idx = accessor.getTupleId(); if (!runFileAppender.append(accessor, idx)) { @@ -105,28 +119,28 @@ public class RunFileStream { tupleCount++; } - public void openRunFile(ITupleAccessor accessor) throws HyracksDataException { + public void startReadingRunFile(ITupleAccessor accessor) throws HyracksDataException { status.setRunFileReading(true); // Create reader - runFileReader = runFileWriter.createDeleteOnCloseReader(); + runFileReader = runFileWriter.createReader(); runFileReader.open(); // Load first frame loadNextBuffer(accessor); } - public void resetReader(ITupleAccessor accessor) throws HyracksDataException { - if (status.isRunFileWriting()) { - flushAndStopRunFile(accessor); - openRunFile(accessor); - } else { - runFileReader.reset(); - - // Load first frame - loadNextBuffer(accessor); - } - } +// public void resetReader(ITupleAccessor accessor) throws HyracksDataException { +// if (status.isRunFileWriting()) { +// flushAndStopRunFile(accessor); +// startReadingRunFile(accessor); +// } else { +// runFileReader.reset(); +// +// // Load first frame +// loadNextBuffer(accessor); +// } +// } public boolean loadNextBuffer(ITupleAccessor accessor) throws HyracksDataException { if (runFileReader.nextFrame(runFileBuffer)) { @@ -138,6 +152,14 @@ public class RunFileStream { return false; } + public long getReadPointer() throws HyracksDataException { + return 
runFileReader.getReadPointer(); + } + + public void resetReadPointer(long fileOffset) throws HyracksDataException { + runFileReader.reset(fileOffset); + } + public void flushAndStopRunFile(ITupleAccessor accessor) throws HyracksDataException { status.setRunFileWriting(false); @@ -163,7 +185,7 @@ public class RunFileStream { } } - public void closeRunFile() throws HyracksDataException { + public void closeRunFileReading() throws HyracksDataException { status.setRunFileReading(false); runFileReader.close(); }