[FLINK-1285] Make Merge-Join aware of object-reuse setting. This closes #259
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d529749c Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d529749c Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d529749c Branch: refs/heads/master Commit: d529749c8f45af693efffe1f69860dae0bfe70bf Parents: b7b32a0 Author: Aljoscha Krettek <[email protected]> Authored: Thu Dec 11 14:58:23 2014 +0100 Committer: Stephan Ewen <[email protected]> Committed: Wed Jan 7 19:16:10 2015 +0100 ---------------------------------------------------------------------- .../flink/runtime/operators/MatchDriver.java | 7 +- .../resettable/BlockResettableIterator.java | 210 --------- .../NonReusingBlockResettableIterator.java | 204 +++++++++ .../ReusingBlockResettableIterator.java | 100 +++++ .../operators/sort/MergeMatchIterator.java | 429 ------------------ .../sort/NonReusingMergeMatchIterator.java | 424 ++++++++++++++++++ .../sort/ReusingMergeMatchIterator.java | 435 +++++++++++++++++++ .../resettable/BlockResettableIteratorTest.java | 202 --------- .../NonReusingBlockResettableIteratorTest.java | 201 +++++++++ .../ReusingBlockResettableIteratorTest.java | 201 +++++++++ .../NonReusingSortMergeMatchIteratorITCase.java | 371 ++++++++++++++++ .../ReusingSortMergeMatchIteratorITCase.java | 371 ++++++++++++++++ .../sort/SortMergeMatchIteratorITCase.java | 373 ---------------- .../operators/util/HashVsSortMiniBenchmark.java | 6 +- 14 files changed, 2314 insertions(+), 1220 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/d529749c/flink-runtime/src/main/java/org/apache/flink/runtime/operators/MatchDriver.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/MatchDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/MatchDriver.java index 2d051ad..f8e4a29 
100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/MatchDriver.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/MatchDriver.java @@ -22,6 +22,7 @@ package org.apache.flink.runtime.operators; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.runtime.operators.hash.NonReusingBuildFirstHashMatchIterator; import org.apache.flink.runtime.operators.hash.NonReusingBuildSecondHashMatchIterator; +import org.apache.flink.runtime.operators.sort.NonReusingMergeMatchIterator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.flink.api.common.functions.FlatJoinFunction; @@ -32,7 +33,7 @@ import org.apache.flink.runtime.io.disk.iomanager.IOManager; import org.apache.flink.runtime.memorymanager.MemoryManager; import org.apache.flink.runtime.operators.hash.ReusingBuildFirstHashMatchIterator; import org.apache.flink.runtime.operators.hash.ReusingBuildSecondHashMatchIterator; -import org.apache.flink.runtime.operators.sort.MergeMatchIterator; +import org.apache.flink.runtime.operators.sort.ReusingMergeMatchIterator; import org.apache.flink.runtime.operators.util.JoinTaskIterator; import org.apache.flink.runtime.operators.util.TaskConfig; import org.apache.flink.util.Collector; @@ -125,7 +126,7 @@ public class MatchDriver<IT1, IT2, OT> implements PactDriver<FlatJoinFunction<IT if (this.objectReuseEnabled) { switch (ls) { case MERGE: - this.matchIterator = new MergeMatchIterator<IT1, IT2, OT>(in1, in2, serializer1, comparator1, serializer2, comparator2, pairComparatorFactory.createComparator12(comparator1, comparator2), memoryManager, ioManager, numPages, this.taskContext.getOwningNepheleTask()); + this.matchIterator = new ReusingMergeMatchIterator<IT1, IT2, OT>(in1, in2, serializer1, comparator1, serializer2, comparator2, pairComparatorFactory.createComparator12(comparator1, comparator2), memoryManager, ioManager, numPages, this.taskContext.getOwningNepheleTask()); break; case 
HYBRIDHASH_BUILD_FIRST: @@ -140,7 +141,7 @@ public class MatchDriver<IT1, IT2, OT> implements PactDriver<FlatJoinFunction<IT } else { switch (ls) { case MERGE: - this.matchIterator = new MergeMatchIterator<IT1, IT2, OT>(in1, in2, serializer1, comparator1, serializer2, comparator2, pairComparatorFactory.createComparator12(comparator1, comparator2), memoryManager, ioManager, numPages, this.taskContext.getOwningNepheleTask()); + this.matchIterator = new NonReusingMergeMatchIterator<IT1, IT2, OT>(in1, in2, serializer1, comparator1, serializer2, comparator2, pairComparatorFactory.createComparator12(comparator1, comparator2), memoryManager, ioManager, numPages, this.taskContext.getOwningNepheleTask()); break; case HYBRIDHASH_BUILD_FIRST: http://git-wip-us.apache.org/repos/asf/flink/blob/d529749c/flink-runtime/src/main/java/org/apache/flink/runtime/operators/resettable/BlockResettableIterator.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/resettable/BlockResettableIterator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/resettable/BlockResettableIterator.java deleted file mode 100644 index 0019c8c..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/resettable/BlockResettableIterator.java +++ /dev/null @@ -1,210 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package org.apache.flink.runtime.operators.resettable; - -import java.io.IOException; -import java.util.Iterator; -import java.util.NoSuchElementException; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; -import org.apache.flink.runtime.memorymanager.MemoryAllocationException; -import org.apache.flink.runtime.memorymanager.MemoryManager; -import org.apache.flink.runtime.util.ResettableIterator; - -/** - * Implementation of an iterator that fetches a block of data into main memory and offers resettable - * access to the data in that block. 
- * - */ -public class BlockResettableIterator<T> extends AbstractBlockResettableIterator<T> implements ResettableIterator<T> { - - public static final Logger LOG = LoggerFactory.getLogger(BlockResettableIterator.class); - - // ------------------------------------------------------------------------ - - protected Iterator<T> input; - - private T nextElement; - - private final T reuseElement; - - private T leftOverElement; - - private boolean readPhase; - - private boolean noMoreBlocks; - - // ------------------------------------------------------------------------ - - public BlockResettableIterator(MemoryManager memoryManager, Iterator<T> input, - TypeSerializer<T> serializer, int numPages, AbstractInvokable ownerTask) - throws MemoryAllocationException - { - this(memoryManager, serializer, numPages, ownerTask); - this.input = input; - } - - public BlockResettableIterator(MemoryManager memoryManager, - TypeSerializer<T> serializer, int numPages, AbstractInvokable ownerTask) - throws MemoryAllocationException - { - super(serializer, memoryManager, numPages, ownerTask); - - this.reuseElement = serializer.createInstance(); - } - - // ------------------------------------------------------------------------ - - public void reopen(Iterator<T> input) throws IOException { - this.input = input; - - this.noMoreBlocks = false; - this.closed = false; - - nextBlock(); - } - - - - @Override - public boolean hasNext() { - try { - if (this.nextElement == null) { - if (this.readPhase) { - // read phase, get next element from buffer - T tmp = getNextRecord(this.reuseElement); - if (tmp != null) { - this.nextElement = tmp; - return true; - } else { - return false; - } - } else { - if (this.input.hasNext()) { - final T next = this.input.next(); - if (writeNextRecord(next)) { - this.nextElement = next; - return true; - } else { - this.leftOverElement = next; - return false; - } - } else { - this.noMoreBlocks = true; - return false; - } - } - } else { - return true; - } - } catch 
(IOException ioex) { - throw new RuntimeException("Error (de)serializing record in block resettable iterator.", ioex); - } - } - - - @Override - public T next() { - if (this.nextElement == null) { - if (!hasNext()) { - throw new NoSuchElementException(); - } - } - - T out = this.nextElement; - this.nextElement = null; - return out; - } - - - @Override - public void remove() { - throw new UnsupportedOperationException(); - } - - - public void reset() { - // a reset always goes to the read phase - this.readPhase = true; - super.reset(); - } - - - @Override - public boolean nextBlock() throws IOException { - // check the state - if (this.closed) { - throw new IllegalStateException("Iterator has been closed."); - } - - // check whether more blocks are available - if (this.noMoreBlocks) { - return false; - } - - // reset the views in the superclass - super.nextBlock(); - - T next = this.leftOverElement; - this.leftOverElement = null; - if (next == null) { - if (this.input.hasNext()) { - next = this.input.next(); - } - else { - this.noMoreBlocks = true; - return false; - } - } - - // write the leftover record - if (!writeNextRecord(next)) { - throw new IOException("BlockResettableIterator could not serialize record into fresh memory block: " + - "Record is too large."); - } - - this.nextElement = next; - this.readPhase = false; - - return true; - } - - /** - * Checks, whether the input that is blocked by this iterator, has further elements - * available. This method may be used to forecast (for example at the point where a - * block is full) whether there will be more data (possibly in another block). - * - * @return True, if there will be more data, false otherwise. - */ - public boolean hasFurtherInput() { - return !this.noMoreBlocks; - } - - - public void close() { - // suggest that we are in the read phase. 
because nothing is in the current block, - // read requests will fail - this.readPhase = true; - super.close(); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/d529749c/flink-runtime/src/main/java/org/apache/flink/runtime/operators/resettable/NonReusingBlockResettableIterator.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/resettable/NonReusingBlockResettableIterator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/resettable/NonReusingBlockResettableIterator.java new file mode 100644 index 0000000..9d581ce --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/resettable/NonReusingBlockResettableIterator.java @@ -0,0 +1,204 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + + +package org.apache.flink.runtime.operators.resettable; + +import java.io.IOException; +import java.util.Iterator; +import java.util.NoSuchElementException; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; +import org.apache.flink.runtime.memorymanager.MemoryAllocationException; +import org.apache.flink.runtime.memorymanager.MemoryManager; +import org.apache.flink.runtime.util.ResettableIterator; + +/** + * Implementation of an iterator that fetches a block of data into main memory and offers resettable + * access to the data in that block. + * + */ +public class NonReusingBlockResettableIterator<T> extends AbstractBlockResettableIterator<T> implements ResettableIterator<T> { + + public static final Logger LOG = LoggerFactory.getLogger(NonReusingBlockResettableIterator.class); + + // ------------------------------------------------------------------------ + + protected Iterator<T> input; + + protected T nextElement; + + protected T leftOverElement; + + protected boolean readPhase; + + protected boolean noMoreBlocks; + + // ------------------------------------------------------------------------ + + public NonReusingBlockResettableIterator(MemoryManager memoryManager, Iterator<T> input, + TypeSerializer<T> serializer, int numPages, + AbstractInvokable ownerTask) + throws MemoryAllocationException + { + this(memoryManager, serializer, numPages, ownerTask); + this.input = input; + } + + public NonReusingBlockResettableIterator(MemoryManager memoryManager, TypeSerializer<T> serializer, int numPages, AbstractInvokable ownerTask) + throws MemoryAllocationException + { + super(serializer, memoryManager, numPages, ownerTask); + } + + // ------------------------------------------------------------------------ + + public void reopen(Iterator<T> input) throws IOException { + this.input = input; + + this.noMoreBlocks = false; + 
this.closed = false; + + nextBlock(); + } + + @Override + public boolean hasNext() { + try { + if (this.nextElement == null) { + if (this.readPhase) { + // read phase, get next element from buffer + T tmp = getNextRecord(); + if (tmp != null) { + this.nextElement = tmp; + return true; + } else { + return false; + } + } else { + if (this.input.hasNext()) { + final T next = this.input.next(); + if (writeNextRecord(next)) { + this.nextElement = next; + return true; + } else { + this.leftOverElement = next; + return false; + } + } else { + this.noMoreBlocks = true; + return false; + } + } + } else { + return true; + } + } catch (IOException ioex) { + throw new RuntimeException("Error (de)serializing record in block resettable iterator.", ioex); + } + } + + + @Override + public T next() { + if (this.nextElement == null) { + if (!hasNext()) { + throw new NoSuchElementException(); + } + } + + T out = this.nextElement; + this.nextElement = null; + return out; + } + + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + + + public void reset() { + // a reset always goes to the read phase + this.readPhase = true; + super.reset(); + } + + + @Override + public boolean nextBlock() throws IOException { + // check the state + if (this.closed) { + throw new IllegalStateException("Iterator has been closed."); + } + + // check whether more blocks are available + if (this.noMoreBlocks) { + return false; + } + + // reset the views in the superclass + super.nextBlock(); + + T next = this.leftOverElement; + this.leftOverElement = null; + if (next == null) { + if (this.input.hasNext()) { + next = this.input.next(); + } + else { + this.noMoreBlocks = true; + return false; + } + } + + // write the leftover record + if (!writeNextRecord(next)) { + throw new IOException("BlockResettableIterator could not serialize record into fresh memory block: " + + "Record is too large."); + } + + this.nextElement = next; + this.readPhase = false; + + return true; + } + 
+ /** + * Checks, whether the input that is blocked by this iterator, has further elements + * available. This method may be used to forecast (for example at the point where a + * block is full) whether there will be more data (possibly in another block). + * + * @return True, if there will be more data, false otherwise. + */ + public boolean hasFurtherInput() { + return !this.noMoreBlocks; + } + + + public void close() { + // suggest that we are in the read phase. because nothing is in the current block, + // read requests will fail + this.readPhase = true; + super.close(); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/d529749c/flink-runtime/src/main/java/org/apache/flink/runtime/operators/resettable/ReusingBlockResettableIterator.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/resettable/ReusingBlockResettableIterator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/resettable/ReusingBlockResettableIterator.java new file mode 100644 index 0000000..baa0fb2 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/resettable/ReusingBlockResettableIterator.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.flink.runtime.operators.resettable; + +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; +import org.apache.flink.runtime.memorymanager.MemoryAllocationException; +import org.apache.flink.runtime.memorymanager.MemoryManager; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Iterator; + +/** + * Implementation of an iterator that fetches a block of data into main memory and offers resettable + * access to the data in that block. + * + */ +public class ReusingBlockResettableIterator<T> extends NonReusingBlockResettableIterator<T> { + + public static final Logger LOG = LoggerFactory.getLogger(ReusingBlockResettableIterator.class); + + private final T reuseElement; + + // ------------------------------------------------------------------------ + + public ReusingBlockResettableIterator(MemoryManager memoryManager, Iterator<T> input, + TypeSerializer<T> serializer, int numPages, + AbstractInvokable ownerTask) + throws MemoryAllocationException + { + this(memoryManager, serializer, numPages, ownerTask); + this.input = input; + } + + public ReusingBlockResettableIterator(MemoryManager memoryManager, TypeSerializer<T> + serializer, int numPages, AbstractInvokable ownerTask) + throws MemoryAllocationException + { + super(memoryManager, serializer, numPages, ownerTask); + + this.reuseElement = serializer.createInstance(); + } + + // ------------------------------------------------------------------------ + + @Override + public boolean hasNext() { + try { + if (this.nextElement == null) { + if (this.readPhase) { + // read phase, get next element from buffer + T tmp = getNextRecord(this.reuseElement); + if (tmp != null) { + this.nextElement = tmp; + return true; + } else { + return false; + } + } else { + if 
(this.input.hasNext()) { + final T next = this.input.next(); + if (writeNextRecord(next)) { + this.nextElement = next; + return true; + } else { + this.leftOverElement = next; + return false; + } + } else { + this.noMoreBlocks = true; + return false; + } + } + } else { + return true; + } + } catch (IOException ioex) { + throw new RuntimeException("Error (de)serializing record in block resettable iterator.", ioex); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/d529749c/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/MergeMatchIterator.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/MergeMatchIterator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/MergeMatchIterator.java deleted file mode 100644 index 675758a..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/MergeMatchIterator.java +++ /dev/null @@ -1,429 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.runtime.operators.sort; - -import java.io.IOException; -import java.util.Iterator; -import java.util.List; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.apache.flink.api.common.functions.FlatJoinFunction; -import org.apache.flink.api.common.typeutils.TypeComparator; -import org.apache.flink.api.common.typeutils.TypePairComparator; -import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.core.memory.MemorySegment; -import org.apache.flink.runtime.io.disk.iomanager.IOManager; -import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; -import org.apache.flink.runtime.memorymanager.MemoryAllocationException; -import org.apache.flink.runtime.memorymanager.MemoryManager; -import org.apache.flink.runtime.operators.resettable.BlockResettableIterator; -import org.apache.flink.runtime.operators.resettable.SpillingResettableIterator; -import org.apache.flink.runtime.operators.util.JoinTaskIterator; -import org.apache.flink.runtime.util.ReusingKeyGroupedIterator; -import org.apache.flink.util.Collector; -import org.apache.flink.util.MutableObjectIterator; - - -/** - * An implementation of the {@link JoinTaskIterator} that realizes the - * matching through a sort-merge join strategy. - */ -public class MergeMatchIterator<T1, T2, O> implements JoinTaskIterator<T1, T2, O> { - - /** - * The log used by this iterator to log messages. 
- */ - private static final Logger LOG = LoggerFactory.getLogger(MergeMatchIterator.class); - - // -------------------------------------------------------------------------------------------- - - private TypePairComparator<T1, T2> comp; - - private ReusingKeyGroupedIterator<T1> iterator1; - - private ReusingKeyGroupedIterator<T2> iterator2; - - private final TypeSerializer<T1> serializer1; - - private final TypeSerializer<T2> serializer2; - - private T1 copy1; - - private T1 spillHeadCopy; - - private T2 copy2; - - private T2 blockHeadCopy; - - private final BlockResettableIterator<T2> blockIt; // for N:M cross products with same key - - private final List<MemorySegment> memoryForSpillingIterator; - - private final MemoryManager memoryManager; - - private final IOManager ioManager; - - // -------------------------------------------------------------------------------------------- - - public MergeMatchIterator(MutableObjectIterator<T1> input1, MutableObjectIterator<T2> input2, - TypeSerializer<T1> serializer1, TypeComparator<T1> comparator1, - TypeSerializer<T2> serializer2, TypeComparator<T2> comparator2, TypePairComparator<T1, T2> pairComparator, - MemoryManager memoryManager, IOManager ioManager, int numMemoryPages, AbstractInvokable parentTask) - throws MemoryAllocationException - { - if (numMemoryPages < 2) { - throw new IllegalArgumentException("Merger needs at least 2 memory pages."); - } - - this.comp = pairComparator; - this.serializer1 = serializer1; - this.serializer2 = serializer2; - - this.copy1 = serializer1.createInstance(); - this.spillHeadCopy = serializer1.createInstance(); - this.copy2 = serializer2.createInstance(); - this.blockHeadCopy = serializer2.createInstance(); - - this.memoryManager = memoryManager; - this.ioManager = ioManager; - - this.iterator1 = new ReusingKeyGroupedIterator<T1>(input1, this.serializer1, comparator1.duplicate()); - this.iterator2 = new ReusingKeyGroupedIterator<T2>(input2, this.serializer2, comparator2.duplicate()); - 
- final int numPagesForSpiller = numMemoryPages > 20 ? 2 : 1; - this.blockIt = new BlockResettableIterator<T2>(this.memoryManager, this.serializer2, - (numMemoryPages - numPagesForSpiller), parentTask); - this.memoryForSpillingIterator = memoryManager.allocatePages(parentTask, numPagesForSpiller); - } - - - @Override - public void open() throws IOException {} - - - @Override - public void close() { - if (this.blockIt != null) { - try { - this.blockIt.close(); - } - catch (Throwable t) { - LOG.error("Error closing block memory iterator: " + t.getMessage(), t); - } - } - - this.memoryManager.release(this.memoryForSpillingIterator); - } - - - @Override - public void abort() { - close(); - } - - /** - * Calls the <code>JoinFunction#match()</code> method for all two key-value pairs that share the same key and come - * from different inputs. The output of the <code>match()</code> method is forwarded. - * <p> - * This method first zig-zags between the two sorted inputs in order to find a common - * key, and then calls the match stub with the cross product of the values. - * - * @throws Exception Forwards all exceptions from the user code and the I/O system. 
- * - * @see org.apache.flink.runtime.operators.util.JoinTaskIterator#callWithNextKey(FlatJoinFunction, Collector) - */ - @Override - public boolean callWithNextKey(final FlatJoinFunction<T1, T2, O> matchFunction, final Collector<O> collector) - throws Exception - { - if (!this.iterator1.nextKey() || !this.iterator2.nextKey()) { - // consume all remaining keys (hack to prevent remaining inputs during iterations, lets get rid of this soon) - while (this.iterator1.nextKey()); - while (this.iterator2.nextKey()); - - return false; - } - - final TypePairComparator<T1, T2> comparator = this.comp; - comparator.setReference(this.iterator1.getCurrent()); - T2 current2 = this.iterator2.getCurrent(); - - // zig zag - while (true) { - // determine the relation between the (possibly composite) keys - final int comp = comparator.compareToReference(current2); - - if (comp == 0) { - break; - } - - if (comp < 0) { - if (!this.iterator2.nextKey()) { - return false; - } - current2 = this.iterator2.getCurrent(); - } - else { - if (!this.iterator1.nextKey()) { - return false; - } - comparator.setReference(this.iterator1.getCurrent()); - } - } - - // here, we have a common key! call the match function with the cross product of the - // values - final ReusingKeyGroupedIterator<T1>.ValuesIterator values1 = this.iterator1.getValues(); - final ReusingKeyGroupedIterator<T2>.ValuesIterator values2 = this.iterator2.getValues(); - - final T1 firstV1 = values1.next(); - final T2 firstV2 = values2.next(); - - final boolean v1HasNext = values1.hasNext(); - final boolean v2HasNext = values2.hasNext(); - - // check if one side is already empty - // this check could be omitted if we put this in MatchTask. - // then we can derive the local strategy (with build side). - - if (v1HasNext) { - if (v2HasNext) { - // both sides contain more than one value - // TODO: Decide which side to spill and which to block! 
- crossMwithNValues(firstV1, values1, firstV2, values2, matchFunction, collector); - } else { - crossSecond1withNValues(firstV2, firstV1, values1, matchFunction, collector); - } - } else { - if (v2HasNext) { - crossFirst1withNValues(firstV1, firstV2, values2, matchFunction, collector); - } else { - // both sides contain only one value - matchFunction.join(firstV1, firstV2, collector); - } - } - return true; - } - - /** - * Crosses a single value from the first input with N values, all sharing a common key. - * Effectively realizes a <i>1:N</i> match (join). - * - * @param val1 The value form the <i>1</i> side. - * @param firstValN The first of the values from the <i>N</i> side. - * @param valsN Iterator over remaining <i>N</i> side values. - * - * @throws Exception Forwards all exceptions thrown by the stub. - */ - private void crossFirst1withNValues(final T1 val1, final T2 firstValN, - final Iterator<T2> valsN, final FlatJoinFunction<T1, T2, O> matchFunction, final Collector<O> collector) - throws Exception - { - this.copy1 = this.serializer1.copy(val1, this.copy1); - matchFunction.join(this.copy1, firstValN, collector); - - // set copy and match first element - boolean more = true; - do { - final T2 nRec = valsN.next(); - - if (valsN.hasNext()) { - this.copy1 = this.serializer1.copy(val1, this.copy1); - matchFunction.join(this.copy1, nRec, collector); - } else { - matchFunction.join(val1, nRec, collector); - more = false; - } - } - while (more); - } - - /** - * Crosses a single value from the second side with N values, all sharing a common key. - * Effectively realizes a <i>N:1</i> match (join). - * - * @param val1 The value form the <i>1</i> side. - * @param firstValN The first of the values from the <i>N</i> side. - * @param valsN Iterator over remaining <i>N</i> side values. - * - * @throws Exception Forwards all exceptions thrown by the stub. 
- */ - private void crossSecond1withNValues(T2 val1, T1 firstValN, - Iterator<T1> valsN, FlatJoinFunction<T1, T2, O> matchFunction, Collector<O> collector) - throws Exception - { - this.copy2 = this.serializer2.copy(val1, this.copy2); - matchFunction.join(firstValN, this.copy2, collector); - - // set copy and match first element - boolean more = true; - do { - final T1 nRec = valsN.next(); - - if (valsN.hasNext()) { - this.copy2 = this.serializer2.copy(val1, this.copy2); - matchFunction.join(nRec,this.copy2,collector); - } else { - matchFunction.join(nRec, val1, collector); - more = false; - } - } - while (more); - } - - /** - * @param firstV1 - * @param spillVals - * @param firstV2 - * @param blockVals - */ - private void crossMwithNValues(final T1 firstV1, Iterator<T1> spillVals, - final T2 firstV2, final Iterator<T2> blockVals, - final FlatJoinFunction<T1, T2, O> matchFunction, final Collector<O> collector) - throws Exception - { - // ================================================== - // We have one first (head) element from both inputs (firstV1 and firstV2) - // We have an iterator for both inputs. - // we make the V1 side the spilling side and the V2 side the blocking side. - // In order to get the full cross product without unnecessary spilling, we do the - // following: - // 1) cross the heads - // 2) cross the head of the spilling side against the first block of the blocking side - // 3) cross the iterator of the spilling side with the head of the block side - // 4) cross the iterator of the spilling side with the first block - // --------------------------------------------------- - // If the blocking side has more than one block, we really need to make the spilling side fully - // resettable. For each further block on the block side, we do: - // 5) cross the head of the spilling side with the next block - // 6) cross the spilling iterator with the next block. 
- - // match the first values first - this.copy1 = this.serializer1.copy(firstV1, this.copy1); - this.blockHeadCopy = this.serializer2.copy(firstV2, this.blockHeadCopy); - - // --------------- 1) Cross the heads ------------------- - matchFunction.join(this.copy1, firstV2, collector); - - // for the remaining values, we do a block-nested-loops join - SpillingResettableIterator<T1> spillIt = null; - - try { - // create block iterator on the second input - this.blockIt.reopen(blockVals); - - // ------------- 2) cross the head of the spilling side with the first block ------------------ - while (this.blockIt.hasNext()) { - final T2 nextBlockRec = this.blockIt.next(); - this.copy1 = this.serializer1.copy(firstV1, this.copy1); - matchFunction.join(this.copy1, nextBlockRec, collector); - } - this.blockIt.reset(); - - // spilling is required if the blocked input has data beyond the current block. - // in that case, create the spilling iterator - final Iterator<T1> leftSideIter; - final boolean spillingRequired = this.blockIt.hasFurtherInput(); - if (spillingRequired) - { - // more data than would fit into one block. 
we need to wrap the other side in a spilling iterator - // create spilling iterator on first input - spillIt = new SpillingResettableIterator<T1>(spillVals, this.serializer1, - this.memoryManager, this.ioManager, this.memoryForSpillingIterator); - leftSideIter = spillIt; - spillIt.open(); - - this.spillHeadCopy = this.serializer1.copy(firstV1, this.spillHeadCopy); - } - else { - leftSideIter = spillVals; - } - - // cross the values in the v1 iterator against the current block - - while (leftSideIter.hasNext()) { - final T1 nextSpillVal = leftSideIter.next(); - this.copy1 = this.serializer1.copy(nextSpillVal, this.copy1); - - - // -------- 3) cross the iterator of the spilling side with the head of the block side -------- - this.copy2 = this.serializer2.copy(this.blockHeadCopy, this.copy2); - matchFunction.join(this.copy1, this.copy2, collector); - - // -------- 4) cross the iterator of the spilling side with the first block -------- - while (this.blockIt.hasNext()) { - T2 nextBlockRec = this.blockIt.next(); - - // get instances of key and block value - this.copy1 = this.serializer1.copy(nextSpillVal, this.copy1); - matchFunction.join(this.copy1, nextBlockRec, collector); - } - // reset block iterator - this.blockIt.reset(); - } - - // if everything from the block-side fit into a single block, we are done. 
- // note that in this special case, we did not create a spilling iterator at all - if (!spillingRequired) { - return; - } - - // here we are, because we have more blocks on the block side - // loop as long as there are blocks from the blocked input - while (this.blockIt.nextBlock()) - { - // rewind the spilling iterator - spillIt.reset(); - - // ------------- 5) cross the head of the spilling side with the next block ------------ - while (this.blockIt.hasNext()) { - this.copy1 = this.serializer1.copy(this.spillHeadCopy, this.copy1); - final T2 nextBlockVal = blockIt.next(); - matchFunction.join(this.copy1, nextBlockVal, collector); - } - this.blockIt.reset(); - - // -------- 6) cross the spilling iterator with the next block. ------------------ - while (spillIt.hasNext()) - { - // get value from resettable iterator - final T1 nextSpillVal = spillIt.next(); - // cross value with block values - while (this.blockIt.hasNext()) { - // get instances of key and block value - final T2 nextBlockVal = this.blockIt.next(); - this.copy1 = this.serializer1.copy(nextSpillVal, this.copy1); - matchFunction.join(this.copy1, nextBlockVal, collector); - } - - // reset block iterator - this.blockIt.reset(); - } - // reset v1 iterator - spillIt.reset(); - } - } - finally { - if (spillIt != null) { - this.memoryForSpillingIterator.addAll(spillIt.close()); - } - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/d529749c/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/NonReusingMergeMatchIterator.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/NonReusingMergeMatchIterator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/NonReusingMergeMatchIterator.java new file mode 100644 index 0000000..70b6f9a --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/NonReusingMergeMatchIterator.java @@ -0,0 
+1,424 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.operators.sort; + +import org.apache.flink.api.common.functions.FlatJoinFunction; +import org.apache.flink.api.common.typeutils.TypeComparator; +import org.apache.flink.api.common.typeutils.TypePairComparator; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.core.memory.MemorySegment; +import org.apache.flink.runtime.io.disk.iomanager.IOManager; +import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; +import org.apache.flink.runtime.memorymanager.MemoryAllocationException; +import org.apache.flink.runtime.memorymanager.MemoryManager; +import org.apache.flink.runtime.operators.resettable.NonReusingBlockResettableIterator; +import org.apache.flink.runtime.operators.resettable.SpillingResettableIterator; +import org.apache.flink.runtime.operators.util.JoinTaskIterator; +import org.apache.flink.runtime.util.NonReusingKeyGroupedIterator; +import org.apache.flink.util.Collector; +import org.apache.flink.util.MutableObjectIterator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Iterator; +import java.util.List; + + 
+/** + * An implementation of the {@link org.apache.flink.runtime.operators.util.JoinTaskIterator} that realizes the + * matching through a sort-merge join strategy. + */ +public class NonReusingMergeMatchIterator<T1, T2, O> implements JoinTaskIterator<T1, T2, O> { + + /** + * The log used by this iterator to log messages. + */ + private static final Logger LOG = LoggerFactory.getLogger(NonReusingMergeMatchIterator.class); + + // -------------------------------------------------------------------------------------------- + + private TypePairComparator<T1, T2> comp; + + private NonReusingKeyGroupedIterator<T1> iterator1; + + private NonReusingKeyGroupedIterator<T2> iterator2; + + private final TypeSerializer<T1> serializer1; + + private final TypeSerializer<T2> serializer2; + + private final NonReusingBlockResettableIterator<T2> blockIt; // for N:M cross products with same key + + private final List<MemorySegment> memoryForSpillingIterator; + + private final MemoryManager memoryManager; + + private final IOManager ioManager; + + // -------------------------------------------------------------------------------------------- + + public NonReusingMergeMatchIterator( + MutableObjectIterator<T1> input1, + MutableObjectIterator<T2> input2, + TypeSerializer<T1> serializer1, TypeComparator<T1> comparator1, + TypeSerializer<T2> serializer2, TypeComparator<T2> comparator2, + TypePairComparator<T1, T2> pairComparator, + MemoryManager memoryManager, + IOManager ioManager, + int numMemoryPages, + AbstractInvokable parentTask) + throws MemoryAllocationException + { + if (numMemoryPages < 2) { + throw new IllegalArgumentException("Merger needs at least 2 memory pages."); + } + + this.comp = pairComparator; + this.serializer1 = serializer1; + this.serializer2 = serializer2; + + this.memoryManager = memoryManager; + this.ioManager = ioManager; + + this.iterator1 = new NonReusingKeyGroupedIterator<T1>(input1, this.serializer1, comparator1.duplicate()); + this.iterator2 = new 
NonReusingKeyGroupedIterator<T2>(input2, this.serializer2, comparator2.duplicate()); + + final int numPagesForSpiller = numMemoryPages > 20 ? 2 : 1; + this.blockIt = new NonReusingBlockResettableIterator<T2>(this.memoryManager, this.serializer2, + (numMemoryPages - numPagesForSpiller), parentTask); + this.memoryForSpillingIterator = memoryManager.allocatePages(parentTask, numPagesForSpiller); + } + + + @Override + public void open() throws IOException {} + + + @Override + public void close() { + if (this.blockIt != null) { + try { + this.blockIt.close(); + } + catch (Throwable t) { + LOG.error("Error closing block memory iterator: " + t.getMessage(), t); + } + } + + this.memoryManager.release(this.memoryForSpillingIterator); + } + + + @Override + public void abort() { + close(); + } + + /** + * Calls the <code>JoinFunction#match()</code> method for all two key-value pairs that share the same key and come + * from different inputs. The output of the <code>match()</code> method is forwarded. + * <p> + * This method first zig-zags between the two sorted inputs in order to find a common + * key, and then calls the match stub with the cross product of the values. + * + * @throws Exception Forwards all exceptions from the user code and the I/O system. 
+ * + * @see org.apache.flink.runtime.operators.util.JoinTaskIterator#callWithNextKey(org.apache.flink.api.common.functions.FlatJoinFunction, org.apache.flink.util.Collector) + */ + @Override + public boolean callWithNextKey(final FlatJoinFunction<T1, T2, O> matchFunction, final Collector<O> collector) + throws Exception + { + if (!this.iterator1.nextKey() || !this.iterator2.nextKey()) { + // consume all remaining keys (hack to prevent remaining inputs during iterations, lets get rid of this soon) + while (this.iterator1.nextKey()); + while (this.iterator2.nextKey()); + + return false; + } + + final TypePairComparator<T1, T2> comparator = this.comp; + comparator.setReference(this.iterator1.getCurrent()); + T2 current2 = this.iterator2.getCurrent(); + + // zig zag + while (true) { + // determine the relation between the (possibly composite) keys + final int comp = comparator.compareToReference(current2); + + if (comp == 0) { + break; + } + + if (comp < 0) { + if (!this.iterator2.nextKey()) { + return false; + } + current2 = this.iterator2.getCurrent(); + } + else { + if (!this.iterator1.nextKey()) { + return false; + } + comparator.setReference(this.iterator1.getCurrent()); + } + } + + // here, we have a common key! call the match function with the cross product of the + // values + final NonReusingKeyGroupedIterator<T1>.ValuesIterator values1 = this.iterator1.getValues(); + final NonReusingKeyGroupedIterator<T2>.ValuesIterator values2 = this.iterator2.getValues(); + + final T1 firstV1 = values1.next(); + final T2 firstV2 = values2.next(); + + final boolean v1HasNext = values1.hasNext(); + final boolean v2HasNext = values2.hasNext(); + + // check if one side is already empty + // this check could be omitted if we put this in MatchTask. + // then we can derive the local strategy (with build side). + + if (v1HasNext) { + if (v2HasNext) { + // both sides contain more than one value + // TODO: Decide which side to spill and which to block! 
+ crossMwithNValues(firstV1, values1, firstV2, values2, matchFunction, collector); + } else { + crossSecond1withNValues(firstV2, firstV1, values1, matchFunction, collector); + } + } else { + if (v2HasNext) { + crossFirst1withNValues(firstV1, firstV2, values2, matchFunction, collector); + } else { + // both sides contain only one value + matchFunction.join(firstV1, firstV2, collector); + } + } + return true; + } + + /** + * Crosses a single value from the first input with N values, all sharing a common key. + * Effectively realizes a <i>1:N</i> match (join). + * + * @param val1 The value form the <i>1</i> side. + * @param firstValN The first of the values from the <i>N</i> side. + * @param valsN Iterator over remaining <i>N</i> side values. + * + * @throws Exception Forwards all exceptions thrown by the stub. + */ + private void crossFirst1withNValues(final T1 val1, final T2 firstValN, + final Iterator<T2> valsN, final FlatJoinFunction<T1, T2, O> matchFunction, final Collector<O> collector) + throws Exception + { + T1 copy1 = this.serializer1.copy(val1); + matchFunction.join(copy1, firstValN, collector); + + // set copy and match first element + boolean more = true; + do { + final T2 nRec = valsN.next(); + + if (valsN.hasNext()) { + copy1 = this.serializer1.copy(val1); + matchFunction.join(copy1, nRec, collector); + } else { + matchFunction.join(val1, nRec, collector); + more = false; + } + } + while (more); + } + + /** + * Crosses a single value from the second side with N values, all sharing a common key. + * Effectively realizes a <i>N:1</i> match (join). + * + * @param val1 The value form the <i>1</i> side. + * @param firstValN The first of the values from the <i>N</i> side. + * @param valsN Iterator over remaining <i>N</i> side values. + * + * @throws Exception Forwards all exceptions thrown by the stub. 
+ */ + private void crossSecond1withNValues(T2 val1, T1 firstValN, + Iterator<T1> valsN, FlatJoinFunction<T1, T2, O> matchFunction, Collector<O> collector) + throws Exception + { + T2 copy2 = this.serializer2.copy(val1); + matchFunction.join(firstValN, copy2, collector); + + // set copy and match first element + boolean more = true; + do { + final T1 nRec = valsN.next(); + + if (valsN.hasNext()) { + copy2 = this.serializer2.copy(val1); + matchFunction.join(nRec, copy2, collector); + } else { + matchFunction.join(nRec, val1, collector); + more = false; + } + } + while (more); + } + + /** + * @param firstV1 + * @param spillVals + * @param firstV2 + * @param blockVals + */ + private void crossMwithNValues(final T1 firstV1, Iterator<T1> spillVals, + final T2 firstV2, final Iterator<T2> blockVals, + final FlatJoinFunction<T1, T2, O> matchFunction, final Collector<O> collector) + throws Exception + { + // ================================================== + // We have one first (head) element from both inputs (firstV1 and firstV2) + // We have an iterator for both inputs. + // we make the V1 side the spilling side and the V2 side the blocking side. + // In order to get the full cross product without unnecessary spilling, we do the + // following: + // 1) cross the heads + // 2) cross the head of the spilling side against the first block of the blocking side + // 3) cross the iterator of the spilling side with the head of the block side + // 4) cross the iterator of the spilling side with the first block + // --------------------------------------------------- + // If the blocking side has more than one block, we really need to make the spilling side fully + // resettable. For each further block on the block side, we do: + // 5) cross the head of the spilling side with the next block + // 6) cross the spilling iterator with the next block. 
+ + // match the first values first + T1 copy1 = this.serializer1.copy(firstV1); + T2 blockHeadCopy = this.serializer2.copy(firstV2); + T1 spillHeadCopy = null; + + + // --------------- 1) Cross the heads ------------------- + matchFunction.join(copy1, firstV2, collector); + + // for the remaining values, we do a block-nested-loops join + SpillingResettableIterator<T1> spillIt = null; + + try { + // create block iterator on the second input + this.blockIt.reopen(blockVals); + + // ------------- 2) cross the head of the spilling side with the first block ------------------ + while (this.blockIt.hasNext()) { + final T2 nextBlockRec = this.blockIt.next(); + copy1 = this.serializer1.copy(firstV1); + matchFunction.join(copy1, nextBlockRec, collector); + } + this.blockIt.reset(); + + // spilling is required if the blocked input has data beyond the current block. + // in that case, create the spilling iterator + final Iterator<T1> leftSideIter; + final boolean spillingRequired = this.blockIt.hasFurtherInput(); + if (spillingRequired) + { + // more data than would fit into one block. 
we need to wrap the other side in a spilling iterator + // create spilling iterator on first input + spillIt = new SpillingResettableIterator<T1>(spillVals, this.serializer1, + this.memoryManager, this.ioManager, this.memoryForSpillingIterator); + leftSideIter = spillIt; + spillIt.open(); + + spillHeadCopy = this.serializer1.copy(firstV1); + } + else { + leftSideIter = spillVals; + } + + // cross the values in the v1 iterator against the current block + + while (leftSideIter.hasNext()) { + final T1 nextSpillVal = leftSideIter.next(); + copy1 = this.serializer1.copy(nextSpillVal); + + + // -------- 3) cross the iterator of the spilling side with the head of the block side -------- + T2 copy2 = this.serializer2.copy(blockHeadCopy); + matchFunction.join(copy1, copy2, collector); + + // -------- 4) cross the iterator of the spilling side with the first block -------- + while (this.blockIt.hasNext()) { + T2 nextBlockRec = this.blockIt.next(); + + // get instances of key and block value + copy1 = this.serializer1.copy(nextSpillVal); + matchFunction.join(copy1, nextBlockRec, collector); + } + // reset block iterator + this.blockIt.reset(); + } + + // if everything from the block-side fit into a single block, we are done. + // note that in this special case, we did not create a spilling iterator at all + if (!spillingRequired) { + return; + } + + // here we are, because we have more blocks on the block side + // loop as long as there are blocks from the blocked input + while (this.blockIt.nextBlock()) + { + // rewind the spilling iterator + spillIt.reset(); + + // ------------- 5) cross the head of the spilling side with the next block ------------ + while (this.blockIt.hasNext()) { + copy1 = this.serializer1.copy(spillHeadCopy); + final T2 nextBlockVal = blockIt.next(); + matchFunction.join(copy1, nextBlockVal, collector); + } + this.blockIt.reset(); + + // -------- 6) cross the spilling iterator with the next block. 
------------------ + while (spillIt.hasNext()) + { + // get value from resettable iterator + final T1 nextSpillVal = spillIt.next(); + // cross value with block values + while (this.blockIt.hasNext()) { + // get instances of key and block value + final T2 nextBlockVal = this.blockIt.next(); + copy1 = this.serializer1.copy(nextSpillVal); + matchFunction.join(copy1, nextBlockVal, collector); + } + + // reset block iterator + this.blockIt.reset(); + } + // reset v1 iterator + spillIt.reset(); + } + } + finally { + if (spillIt != null) { + this.memoryForSpillingIterator.addAll(spillIt.close()); + } + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/d529749c/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/ReusingMergeMatchIterator.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/ReusingMergeMatchIterator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/ReusingMergeMatchIterator.java new file mode 100644 index 0000000..66beee1 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/ReusingMergeMatchIterator.java @@ -0,0 +1,435 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.operators.sort; + +import java.io.IOException; +import java.util.Iterator; +import java.util.List; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.apache.flink.api.common.functions.FlatJoinFunction; +import org.apache.flink.api.common.typeutils.TypeComparator; +import org.apache.flink.api.common.typeutils.TypePairComparator; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.core.memory.MemorySegment; +import org.apache.flink.runtime.io.disk.iomanager.IOManager; +import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; +import org.apache.flink.runtime.memorymanager.MemoryAllocationException; +import org.apache.flink.runtime.memorymanager.MemoryManager; +import org.apache.flink.runtime.operators.resettable.NonReusingBlockResettableIterator; +import org.apache.flink.runtime.operators.resettable.SpillingResettableIterator; +import org.apache.flink.runtime.operators.util.JoinTaskIterator; +import org.apache.flink.runtime.util.ReusingKeyGroupedIterator; +import org.apache.flink.util.Collector; +import org.apache.flink.util.MutableObjectIterator; + + +/** + * An implementation of the {@link JoinTaskIterator} that realizes the + * matching through a sort-merge join strategy. + */ +public class ReusingMergeMatchIterator<T1, T2, O> implements JoinTaskIterator<T1, T2, O> { + + /** + * The log used by this iterator to log messages. 
+ */ + private static final Logger LOG = LoggerFactory.getLogger(ReusingMergeMatchIterator.class); + + // -------------------------------------------------------------------------------------------- + + private TypePairComparator<T1, T2> comp; + + private ReusingKeyGroupedIterator<T1> iterator1; + + private ReusingKeyGroupedIterator<T2> iterator2; + + private final TypeSerializer<T1> serializer1; + + private final TypeSerializer<T2> serializer2; + + private T1 copy1; + + private T1 spillHeadCopy; + + private T2 copy2; + + private T2 blockHeadCopy; + + private final NonReusingBlockResettableIterator<T2> blockIt; // for N:M cross products with same key + + private final List<MemorySegment> memoryForSpillingIterator; + + private final MemoryManager memoryManager; + + private final IOManager ioManager; + + // -------------------------------------------------------------------------------------------- + + public ReusingMergeMatchIterator( + MutableObjectIterator<T1> input1, + MutableObjectIterator<T2> input2, + TypeSerializer<T1> serializer1, TypeComparator<T1> comparator1, + TypeSerializer<T2> serializer2, TypeComparator<T2> comparator2, + TypePairComparator<T1, T2> pairComparator, + MemoryManager memoryManager, + IOManager ioManager, + int numMemoryPages, + AbstractInvokable parentTask) + throws MemoryAllocationException + { + if (numMemoryPages < 2) { + throw new IllegalArgumentException("Merger needs at least 2 memory pages."); + } + + this.comp = pairComparator; + this.serializer1 = serializer1; + this.serializer2 = serializer2; + + this.copy1 = serializer1.createInstance(); + this.spillHeadCopy = serializer1.createInstance(); + this.copy2 = serializer2.createInstance(); + this.blockHeadCopy = serializer2.createInstance(); + + this.memoryManager = memoryManager; + this.ioManager = ioManager; + + this.iterator1 = new ReusingKeyGroupedIterator<T1>(input1, this.serializer1, comparator1.duplicate()); + this.iterator2 = new ReusingKeyGroupedIterator<T2>(input2, 
this.serializer2, comparator2.duplicate()); + + final int numPagesForSpiller = numMemoryPages > 20 ? 2 : 1; + this.blockIt = new NonReusingBlockResettableIterator<T2>(this.memoryManager, this.serializer2, + (numMemoryPages - numPagesForSpiller), parentTask); + this.memoryForSpillingIterator = memoryManager.allocatePages(parentTask, numPagesForSpiller); + } + + + @Override + public void open() throws IOException {} + + + @Override + public void close() { + if (this.blockIt != null) { + try { + this.blockIt.close(); + } + catch (Throwable t) { + LOG.error("Error closing block memory iterator: " + t.getMessage(), t); + } + } + + this.memoryManager.release(this.memoryForSpillingIterator); + } + + + @Override + public void abort() { + close(); + } + + /** + * Calls the <code>JoinFunction#match()</code> method for all two key-value pairs that share the same key and come + * from different inputs. The output of the <code>match()</code> method is forwarded. + * <p> + * This method first zig-zags between the two sorted inputs in order to find a common + * key, and then calls the match stub with the cross product of the values. + * + * @throws Exception Forwards all exceptions from the user code and the I/O system. 
+ * + * @see org.apache.flink.runtime.operators.util.JoinTaskIterator#callWithNextKey(FlatJoinFunction, Collector) + */ + @Override + public boolean callWithNextKey(final FlatJoinFunction<T1, T2, O> matchFunction, final Collector<O> collector) + throws Exception + { + if (!this.iterator1.nextKey() || !this.iterator2.nextKey()) { + // consume all remaining keys (hack to prevent remaining inputs during iterations, lets get rid of this soon) + while (this.iterator1.nextKey()); + while (this.iterator2.nextKey()); + + return false; + } + + final TypePairComparator<T1, T2> comparator = this.comp; + comparator.setReference(this.iterator1.getCurrent()); + T2 current2 = this.iterator2.getCurrent(); + + // zig zag + while (true) { + // determine the relation between the (possibly composite) keys + final int comp = comparator.compareToReference(current2); + + if (comp == 0) { + break; + } + + if (comp < 0) { + if (!this.iterator2.nextKey()) { + return false; + } + current2 = this.iterator2.getCurrent(); + } + else { + if (!this.iterator1.nextKey()) { + return false; + } + comparator.setReference(this.iterator1.getCurrent()); + } + } + + // here, we have a common key! call the match function with the cross product of the + // values + final ReusingKeyGroupedIterator<T1>.ValuesIterator values1 = this.iterator1.getValues(); + final ReusingKeyGroupedIterator<T2>.ValuesIterator values2 = this.iterator2.getValues(); + + final T1 firstV1 = values1.next(); + final T2 firstV2 = values2.next(); + + final boolean v1HasNext = values1.hasNext(); + final boolean v2HasNext = values2.hasNext(); + + // check if one side is already empty + // this check could be omitted if we put this in MatchTask. + // then we can derive the local strategy (with build side). + + if (v1HasNext) { + if (v2HasNext) { + // both sides contain more than one value + // TODO: Decide which side to spill and which to block! 
+ crossMwithNValues(firstV1, values1, firstV2, values2, matchFunction, collector); + } else { + crossSecond1withNValues(firstV2, firstV1, values1, matchFunction, collector); + } + } else { + if (v2HasNext) { + crossFirst1withNValues(firstV1, firstV2, values2, matchFunction, collector); + } else { + // both sides contain only one value + matchFunction.join(firstV1, firstV2, collector); + } + } + return true; + } + + /** + * Crosses a single value from the first input with N values, all sharing a common key. + * Effectively realizes a <i>1:N</i> match (join). + * + * @param val1 The value form the <i>1</i> side. + * @param firstValN The first of the values from the <i>N</i> side. + * @param valsN Iterator over remaining <i>N</i> side values. + * + * @throws Exception Forwards all exceptions thrown by the stub. + */ + private void crossFirst1withNValues(final T1 val1, final T2 firstValN, + final Iterator<T2> valsN, final FlatJoinFunction<T1, T2, O> matchFunction, final Collector<O> collector) + throws Exception + { + this.copy1 = this.serializer1.copy(val1, this.copy1); + matchFunction.join(this.copy1, firstValN, collector); + + // set copy and match first element + boolean more = true; + do { + final T2 nRec = valsN.next(); + + if (valsN.hasNext()) { + this.copy1 = this.serializer1.copy(val1, this.copy1); + matchFunction.join(this.copy1, nRec, collector); + } else { + matchFunction.join(val1, nRec, collector); + more = false; + } + } + while (more); + } + + /** + * Crosses a single value from the second side with N values, all sharing a common key. + * Effectively realizes a <i>N:1</i> match (join). + * + * @param val1 The value form the <i>1</i> side. + * @param firstValN The first of the values from the <i>N</i> side. + * @param valsN Iterator over remaining <i>N</i> side values. + * + * @throws Exception Forwards all exceptions thrown by the stub. 
+ */ + private void crossSecond1withNValues(T2 val1, T1 firstValN, + Iterator<T1> valsN, FlatJoinFunction<T1, T2, O> matchFunction, Collector<O> collector) + throws Exception + { + this.copy2 = this.serializer2.copy(val1, this.copy2); + matchFunction.join(firstValN, this.copy2, collector); + + // set copy and match first element + boolean more = true; + do { + final T1 nRec = valsN.next(); + + if (valsN.hasNext()) { + this.copy2 = this.serializer2.copy(val1, this.copy2); + matchFunction.join(nRec,this.copy2,collector); + } else { + matchFunction.join(nRec, val1, collector); + more = false; + } + } + while (more); + } + + /** + * @param firstV1 + * @param spillVals + * @param firstV2 + * @param blockVals + */ + private void crossMwithNValues(final T1 firstV1, Iterator<T1> spillVals, + final T2 firstV2, final Iterator<T2> blockVals, + final FlatJoinFunction<T1, T2, O> matchFunction, final Collector<O> collector) + throws Exception + { + // ================================================== + // We have one first (head) element from both inputs (firstV1 and firstV2) + // We have an iterator for both inputs. + // we make the V1 side the spilling side and the V2 side the blocking side. + // In order to get the full cross product without unnecessary spilling, we do the + // following: + // 1) cross the heads + // 2) cross the head of the spilling side against the first block of the blocking side + // 3) cross the iterator of the spilling side with the head of the block side + // 4) cross the iterator of the spilling side with the first block + // --------------------------------------------------- + // If the blocking side has more than one block, we really need to make the spilling side fully + // resettable. For each further block on the block side, we do: + // 5) cross the head of the spilling side with the next block + // 6) cross the spilling iterator with the next block. 
+ + // match the first values first + this.copy1 = this.serializer1.copy(firstV1, this.copy1); + this.blockHeadCopy = this.serializer2.copy(firstV2, this.blockHeadCopy); + + // --------------- 1) Cross the heads ------------------- + matchFunction.join(this.copy1, firstV2, collector); + + // for the remaining values, we do a block-nested-loops join + SpillingResettableIterator<T1> spillIt = null; + + try { + // create block iterator on the second input + this.blockIt.reopen(blockVals); + + // ------------- 2) cross the head of the spilling side with the first block ------------------ + while (this.blockIt.hasNext()) { + final T2 nextBlockRec = this.blockIt.next(); + this.copy1 = this.serializer1.copy(firstV1, this.copy1); + matchFunction.join(this.copy1, nextBlockRec, collector); + } + this.blockIt.reset(); + + // spilling is required if the blocked input has data beyond the current block. + // in that case, create the spilling iterator + final Iterator<T1> leftSideIter; + final boolean spillingRequired = this.blockIt.hasFurtherInput(); + if (spillingRequired) + { + // more data than would fit into one block. 
we need to wrap the other side in a spilling iterator + // create spilling iterator on first input + spillIt = new SpillingResettableIterator<T1>(spillVals, this.serializer1, + this.memoryManager, this.ioManager, this.memoryForSpillingIterator); + leftSideIter = spillIt; + spillIt.open(); + + this.spillHeadCopy = this.serializer1.copy(firstV1, this.spillHeadCopy); + } + else { + leftSideIter = spillVals; + } + + // cross the values in the v1 iterator against the current block + + while (leftSideIter.hasNext()) { + final T1 nextSpillVal = leftSideIter.next(); + this.copy1 = this.serializer1.copy(nextSpillVal, this.copy1); + + + // -------- 3) cross the iterator of the spilling side with the head of the block side -------- + this.copy2 = this.serializer2.copy(this.blockHeadCopy, this.copy2); + matchFunction.join(this.copy1, this.copy2, collector); + + // -------- 4) cross the iterator of the spilling side with the first block -------- + while (this.blockIt.hasNext()) { + T2 nextBlockRec = this.blockIt.next(); + + // get instances of key and block value + this.copy1 = this.serializer1.copy(nextSpillVal, this.copy1); + matchFunction.join(this.copy1, nextBlockRec, collector); + } + // reset block iterator + this.blockIt.reset(); + } + + // if everything from the block-side fit into a single block, we are done. 
+ // note that in this special case, we did not create a spilling iterator at all + if (!spillingRequired) { + return; + } + + // here we are, because we have more blocks on the block side + // loop as long as there are blocks from the blocked input + while (this.blockIt.nextBlock()) + { + // rewind the spilling iterator + spillIt.reset(); + + // ------------- 5) cross the head of the spilling side with the next block ------------ + while (this.blockIt.hasNext()) { + this.copy1 = this.serializer1.copy(this.spillHeadCopy, this.copy1); + final T2 nextBlockVal = blockIt.next(); + matchFunction.join(this.copy1, nextBlockVal, collector); + } + this.blockIt.reset(); + + // -------- 6) cross the spilling iterator with the next block. ------------------ + while (spillIt.hasNext()) + { + // get value from resettable iterator + final T1 nextSpillVal = spillIt.next(); + // cross value with block values + while (this.blockIt.hasNext()) { + // get instances of key and block value + final T2 nextBlockVal = this.blockIt.next(); + this.copy1 = this.serializer1.copy(nextSpillVal, this.copy1); + matchFunction.join(this.copy1, nextBlockVal, collector); + } + + // reset block iterator + this.blockIt.reset(); + } + // reset v1 iterator + spillIt.reset(); + } + } + finally { + if (spillIt != null) { + this.memoryForSpillingIterator.addAll(spillIt.close()); + } + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/d529749c/flink-runtime/src/test/java/org/apache/flink/runtime/operators/resettable/BlockResettableIteratorTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/resettable/BlockResettableIteratorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/resettable/BlockResettableIteratorTest.java deleted file mode 100644 index c51e53a..0000000 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/resettable/BlockResettableIteratorTest.java 
+++ /dev/null @@ -1,202 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package org.apache.flink.runtime.operators.resettable; - -import java.util.ArrayList; -import java.util.Iterator; -import java.util.List; - -import org.junit.Assert; -import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.api.common.typeutils.record.RecordSerializer; -import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; -import org.apache.flink.runtime.memorymanager.DefaultMemoryManager; -import org.apache.flink.runtime.memorymanager.MemoryManager; -import org.apache.flink.runtime.operators.resettable.BlockResettableIterator; -import org.apache.flink.runtime.operators.testutils.DummyInvokable; -import org.apache.flink.types.IntValue; -import org.apache.flink.types.Record; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; - - -public class BlockResettableIteratorTest -{ - private static final int MEMORY_CAPACITY = 3 * 128 * 1024; - - private static final int NUM_VALUES = 20000; - - private MemoryManager memman; - - private Iterator<Record> reader; - - private List<Record> objects; - - private final TypeSerializer<Record> serializer = RecordSerializer.get(); - - 
@Before - public void startup() { - // set up IO and memory manager - this.memman = new DefaultMemoryManager(MEMORY_CAPACITY, 1); - - // create test objects - this.objects = new ArrayList<Record>(20000); - for (int i = 0; i < NUM_VALUES; ++i) { - this.objects.add(new Record(new IntValue(i))); - } - - // create the reader - this.reader = objects.iterator(); - } - - @After - public void shutdown() { - this.objects = null; - - // check that the memory manager got all segments back - if (!this.memman.verifyEmpty()) { - Assert.fail("A memory leak has occurred: Not all memory was properly returned to the memory manager."); - } - - this.memman.shutdown(); - this.memman = null; - } - - @Test - public void testSerialBlockResettableIterator() throws Exception - { - final AbstractInvokable memOwner = new DummyInvokable(); - // create the resettable Iterator - final BlockResettableIterator<Record> iterator = new BlockResettableIterator<Record>( - this.memman, this.reader, this.serializer, 1, memOwner); - // open the iterator - iterator.open(); - - // now test walking through the iterator - int lower = 0; - int upper = 0; - do { - lower = upper; - upper = lower; - // find the upper bound - while (iterator.hasNext()) { - Record target = iterator.next(); - int val = target.getField(0, IntValue.class).getValue(); - Assert.assertEquals(upper++, val); - } - // now reset the buffer a few times - for (int i = 0; i < 5; ++i) { - iterator.reset(); - int count = 0; - while (iterator.hasNext()) { - Record target = iterator.next(); - int val = target.getField(0, IntValue.class).getValue(); - Assert.assertEquals(lower + (count++), val); - } - Assert.assertEquals(upper - lower, count); - } - } while (iterator.nextBlock()); - Assert.assertEquals(NUM_VALUES, upper); - // close the iterator - iterator.close(); - } - - @Test - public void testDoubleBufferedBlockResettableIterator() throws Exception - { - final AbstractInvokable memOwner = new DummyInvokable(); - // create the resettable Iterator 
- final BlockResettableIterator<Record> iterator = new BlockResettableIterator<Record>( - this.memman, this.reader, this.serializer, 2, memOwner); - // open the iterator - iterator.open(); - - // now test walking through the iterator - int lower = 0; - int upper = 0; - do { - lower = upper; - upper = lower; - // find the upper bound - while (iterator.hasNext()) { - Record target = iterator.next(); - int val = target.getField(0, IntValue.class).getValue(); - Assert.assertEquals(upper++, val); - } - // now reset the buffer a few times - for (int i = 0; i < 5; ++i) { - iterator.reset(); - int count = 0; - while (iterator.hasNext()) { - Record target = iterator.next(); - int val = target.getField(0, IntValue.class).getValue(); - Assert.assertEquals(lower + (count++), val); - } - Assert.assertEquals(upper - lower, count); - } - } while (iterator.nextBlock()); - Assert.assertEquals(NUM_VALUES, upper); - - // close the iterator - iterator.close(); - } - - @Test - public void testTwelveFoldBufferedBlockResettableIterator() throws Exception - { - final AbstractInvokable memOwner = new DummyInvokable(); - // create the resettable Iterator - final BlockResettableIterator<Record> iterator = new BlockResettableIterator<Record>( - this.memman, this.reader, this.serializer, 12, memOwner); - // open the iterator - iterator.open(); - - // now test walking through the iterator - int lower = 0; - int upper = 0; - do { - lower = upper; - upper = lower; - // find the upper bound - while (iterator.hasNext()) { - Record target = iterator.next(); - int val = target.getField(0, IntValue.class).getValue(); - Assert.assertEquals(upper++, val); - } - // now reset the buffer a few times - for (int i = 0; i < 5; ++i) { - iterator.reset(); - int count = 0; - while (iterator.hasNext()) { - Record target = iterator.next(); - int val = target.getField(0, IntValue.class).getValue(); - Assert.assertEquals(lower + (count++), val); - } - Assert.assertEquals(upper - lower, count); - } - } while 
(iterator.nextBlock()); - Assert.assertEquals(NUM_VALUES, upper); - - // close the iterator - iterator.close(); - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/d529749c/flink-runtime/src/test/java/org/apache/flink/runtime/operators/resettable/NonReusingBlockResettableIteratorTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/resettable/NonReusingBlockResettableIteratorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/resettable/NonReusingBlockResettableIteratorTest.java new file mode 100644 index 0000000..5641f29 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/resettable/NonReusingBlockResettableIteratorTest.java @@ -0,0 +1,201 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + + +package org.apache.flink.runtime.operators.resettable; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; + +import org.junit.Assert; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.record.RecordSerializer; +import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; +import org.apache.flink.runtime.memorymanager.DefaultMemoryManager; +import org.apache.flink.runtime.memorymanager.MemoryManager; +import org.apache.flink.runtime.operators.testutils.DummyInvokable; +import org.apache.flink.types.IntValue; +import org.apache.flink.types.Record; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + + +public class NonReusingBlockResettableIteratorTest +{ + private static final int MEMORY_CAPACITY = 3 * 128 * 1024; + + private static final int NUM_VALUES = 20000; + + private MemoryManager memman; + + private Iterator<Record> reader; + + private List<Record> objects; + + private final TypeSerializer<Record> serializer = RecordSerializer.get(); + + @Before + public void startup() { + // set up IO and memory manager + this.memman = new DefaultMemoryManager(MEMORY_CAPACITY, 1); + + // create test objects + this.objects = new ArrayList<Record>(20000); + for (int i = 0; i < NUM_VALUES; ++i) { + this.objects.add(new Record(new IntValue(i))); + } + + // create the reader + this.reader = objects.iterator(); + } + + @After + public void shutdown() { + this.objects = null; + + // check that the memory manager got all segments back + if (!this.memman.verifyEmpty()) { + Assert.fail("A memory leak has occurred: Not all memory was properly returned to the memory manager."); + } + + this.memman.shutdown(); + this.memman = null; + } + + @Test + public void testSerialBlockResettableIterator() throws Exception + { + final AbstractInvokable memOwner = new DummyInvokable(); + // create the resettable Iterator + final NonReusingBlockResettableIterator<Record> 
iterator = new NonReusingBlockResettableIterator<Record>( + this.memman, this.reader, this.serializer, 1, memOwner); + // open the iterator + iterator.open(); + + // now test walking through the iterator + int lower = 0; + int upper = 0; + do { + lower = upper; + upper = lower; + // find the upper bound + while (iterator.hasNext()) { + Record target = iterator.next(); + int val = target.getField(0, IntValue.class).getValue(); + Assert.assertEquals(upper++, val); + } + // now reset the buffer a few times + for (int i = 0; i < 5; ++i) { + iterator.reset(); + int count = 0; + while (iterator.hasNext()) { + Record target = iterator.next(); + int val = target.getField(0, IntValue.class).getValue(); + Assert.assertEquals(lower + (count++), val); + } + Assert.assertEquals(upper - lower, count); + } + } while (iterator.nextBlock()); + Assert.assertEquals(NUM_VALUES, upper); + // close the iterator + iterator.close(); + } + + @Test + public void testDoubleBufferedBlockResettableIterator() throws Exception + { + final AbstractInvokable memOwner = new DummyInvokable(); + // create the resettable Iterator + final NonReusingBlockResettableIterator<Record> iterator = new NonReusingBlockResettableIterator<Record>( + this.memman, this.reader, this.serializer, 2, memOwner); + // open the iterator + iterator.open(); + + // now test walking through the iterator + int lower = 0; + int upper = 0; + do { + lower = upper; + upper = lower; + // find the upper bound + while (iterator.hasNext()) { + Record target = iterator.next(); + int val = target.getField(0, IntValue.class).getValue(); + Assert.assertEquals(upper++, val); + } + // now reset the buffer a few times + for (int i = 0; i < 5; ++i) { + iterator.reset(); + int count = 0; + while (iterator.hasNext()) { + Record target = iterator.next(); + int val = target.getField(0, IntValue.class).getValue(); + Assert.assertEquals(lower + (count++), val); + } + Assert.assertEquals(upper - lower, count); + } + } while (iterator.nextBlock()); + 
Assert.assertEquals(NUM_VALUES, upper); + + // close the iterator + iterator.close(); + } + + @Test + public void testTwelveFoldBufferedBlockResettableIterator() throws Exception + { + final AbstractInvokable memOwner = new DummyInvokable(); + // create the resettable Iterator + final NonReusingBlockResettableIterator<Record> iterator = new NonReusingBlockResettableIterator<Record>( + this.memman, this.reader, this.serializer, 12, memOwner); + // open the iterator + iterator.open(); + + // now test walking through the iterator + int lower = 0; + int upper = 0; + do { + lower = upper; + upper = lower; + // find the upper bound + while (iterator.hasNext()) { + Record target = iterator.next(); + int val = target.getField(0, IntValue.class).getValue(); + Assert.assertEquals(upper++, val); + } + // now reset the buffer a few times + for (int i = 0; i < 5; ++i) { + iterator.reset(); + int count = 0; + while (iterator.hasNext()) { + Record target = iterator.next(); + int val = target.getField(0, IntValue.class).getValue(); + Assert.assertEquals(lower + (count++), val); + } + Assert.assertEquals(upper - lower, count); + } + } while (iterator.nextBlock()); + Assert.assertEquals(NUM_VALUES, upper); + + // close the iterator + iterator.close(); + } + +}
