Repository: flink
Updated Branches:
  refs/heads/master 30761572b -> 941ac6dfd


[FLINK-2105] Extract abstract superclass, interface from MergeMatchIterators, 
KeyGroupedIterators


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0dc6849a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0dc6849a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0dc6849a

Branch: refs/heads/master
Commit: 0dc6849a594b61a6cad8ee582ca1758f0349a72b
Parents: 3076157
Author: Johann Kovacs <m...@jkovacs.de>
Authored: Fri Jul 10 17:21:58 2015 +0200
Committer: Fabian Hueske <fhue...@apache.org>
Committed: Tue Aug 4 21:35:26 2015 +0200

----------------------------------------------------------------------
 .../operators/sort/AbstractMergeIterator.java   | 356 +++++++++++++++++
 .../sort/AbstractMergeMatchIterator.java        | 107 +++++
 .../sort/NonReusingMergeMatchIterator.java      | 382 +-----------------
 .../sort/ReusingMergeMatchIterator.java         | 389 +------------------
 .../flink/runtime/util/KeyGroupedIterator.java  |  31 ++
 .../util/NonReusingKeyGroupedIterator.java      |   2 +-
 .../runtime/util/ReusingKeyGroupedIterator.java |   5 +-
 7 files changed, 517 insertions(+), 755 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/0dc6849a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/AbstractMergeIterator.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/AbstractMergeIterator.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/AbstractMergeIterator.java
new file mode 100644
index 0000000..9a61c14
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/AbstractMergeIterator.java
@@ -0,0 +1,356 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.sort;
+
+import org.apache.flink.api.common.functions.FlatJoinFunction;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypePairComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.memorymanager.MemoryAllocationException;
+import org.apache.flink.runtime.memorymanager.MemoryManager;
+import 
org.apache.flink.runtime.operators.resettable.NonReusingBlockResettableIterator;
+import 
org.apache.flink.runtime.operators.resettable.SpillingResettableIterator;
+import org.apache.flink.runtime.operators.util.JoinTaskIterator;
+import org.apache.flink.runtime.util.KeyGroupedIterator;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.MutableObjectIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+
+public abstract class AbstractMergeIterator<T1, T2, O> implements 
JoinTaskIterator<T1, T2, O> {
+
+       private final Logger LOG = LoggerFactory.getLogger(getClass());
+
+       protected TypePairComparator<T1, T2> pairComparator;
+
+       protected KeyGroupedIterator<T1> iterator1;
+       protected KeyGroupedIterator<T2> iterator2;
+
+       protected final TypeSerializer<T1> serializer1;
+       protected final TypeSerializer<T2> serializer2;
+
+       private final NonReusingBlockResettableIterator<T2> blockIt;    // for 
N:M cross products with same key
+
+       private final IOManager ioManager;
+       private final MemoryManager memoryManager;
+       private final List<MemorySegment> memoryForSpillingIterator;
+
+       // instances for object reuse
+       protected T1 copy1;
+       protected T1 spillHeadCopy;
+       protected T2 copy2;
+       protected T2 blockHeadCopy;
+
+       public AbstractMergeIterator(MutableObjectIterator<T1> input1, 
MutableObjectIterator<T2> input2,
+                                                               
TypeSerializer<T1> serializer1, TypeComparator<T1> comparator1,
+                                                               
TypeSerializer<T2> serializer2, TypeComparator<T2> comparator2,
+                                                               
TypePairComparator<T1, T2> pairComparator,
+                                                               MemoryManager 
memoryManager,
+                                                               IOManager 
ioManager,
+                                                               int 
numMemoryPages,
+                                                               
AbstractInvokable parentTask) throws MemoryAllocationException {
+               if (numMemoryPages < 2) {
+                       throw new IllegalArgumentException("Merger needs at 
least 2 memory pages.");
+               }
+
+               this.pairComparator = pairComparator;
+               this.serializer1 = serializer1;
+               this.serializer2 = serializer2;
+
+               this.memoryManager = memoryManager;
+               this.ioManager = ioManager;
+
+               this.iterator1 = createKeyGroupedIterator(input1, serializer1, 
comparator1.duplicate());
+               this.iterator2 = createKeyGroupedIterator(input2, serializer2, 
comparator2.duplicate());
+
+               final int numPagesForSpiller = numMemoryPages > 20 ? 2 : 1;
+               this.blockIt = new 
NonReusingBlockResettableIterator<T2>(this.memoryManager, this.serializer2,
+                               (numMemoryPages - numPagesForSpiller), 
parentTask);
+               this.memoryForSpillingIterator = 
memoryManager.allocatePages(parentTask, numPagesForSpiller);
+       }
+
+       @Override
+       public void open() throws IOException {
+       }
+
+       @Override
+       public void close() {
+               if (this.blockIt != null) {
+                       try {
+                               this.blockIt.close();
+                       } catch (Throwable t) {
+                               LOG.error("Error closing block memory iterator: 
" + t.getMessage(), t);
+                       }
+               }
+
+               this.memoryManager.release(this.memoryForSpillingIterator);
+       }
+
+       @Override
+       public void abort() {
+               close();
+       }
+
+       /**
+        * Calls the <code>FlatJoinFunction#join()</code> method for every pair 
of values that share the same key and come
+        * from different inputs. The output of the <code>join()</code> method 
is forwarded.
+        * <p>
+        * This method first zig-zags between the two sorted inputs in order to 
find a common
+        * key, and then calls the match stub with the cross product of the 
values.
+        *
+        * @throws Exception Forwards all exceptions from the user code and the 
I/O system.
+        * @see 
org.apache.flink.runtime.operators.util.JoinTaskIterator#callWithNextKey(org.apache.flink.api.common.functions.FlatJoinFunction,
 org.apache.flink.util.Collector)
+        */
+       @Override
+       public abstract boolean callWithNextKey(final FlatJoinFunction<T1, T2, 
O> matchFunction, final Collector<O> collector)
+                       throws Exception;
+
+       protected void crossMatchingGroup(Iterator<T1> values1, Iterator<T2> 
values2, FlatJoinFunction<T1, T2, O> matchFunction, Collector<O> collector) 
throws Exception {
+               final T1 firstV1 = values1.next();
+               final T2 firstV2 = values2.next();
+
+               final boolean v1HasNext = values1.hasNext();
+               final boolean v2HasNext = values2.hasNext();
+
+               // check if one side is already empty
+               // this check could be omitted if we put this in MatchTask.
+               // then we can derive the local strategy (with build side).
+
+               if (v1HasNext) {
+                       if (v2HasNext) {
+                               // both sides contain more than one value
+                               // TODO: Decide which side to spill and which 
to block!
+                               crossMwithNValues(firstV1, values1, firstV2, 
values2, matchFunction, collector);
+                       } else {
+                               crossSecond1withNValues(firstV2, firstV1, 
values1, matchFunction, collector);
+                       }
+               } else {
+                       if (v2HasNext) {
+                               crossFirst1withNValues(firstV1, firstV2, 
values2, matchFunction, collector);
+                       } else {
+                               // both sides contain only one value
+                               matchFunction.join(firstV1, firstV2, collector);
+                       }
+               }
+       }
+
+       /**
+        * Crosses a single value from the first input with N values, all 
sharing a common key.
+        * Effectively realizes a <i>1:N</i> match (join).
+        *
+        * @param val1      The value from the <i>1</i> side.
+        * @param firstValN The first of the values from the <i>N</i> side.
+        * @param valsN     Iterator over remaining <i>N</i> side values.
+        * @throws Exception Forwards all exceptions thrown by the stub.
+        */
+       private void crossFirst1withNValues(final T1 val1, final T2 firstValN,
+                                                                               
final Iterator<T2> valsN, final FlatJoinFunction<T1, T2, O> matchFunction, 
final Collector<O> collector)
+                       throws Exception {
+               T1 copy1 = createCopy(serializer1, val1, this.copy1);
+               matchFunction.join(copy1, firstValN, collector);
+
+               // set copy and match first element
+               boolean more = true;
+               do {
+                       final T2 nRec = valsN.next();
+
+                       if (valsN.hasNext()) {
+                               copy1 = createCopy(serializer1, val1, 
this.copy1);
+                               matchFunction.join(copy1, nRec, collector);
+                       } else {
+                               matchFunction.join(val1, nRec, collector);
+                               more = false;
+                       }
+               }
+               while (more);
+       }
+
+       /**
+        * Crosses a single value from the second side with N values, all 
sharing a common key.
+        * Effectively realizes a <i>N:1</i> match (join).
+        *
+        * @param val1      The value from the <i>1</i> side.
+        * @param firstValN The first of the values from the <i>N</i> side.
+        * @param valsN     Iterator over remaining <i>N</i> side values.
+        * @throws Exception Forwards all exceptions thrown by the stub.
+        */
+       private void crossSecond1withNValues(T2 val1, T1 firstValN,
+                                                                               
Iterator<T1> valsN, FlatJoinFunction<T1, T2, O> matchFunction, Collector<O> 
collector) throws Exception {
+               T2 copy2 = createCopy(serializer2, val1, this.copy2);
+               matchFunction.join(firstValN, copy2, collector);
+
+               // set copy and match first element
+               boolean more = true;
+               do {
+                       final T1 nRec = valsN.next();
+
+                       if (valsN.hasNext()) {
+                               copy2 = createCopy(serializer2, val1, 
this.copy2);
+                               matchFunction.join(nRec, copy2, collector);
+                       } else {
+                               matchFunction.join(nRec, val1, collector);
+                               more = false;
+                       }
+               }
+               while (more);
+       }
+
+       private void crossMwithNValues(final T1 firstV1, Iterator<T1> spillVals,
+                                                                       final 
T2 firstV2, final Iterator<T2> blockVals,
+                                                                       final 
FlatJoinFunction<T1, T2, O> matchFunction, final Collector<O> collector) throws 
Exception {
+               // ==================================================
+               // We have one first (head) element from both inputs (firstV1 
and firstV2)
+               // We have an iterator for both inputs.
+               // we make the V1 side the spilling side and the V2 side the 
blocking side.
+               // In order to get the full cross product without unnecessary 
spilling, we do the
+               // following:
+               // 1) cross the heads
+               // 2) cross the head of the spilling side against the first 
block of the blocking side
+               // 3) cross the iterator of the spilling side with the head of 
the block side
+               // 4) cross the iterator of the spilling side with the first 
block
+               // ---------------------------------------------------
+               // If the blocking side has more than one block, we really need 
to make the spilling side fully
+               // resettable. For each further block on the block side, we do:
+               // 5) cross the head of the spilling side with the next block
+               // 6) cross the spilling iterator with the next block.
+
+               // match the first values first
+               T1 copy1 = this.createCopy(serializer1, firstV1, this.copy1);
+               T2 blockHeadCopy = this.createCopy(serializer2, firstV2, 
this.blockHeadCopy);
+               T1 spillHeadCopy = null;
+
+               // --------------- 1) Cross the heads -------------------
+               matchFunction.join(copy1, firstV2, collector);
+
+               // for the remaining values, we do a block-nested-loops join
+               SpillingResettableIterator<T1> spillIt = null;
+
+               try {
+                       // create block iterator on the second input
+                       this.blockIt.reopen(blockVals);
+
+                       // ------------- 2) cross the head of the spilling side 
with the first block ------------------
+                       while (this.blockIt.hasNext()) {
+                               final T2 nextBlockRec = this.blockIt.next();
+                               copy1 = this.createCopy(serializer1, firstV1, 
this.copy1);
+                               matchFunction.join(copy1, nextBlockRec, 
collector);
+                       }
+                       this.blockIt.reset();
+
+                       // spilling is required if the blocked input has data 
beyond the current block.
+                       // in that case, create the spilling iterator
+                       final Iterator<T1> leftSideIter;
+                       final boolean spillingRequired = 
this.blockIt.hasFurtherInput();
+                       if (spillingRequired) {
+                               // more data than would fit into one block. we 
need to wrap the other side in a spilling iterator
+                               // create spilling iterator on first input
+                               spillIt = new 
SpillingResettableIterator<T1>(spillVals, this.serializer1,
+                                               this.memoryManager, 
this.ioManager, this.memoryForSpillingIterator);
+                               leftSideIter = spillIt;
+                               spillIt.open();
+
+                               spillHeadCopy = this.createCopy(serializer1, 
firstV1, this.spillHeadCopy);
+                       } else {
+                               leftSideIter = spillVals;
+                       }
+
+                       // cross the values in the v1 iterator against the 
current block
+
+                       while (leftSideIter.hasNext()) {
+                               final T1 nextSpillVal = leftSideIter.next();
+                               copy1 = this.createCopy(serializer1, 
nextSpillVal, this.copy1);
+
+
+                               // -------- 3) cross the iterator of the 
spilling side with the head of the block side --------
+                               T2 copy2 = this.createCopy(serializer2, 
blockHeadCopy, this.copy2);
+                               matchFunction.join(copy1, copy2, collector);
+
+                               // -------- 4) cross the iterator of the 
spilling side with the first block --------
+                               while (this.blockIt.hasNext()) {
+                                       T2 nextBlockRec = this.blockIt.next();
+
+                                       // get instances of key and block value
+                                       copy1 = this.createCopy(serializer1, 
nextSpillVal, this.copy1);
+                                       matchFunction.join(copy1, nextBlockRec, 
collector);
+                               }
+                               // reset block iterator
+                               this.blockIt.reset();
+                       }
+
+                       // if everything from the block-side fit into a single 
block, we are done.
+                       // note that in this special case, we did not create a 
spilling iterator at all
+                       if (!spillingRequired) {
+                               return;
+                       }
+
+                       // here we are, because we have more blocks on the 
block side
+                       // loop as long as there are blocks from the blocked 
input
+                       while (this.blockIt.nextBlock()) {
+                               // rewind the spilling iterator
+                               spillIt.reset();
+
+                               // ------------- 5) cross the head of the 
spilling side with the next block ------------
+                               while (this.blockIt.hasNext()) {
+                                       copy1 = this.createCopy(serializer1, 
spillHeadCopy, this.copy1);
+                                       final T2 nextBlockVal = blockIt.next();
+                                       matchFunction.join(copy1, nextBlockVal, 
collector);
+                               }
+                               this.blockIt.reset();
+
+                               // -------- 6) cross the spilling iterator with 
the next block. ------------------
+                               while (spillIt.hasNext()) {
+                                       // get value from resettable iterator
+                                       final T1 nextSpillVal = spillIt.next();
+                                       // cross value with block values
+                                       while (this.blockIt.hasNext()) {
+                                               // get instances of key and 
block value
+                                               final T2 nextBlockVal = 
this.blockIt.next();
+                                               copy1 = 
this.createCopy(serializer1, nextSpillVal, this.copy1);
+                                               matchFunction.join(copy1, 
nextBlockVal, collector);
+                                       }
+
+                                       // reset block iterator
+                                       this.blockIt.reset();
+                               }
+                               // reset v1 iterator
+                               spillIt.reset();
+                       }
+               } finally {
+                       if (spillIt != null) {
+                               
this.memoryForSpillingIterator.addAll(spillIt.close());
+                       }
+               }
+       }
+
+
+       protected abstract <T> KeyGroupedIterator<T> 
createKeyGroupedIterator(MutableObjectIterator<T> input, TypeSerializer<T> 
serializer, TypeComparator<T> comparator);
+
+       /**
+        * Copies an instance of the given type, potentially reusing the object 
passed as the reuse parameter, which may be null.
+        */
+       protected abstract <T> T createCopy(TypeSerializer<T> serializer, T 
value, T reuse);
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0dc6849a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/AbstractMergeMatchIterator.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/AbstractMergeMatchIterator.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/AbstractMergeMatchIterator.java
new file mode 100644
index 0000000..791494d
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/AbstractMergeMatchIterator.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.sort;
+
+import org.apache.flink.api.common.functions.FlatJoinFunction;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypePairComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.memorymanager.MemoryAllocationException;
+import org.apache.flink.runtime.memorymanager.MemoryManager;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.MutableObjectIterator;
+
+import java.util.Iterator;
+
+/**
+ * An implementation of the {@link 
org.apache.flink.runtime.operators.util.JoinTaskIterator} that realizes the
+ * matching through a sort-merge join strategy.
+ */
+public abstract class AbstractMergeMatchIterator<T1, T2, O> extends 
AbstractMergeIterator<T1, T2, O> {
+
+       public AbstractMergeMatchIterator(MutableObjectIterator<T1> input1, 
MutableObjectIterator<T2> input2,
+                                                                       
TypeSerializer<T1> serializer1, TypeComparator<T1> comparator1,
+                                                                       
TypeSerializer<T2> serializer2, TypeComparator<T2> comparator2,
+                                                                       
TypePairComparator<T1, T2> pairComparator,
+                                                                       
MemoryManager memoryManager,
+                                                                       
IOManager ioManager,
+                                                                       int 
numMemoryPages,
+                                                                       
AbstractInvokable parentTask)
+                       throws MemoryAllocationException {
+               super(input1, input2, serializer1, comparator1, serializer2, 
comparator2, pairComparator, memoryManager, ioManager, numMemoryPages, 
parentTask);
+       }
+
+       /**
+        * Calls the <code>FlatJoinFunction#join()</code> method for every pair 
of values that share the same key and come
+        * from different inputs. The output of the <code>join()</code> method 
is forwarded.
+        * <p>
+        * This method first zig-zags between the two sorted inputs in order to 
find a common
+        * key, and then calls the match stub with the cross product of the 
values.
+        *
+        * @throws Exception Forwards all exceptions from the user code and the 
I/O system.
+        * @see 
org.apache.flink.runtime.operators.util.JoinTaskIterator#callWithNextKey(org.apache.flink.api.common.functions.FlatJoinFunction,
 org.apache.flink.util.Collector)
+        */
+       @Override
+       public boolean callWithNextKey(final FlatJoinFunction<T1, T2, O> 
matchFunction, final Collector<O> collector)
+                       throws Exception {
+               if (!this.iterator1.nextKey() || !this.iterator2.nextKey()) {
+                       // consume all remaining keys (hack to prevent 
remaining inputs during iterations, let's get rid of this soon)
+                       while (this.iterator1.nextKey()) ;
+                       while (this.iterator2.nextKey()) ;
+
+                       return false;
+               }
+
+               final TypePairComparator<T1, T2> comparator = 
this.pairComparator;
+               comparator.setReference(this.iterator1.getCurrent());
+               T2 current2 = this.iterator2.getCurrent();
+
+               // zig zag
+               while (true) {
+                       // determine the relation between the (possibly 
composite) keys
+                       final int comp = 
comparator.compareToReference(current2);
+
+                       if (comp == 0) {
+                               break;
+                       }
+
+                       if (comp < 0) {
+                               if (!this.iterator2.nextKey()) {
+                                       return false;
+                               }
+                               current2 = this.iterator2.getCurrent();
+                       } else {
+                               if (!this.iterator1.nextKey()) {
+                                       return false;
+                               }
+                               
comparator.setReference(this.iterator1.getCurrent());
+                       }
+               }
+
+               // here, we have a common key! call the match function with the 
cross product of the
+               // values
+               final Iterator<T1> values1 = this.iterator1.getValues();
+               final Iterator<T2> values2 = this.iterator2.getValues();
+
+               crossMatchingGroup(values1, values2, matchFunction, collector);
+               return true;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0dc6849a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/NonReusingMergeMatchIterator.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/NonReusingMergeMatchIterator.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/NonReusingMergeMatchIterator.java
index c89b5c5..9705778 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/NonReusingMergeMatchIterator.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/NonReusingMergeMatchIterator.java
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- * http://www.apache.org/licenses/LICENSE-2.0
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,60 +18,19 @@
 
 package org.apache.flink.runtime.operators.sort;
 
-import org.apache.flink.api.common.functions.FlatJoinFunction;
 import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.api.common.typeutils.TypePairComparator;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.memorymanager.MemoryAllocationException;
 import org.apache.flink.runtime.memorymanager.MemoryManager;
-import 
org.apache.flink.runtime.operators.resettable.NonReusingBlockResettableIterator;
-import 
org.apache.flink.runtime.operators.resettable.SpillingResettableIterator;
-import org.apache.flink.runtime.operators.util.JoinTaskIterator;
+import org.apache.flink.runtime.util.KeyGroupedIterator;
 import org.apache.flink.runtime.util.NonReusingKeyGroupedIterator;
-import org.apache.flink.util.Collector;
 import org.apache.flink.util.MutableObjectIterator;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
-import java.util.Iterator;
-import java.util.List;
 
-/**
- * An implementation of the {@link 
org.apache.flink.runtime.operators.util.JoinTaskIterator} that realizes the
- * matching through a sort-merge join strategy.
- */
-public class NonReusingMergeMatchIterator<T1, T2, O> implements 
JoinTaskIterator<T1, T2, O> {
-
-       /**
-        * The log used by this iterator to log messages.
-        */
-       private static final Logger LOG = 
LoggerFactory.getLogger(NonReusingMergeMatchIterator.class);
-
-       // 
--------------------------------------------------------------------------------------------
-
-       private TypePairComparator<T1, T2> comp;
-
-       private NonReusingKeyGroupedIterator<T1> iterator1;
-
-       private NonReusingKeyGroupedIterator<T2> iterator2;
-
-       private final TypeSerializer<T1> serializer1;
-
-       private final TypeSerializer<T2> serializer2;
-
-       private final NonReusingBlockResettableIterator<T2> blockIt;    // for 
N:M cross products with same key
-
-       private final List<MemorySegment> memoryForSpillingIterator;
-
-       private final MemoryManager memoryManager;
-
-       private final IOManager ioManager;
-
-       // 
--------------------------------------------------------------------------------------------
+public class NonReusingMergeMatchIterator<T1, T2, O> extends 
AbstractMergeMatchIterator<T1, T2, O> {
 
        public NonReusingMergeMatchIterator(
                        MutableObjectIterator<T1> input1,
@@ -83,341 +42,18 @@ public class NonReusingMergeMatchIterator<T1, T2, O> 
implements JoinTaskIterator
                        IOManager ioManager,
                        int numMemoryPages,
                        AbstractInvokable parentTask)
-       throws MemoryAllocationException
-       {
-               if (numMemoryPages < 2) {
-                       throw new IllegalArgumentException("Merger needs at 
least 2 memory pages.");
-               }
-
-               this.comp = pairComparator;
-               this.serializer1 = serializer1;
-               this.serializer2 = serializer2;
-
-               this.memoryManager = memoryManager;
-               this.ioManager = ioManager;
-
-               this.iterator1 = new NonReusingKeyGroupedIterator<T1>(input1, 
comparator1.duplicate());
-               this.iterator2 = new NonReusingKeyGroupedIterator<T2>(input2, 
comparator2.duplicate());
-
-               final int numPagesForSpiller = numMemoryPages > 20 ? 2 : 1;
-               this.blockIt = new 
NonReusingBlockResettableIterator<T2>(this.memoryManager, this.serializer2,
-                       (numMemoryPages - numPagesForSpiller), parentTask);
-               this.memoryForSpillingIterator = 
memoryManager.allocatePages(parentTask, numPagesForSpiller);
+                       throws MemoryAllocationException {
+               super(input1, input2, serializer1, comparator1, serializer2, 
comparator2, pairComparator, memoryManager, ioManager, numMemoryPages, 
parentTask);
        }
 
-
-       @Override
-       public void open() throws IOException {}
-
-
-       @Override
-       public void close() {
-               if (this.blockIt != null) {
-                       try {
-                               this.blockIt.close();
-                       }
-                       catch (Throwable t) {
-                               LOG.error("Error closing block memory iterator: 
" + t.getMessage(), t);
-                       }
-               }
-
-               this.memoryManager.release(this.memoryForSpillingIterator);
-       }
-
-
        @Override
-       public void abort() {
-               close();
+       protected <T> KeyGroupedIterator<T> 
createKeyGroupedIterator(MutableObjectIterator<T> input, TypeSerializer<T> 
serializer, TypeComparator<T> comparator) {
+               return new NonReusingKeyGroupedIterator<T>(input, comparator);
        }
 
-       /**
-        * Calls the <code>JoinFunction#match()</code> method for all two 
key-value pairs that share the same key and come
-        * from different inputs. The output of the <code>match()</code> method 
is forwarded.
-        * <p>
-        * This method first zig-zags between the two sorted inputs in order to 
find a common
-        * key, and then calls the match stub with the cross product of the 
values.
-        *
-        * @throws Exception Forwards all exceptions from the user code and the 
I/O system.
-        *
-        * @see 
org.apache.flink.runtime.operators.util.JoinTaskIterator#callWithNextKey(org.apache.flink.api.common.functions.FlatJoinFunction,
 org.apache.flink.util.Collector)
-        */
        @Override
-       public boolean callWithNextKey(final FlatJoinFunction<T1, T2, O> 
matchFunction, final Collector<O> collector)
-       throws Exception
-       {
-               if (!this.iterator1.nextKey() || !this.iterator2.nextKey()) {
-                       // consume all remaining keys (hack to prevent 
remaining inputs during iterations, lets get rid of this soon)
-                       while (this.iterator1.nextKey());
-                       while (this.iterator2.nextKey());
-                       
-                       return false;
-               }
-
-               final TypePairComparator<T1, T2> comparator = this.comp;
-               comparator.setReference(this.iterator1.getCurrent());
-               T2 current2 = this.iterator2.getCurrent();
-                               
-               // zig zag
-               while (true) {
-                       // determine the relation between the (possibly 
composite) keys
-                       final int comp = 
comparator.compareToReference(current2);
-                       
-                       if (comp == 0) {
-                               break;
-                       }
-                       
-                       if (comp < 0) {
-                               if (!this.iterator2.nextKey()) {
-                                       return false;
-                               }
-                               current2 = this.iterator2.getCurrent();
-                       }
-                       else {
-                               if (!this.iterator1.nextKey()) {
-                                       return false;
-                               }
-                               
comparator.setReference(this.iterator1.getCurrent());
-                       }
-               }
-               
-               // here, we have a common key! call the match function with the 
cross product of the
-               // values
-               final NonReusingKeyGroupedIterator<T1>.ValuesIterator values1 = 
this.iterator1.getValues();
-               final NonReusingKeyGroupedIterator<T2>.ValuesIterator values2 = 
this.iterator2.getValues();
-               
-               final T1 firstV1 = values1.next();
-               final T2 firstV2 = values2.next();      
-                       
-               final boolean v1HasNext = values1.hasNext();
-               final boolean v2HasNext = values2.hasNext();
-
-               // check if one side is already empty
-               // this check could be omitted if we put this in MatchTask.
-               // then we can derive the local strategy (with build side).
-               
-               if (v1HasNext) {
-                       if (v2HasNext) {
-                               // both sides contain more than one value
-                               // TODO: Decide which side to spill and which 
to block!
-                               crossMwithNValues(firstV1, values1, firstV2, 
values2, matchFunction, collector);
-                       } else {
-                               crossSecond1withNValues(firstV2, firstV1, 
values1, matchFunction, collector);
-                       }
-               } else {
-                       if (v2HasNext) {
-                               crossFirst1withNValues(firstV1, firstV2, 
values2, matchFunction, collector);
-                       } else {
-                               // both sides contain only one value
-                               matchFunction.join(firstV1, firstV2, collector);
-                       }
-               }
-               return true;
-       }
-
-       /**
-        * Crosses a single value from the first input with N values, all 
sharing a common key.
-        * Effectively realizes a <i>1:N</i> match (join).
-        * 
-        * @param val1 The value form the <i>1</i> side.
-        * @param firstValN The first of the values from the <i>N</i> side.
-        * @param valsN Iterator over remaining <i>N</i> side values.
-        *          
-        * @throws Exception Forwards all exceptions thrown by the stub.
-        */
-       private void crossFirst1withNValues(final T1 val1, final T2 firstValN,
-                       final Iterator<T2> valsN, final FlatJoinFunction<T1, 
T2, O> matchFunction, final Collector<O> collector)
-       throws Exception
-       {
-               T1 copy1 = this.serializer1.copy(val1);
-               matchFunction.join(copy1, firstValN, collector);
-               
-               // set copy and match first element
-               boolean more = true;
-               do {
-                       final T2 nRec = valsN.next();
-                       
-                       if (valsN.hasNext()) {
-                               copy1 = this.serializer1.copy(val1);
-                               matchFunction.join(copy1, nRec, collector);
-                       } else {
-                               matchFunction.join(val1, nRec, collector);
-                               more = false;
-                       }
-               }
-               while (more);
-       }
-       
-       /**
-        * Crosses a single value from the second side with N values, all 
sharing a common key.
-        * Effectively realizes a <i>N:1</i> match (join).
-        * 
-        * @param val1 The value form the <i>1</i> side.
-        * @param firstValN The first of the values from the <i>N</i> side.
-        * @param valsN Iterator over remaining <i>N</i> side values.
-        *          
-        * @throws Exception Forwards all exceptions thrown by the stub.
-        */
-       private void crossSecond1withNValues(T2 val1, T1 firstValN,
-                       Iterator<T1> valsN, FlatJoinFunction<T1, T2, O> 
matchFunction, Collector<O> collector)
-       throws Exception
-       {
-               T2 copy2 = this.serializer2.copy(val1);
-               matchFunction.join(firstValN, copy2, collector);
-               
-               // set copy and match first element
-               boolean more = true;
-               do {
-                       final T1 nRec = valsN.next();
-                       
-                       if (valsN.hasNext()) {
-                               copy2 = this.serializer2.copy(val1);
-                               matchFunction.join(nRec, copy2, collector);
-                       } else {
-                               matchFunction.join(nRec, val1, collector);
-                               more = false;
-                       }
-               }
-               while (more);
+       protected <T> T createCopy(TypeSerializer<T> serializer, T value, T 
reuse) {
+               return serializer.copy(value);
        }
-       
-       /**
-        * @param firstV1
-        * @param spillVals
-        * @param firstV2
-        * @param blockVals
-        */
-       private void crossMwithNValues(final T1 firstV1, Iterator<T1> spillVals,
-                       final T2 firstV2, final Iterator<T2> blockVals,
-                       final FlatJoinFunction<T1, T2, O> matchFunction, final 
Collector<O> collector)
-       throws Exception
-       {
-               // ==================================================
-               // We have one first (head) element from both inputs (firstV1 
and firstV2)
-               // We have an iterator for both inputs.
-               // we make the V1 side the spilling side and the V2 side the 
blocking side.
-               // In order to get the full cross product without unnecessary 
spilling, we do the
-               // following:
-               // 1) cross the heads
-               // 2) cross the head of the spilling side against the first 
block of the blocking side
-               // 3) cross the iterator of the spilling side with the head of 
the block side
-               // 4) cross the iterator of the spilling side with the first 
block
-               // ---------------------------------------------------
-               // If the blocking side has more than one block, we really need 
to make the spilling side fully
-               // resettable. For each further block on the block side, we do:
-               // 5) cross the head of the spilling side with the next block
-               // 6) cross the spilling iterator with the next block.
-               
-               // match the first values first
-               T1 copy1 = this.serializer1.copy(firstV1);
-               T2 blockHeadCopy = this.serializer2.copy(firstV2);
-               T1 spillHeadCopy = null;
 
-
-               // --------------- 1) Cross the heads -------------------
-               matchFunction.join(copy1, firstV2, collector);
-               
-               // for the remaining values, we do a block-nested-loops join
-               SpillingResettableIterator<T1> spillIt = null;
-               
-               try {
-                       // create block iterator on the second input
-                       this.blockIt.reopen(blockVals);
-                       
-                       // ------------- 2) cross the head of the spilling side 
with the first block ------------------
-                       while (this.blockIt.hasNext()) {
-                               final T2 nextBlockRec = this.blockIt.next();
-                               copy1 = this.serializer1.copy(firstV1);
-                               matchFunction.join(copy1, nextBlockRec, 
collector);
-                       }
-                       this.blockIt.reset();
-                       
-                       // spilling is required if the blocked input has data 
beyond the current block.
-                       // in that case, create the spilling iterator
-                       final Iterator<T1> leftSideIter;
-                       final boolean spillingRequired = 
this.blockIt.hasFurtherInput();
-                       if (spillingRequired)
-                       {
-                               // more data than would fit into one block. we 
need to wrap the other side in a spilling iterator
-                               // create spilling iterator on first input
-                               spillIt = new 
SpillingResettableIterator<T1>(spillVals, this.serializer1,
-                                               this.memoryManager, 
this.ioManager, this.memoryForSpillingIterator);
-                               leftSideIter = spillIt;
-                               spillIt.open();
-                               
-                               spillHeadCopy = this.serializer1.copy(firstV1);
-                       }
-                       else {
-                               leftSideIter = spillVals;
-                       }
-                       
-                       // cross the values in the v1 iterator against the 
current block
-                       
-                       while (leftSideIter.hasNext()) {
-                               final T1 nextSpillVal = leftSideIter.next();
-                               copy1 = this.serializer1.copy(nextSpillVal);
-                               
-                               
-                               // -------- 3) cross the iterator of the 
spilling side with the head of the block side --------
-                               T2 copy2 = this.serializer2.copy(blockHeadCopy);
-                               matchFunction.join(copy1, copy2, collector);
-                               
-                               // -------- 4) cross the iterator of the 
spilling side with the first block --------
-                               while (this.blockIt.hasNext()) {
-                                       T2 nextBlockRec = this.blockIt.next();
-                                       
-                                       // get instances of key and block value
-                                       copy1 = 
this.serializer1.copy(nextSpillVal);
-                                       matchFunction.join(copy1, nextBlockRec, 
collector);
-                               }
-                               // reset block iterator
-                               this.blockIt.reset();
-                       }
-                       
-                       // if everything from the block-side fit into a single 
block, we are done.
-                       // note that in this special case, we did not create a 
spilling iterator at all
-                       if (!spillingRequired) {
-                               return;
-                       }
-                       
-                       // here we are, because we have more blocks on the 
block side
-                       // loop as long as there are blocks from the blocked 
input
-                       while (this.blockIt.nextBlock())
-                       {
-                               // rewind the spilling iterator
-                               spillIt.reset();
-                               
-                               // ------------- 5) cross the head of the 
spilling side with the next block ------------
-                               while (this.blockIt.hasNext()) {
-                                       copy1 = 
this.serializer1.copy(spillHeadCopy);
-                                       final T2 nextBlockVal = blockIt.next();
-                                       matchFunction.join(copy1, nextBlockVal, 
collector);
-                               }
-                               this.blockIt.reset();
-                               
-                               // -------- 6) cross the spilling iterator with 
the next block. ------------------
-                               while (spillIt.hasNext())
-                               {
-                                       // get value from resettable iterator
-                                       final T1 nextSpillVal = spillIt.next();
-                                       // cross value with block values
-                                       while (this.blockIt.hasNext()) {
-                                               // get instances of key and 
block value
-                                               final T2 nextBlockVal = 
this.blockIt.next();
-                                               copy1 = 
this.serializer1.copy(nextSpillVal);
-                                               matchFunction.join(copy1, 
nextBlockVal, collector);
-                                       }
-                                       
-                                       // reset block iterator
-                                       this.blockIt.reset();
-                               }
-                               // reset v1 iterator
-                               spillIt.reset();
-                       }
-               }
-               finally {
-                       if (spillIt != null) {
-                               
this.memoryForSpillingIterator.addAll(spillIt.close());
-                       }
-               }
-       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0dc6849a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/ReusingMergeMatchIterator.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/ReusingMergeMatchIterator.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/ReusingMergeMatchIterator.java
index 66beee1..c9cf5a2 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/ReusingMergeMatchIterator.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/ReusingMergeMatchIterator.java
@@ -18,70 +18,20 @@
 
 package org.apache.flink.runtime.operators.sort;
 
-import java.io.IOException;
-import java.util.Iterator;
-import java.util.List;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.flink.api.common.functions.FlatJoinFunction;
 import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.api.common.typeutils.TypePairComparator;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.memorymanager.MemoryAllocationException;
 import org.apache.flink.runtime.memorymanager.MemoryManager;
-import 
org.apache.flink.runtime.operators.resettable.NonReusingBlockResettableIterator;
-import 
org.apache.flink.runtime.operators.resettable.SpillingResettableIterator;
-import org.apache.flink.runtime.operators.util.JoinTaskIterator;
+import org.apache.flink.runtime.util.KeyGroupedIterator;
 import org.apache.flink.runtime.util.ReusingKeyGroupedIterator;
-import org.apache.flink.util.Collector;
 import org.apache.flink.util.MutableObjectIterator;
 
 
-/**
- * An implementation of the {@link JoinTaskIterator} that realizes the
- * matching through a sort-merge join strategy.
- */
-public class ReusingMergeMatchIterator<T1, T2, O> implements 
JoinTaskIterator<T1, T2, O> {
-       
-       /**
-        * The log used by this iterator to log messages.
-        */
-       private static final Logger LOG = 
LoggerFactory.getLogger(ReusingMergeMatchIterator.class);
-       
-       // 
--------------------------------------------------------------------------------------------
-       
-       private TypePairComparator<T1, T2> comp;
-       
-       private ReusingKeyGroupedIterator<T1> iterator1;
-
-       private ReusingKeyGroupedIterator<T2> iterator2;
-       
-       private final TypeSerializer<T1> serializer1;
-       
-       private final TypeSerializer<T2> serializer2;
-       
-       private T1 copy1;
-       
-       private T1 spillHeadCopy;
-       
-       private T2 copy2;
-       
-       private T2 blockHeadCopy;
-       
-       private final NonReusingBlockResettableIterator<T2> blockIt;            
                // for N:M cross products with same key
-       
-       private final List<MemorySegment> memoryForSpillingIterator;
-       
-       private final MemoryManager memoryManager;
+public class ReusingMergeMatchIterator<T1, T2, O> extends 
AbstractMergeMatchIterator<T1, T2, O> {
 
-       private final IOManager ioManager;
-       
-       // 
--------------------------------------------------------------------------------------------
-       
        public ReusingMergeMatchIterator(
                        MutableObjectIterator<T1> input1,
                        MutableObjectIterator<T2> input2,
@@ -92,344 +42,23 @@ public class ReusingMergeMatchIterator<T1, T2, O> 
implements JoinTaskIterator<T1
                        IOManager ioManager,
                        int numMemoryPages,
                        AbstractInvokable parentTask)
-       throws MemoryAllocationException
-       {
-               if (numMemoryPages < 2) {
-                       throw new IllegalArgumentException("Merger needs at 
least 2 memory pages.");
-               }
-               
-               this.comp = pairComparator;
-               this.serializer1 = serializer1;
-               this.serializer2 = serializer2;
-               
+                       throws MemoryAllocationException {
+               super(input1, input2, serializer1, comparator1, serializer2, 
comparator2, pairComparator, memoryManager, ioManager, numMemoryPages, 
parentTask);
+
                this.copy1 = serializer1.createInstance();
                this.spillHeadCopy = serializer1.createInstance();
                this.copy2 = serializer2.createInstance();
                this.blockHeadCopy = serializer2.createInstance();
-               
-               this.memoryManager = memoryManager;
-               this.ioManager = ioManager;
-               
-               this.iterator1 = new ReusingKeyGroupedIterator<T1>(input1, 
this.serializer1, comparator1.duplicate());
-               this.iterator2 = new ReusingKeyGroupedIterator<T2>(input2, 
this.serializer2, comparator2.duplicate());
-               
-               final int numPagesForSpiller = numMemoryPages > 20 ? 2 : 1;
-               this.blockIt = new 
NonReusingBlockResettableIterator<T2>(this.memoryManager, this.serializer2,
-                       (numMemoryPages - numPagesForSpiller), parentTask);
-               this.memoryForSpillingIterator = 
memoryManager.allocatePages(parentTask, numPagesForSpiller);
-       }
-
-
-       @Override
-       public void open() throws IOException {}
-
-
-       @Override
-       public void close() {
-               if (this.blockIt != null) {
-                       try {
-                               this.blockIt.close();
-                       }
-                       catch (Throwable t) {
-                               LOG.error("Error closing block memory iterator: 
" + t.getMessage(), t);
-                       }
-               }
-               
-               this.memoryManager.release(this.memoryForSpillingIterator);
        }
-       
 
        @Override
-       public void abort() {
-               close();
+       protected <T> KeyGroupedIterator<T> 
createKeyGroupedIterator(MutableObjectIterator<T> input, TypeSerializer<T> 
serializer, TypeComparator<T> comparator) {
+               return new ReusingKeyGroupedIterator<T>(input, serializer, 
comparator);
        }
 
-       /**
-        * Calls the <code>JoinFunction#match()</code> method for all two 
key-value pairs that share the same key and come 
-        * from different inputs. The output of the <code>match()</code> method 
is forwarded.
-        * <p>
-        * This method first zig-zags between the two sorted inputs in order to 
find a common
-        * key, and then calls the match stub with the cross product of the 
values.
-        * 
-        * @throws Exception Forwards all exceptions from the user code and the 
I/O system.
-        * 
-        * @see 
org.apache.flink.runtime.operators.util.JoinTaskIterator#callWithNextKey(FlatJoinFunction,
 Collector)
-        */
        @Override
-       public boolean callWithNextKey(final FlatJoinFunction<T1, T2, O> 
matchFunction, final Collector<O> collector)
-       throws Exception
-       {
-               if (!this.iterator1.nextKey() || !this.iterator2.nextKey()) {
-                       // consume all remaining keys (hack to prevent 
remaining inputs during iterations, lets get rid of this soon)
-                       while (this.iterator1.nextKey());
-                       while (this.iterator2.nextKey());
-                       
-                       return false;
-               }
-
-               final TypePairComparator<T1, T2> comparator = this.comp;
-               comparator.setReference(this.iterator1.getCurrent());
-               T2 current2 = this.iterator2.getCurrent();
-                               
-               // zig zag
-               while (true) {
-                       // determine the relation between the (possibly 
composite) keys
-                       final int comp = 
comparator.compareToReference(current2);
-                       
-                       if (comp == 0) {
-                               break;
-                       }
-                       
-                       if (comp < 0) {
-                               if (!this.iterator2.nextKey()) {
-                                       return false;
-                               }
-                               current2 = this.iterator2.getCurrent();
-                       }
-                       else {
-                               if (!this.iterator1.nextKey()) {
-                                       return false;
-                               }
-                               
comparator.setReference(this.iterator1.getCurrent());
-                       }
-               }
-               
-               // here, we have a common key! call the match function with the 
cross product of the
-               // values
-               final ReusingKeyGroupedIterator<T1>.ValuesIterator values1 = 
this.iterator1.getValues();
-               final ReusingKeyGroupedIterator<T2>.ValuesIterator values2 = 
this.iterator2.getValues();
-               
-               final T1 firstV1 = values1.next();
-               final T2 firstV2 = values2.next();      
-                       
-               final boolean v1HasNext = values1.hasNext();
-               final boolean v2HasNext = values2.hasNext();
-
-               // check if one side is already empty
-               // this check could be omitted if we put this in MatchTask.
-               // then we can derive the local strategy (with build side).
-               
-               if (v1HasNext) {
-                       if (v2HasNext) {
-                               // both sides contain more than one value
-                               // TODO: Decide which side to spill and which 
to block!
-                               crossMwithNValues(firstV1, values1, firstV2, 
values2, matchFunction, collector);
-                       } else {
-                               crossSecond1withNValues(firstV2, firstV1, 
values1, matchFunction, collector);
-                       }
-               } else {
-                       if (v2HasNext) {
-                               crossFirst1withNValues(firstV1, firstV2, 
values2, matchFunction, collector);
-                       } else {
-                               // both sides contain only one value
-                               matchFunction.join(firstV1, firstV2, collector);
-                       }
-               }
-               return true;
+       protected <T> T createCopy(TypeSerializer<T> serializer, T value, T 
reuse) {
+               return serializer.copy(value, reuse);
        }
 
-       /**
-        * Crosses a single value from the first input with N values, all 
sharing a common key.
-        * Effectively realizes a <i>1:N</i> match (join).
-        * 
-        * @param val1 The value form the <i>1</i> side.
-        * @param firstValN The first of the values from the <i>N</i> side.
-        * @param valsN Iterator over remaining <i>N</i> side values.
-        *          
-        * @throws Exception Forwards all exceptions thrown by the stub.
-        */
-       private void crossFirst1withNValues(final T1 val1, final T2 firstValN,
-                       final Iterator<T2> valsN, final FlatJoinFunction<T1, 
T2, O> matchFunction, final Collector<O> collector)
-       throws Exception
-       {
-               this.copy1 = this.serializer1.copy(val1, this.copy1);
-               matchFunction.join(this.copy1, firstValN, collector);
-               
-               // set copy and match first element
-               boolean more = true;
-               do {
-                       final T2 nRec = valsN.next();
-                       
-                       if (valsN.hasNext()) {
-                               this.copy1 = this.serializer1.copy(val1, 
this.copy1);
-                               matchFunction.join(this.copy1, nRec, collector);
-                       } else {
-                               matchFunction.join(val1, nRec, collector);
-                               more = false;
-                       }
-               }
-               while (more);
-       }
-       
-       /**
-        * Crosses a single value from the second side with N values, all 
sharing a common key.
-        * Effectively realizes a <i>N:1</i> match (join).
-        * 
-        * @param val1 The value form the <i>1</i> side.
-        * @param firstValN The first of the values from the <i>N</i> side.
-        * @param valsN Iterator over remaining <i>N</i> side values.
-        *          
-        * @throws Exception Forwards all exceptions thrown by the stub.
-        */
-       private void crossSecond1withNValues(T2 val1, T1 firstValN,
-                       Iterator<T1> valsN, FlatJoinFunction<T1, T2, O> 
matchFunction, Collector<O> collector)
-       throws Exception
-       {
-               this.copy2 = this.serializer2.copy(val1, this.copy2);
-               matchFunction.join(firstValN, this.copy2, collector);
-               
-               // set copy and match first element
-               boolean more = true;
-               do {
-                       final T1 nRec = valsN.next();
-                       
-                       if (valsN.hasNext()) {
-                               this.copy2 = this.serializer2.copy(val1, 
this.copy2);
-                               matchFunction.join(nRec,this.copy2,collector);
-                       } else {
-                               matchFunction.join(nRec, val1, collector);
-                               more = false;
-                       }
-               }
-               while (more);
-       }
-       
-       /**
-        * @param firstV1
-        * @param spillVals
-        * @param firstV2
-        * @param blockVals
-        */
-       private void crossMwithNValues(final T1 firstV1, Iterator<T1> spillVals,
-                       final T2 firstV2, final Iterator<T2> blockVals,
-                       final FlatJoinFunction<T1, T2, O> matchFunction, final Collector<O> collector)
-       throws Exception
-       {
-               // ==================================================
-               // We have one first (head) element from both inputs (firstV1 and firstV2)
-               // We have an iterator for both inputs.
-               // we make the V1 side the spilling side and the V2 side the blocking side.
-               // In order to get the full cross product without unnecessary spilling, we do the
-               // following:
-               // 1) cross the heads
-               // 2) cross the head of the spilling side against the first block of the blocking side
-               // 3) cross the iterator of the spilling side with the head of the block side
-               // 4) cross the iterator of the spilling side with the first block
-               // ---------------------------------------------------
-               // If the blocking side has more than one block, we really need to make the spilling side fully
-               // resettable. For each further block on the block side, we do:
-               // 5) cross the head of the spilling side with the next block
-               // 6) cross the spilling iterator with the next block.
-               
-               // match the first values first
-               this.copy1 = this.serializer1.copy(firstV1, this.copy1);
-               this.blockHeadCopy = this.serializer2.copy(firstV2, this.blockHeadCopy);
-               
-               // --------------- 1) Cross the heads -------------------
-               matchFunction.join(this.copy1, firstV2, collector);
-               
-               // for the remaining values, we do a block-nested-loops join
-               SpillingResettableIterator<T1> spillIt = null;
-               
-               try {
-                       // create block iterator on the second input
-                       this.blockIt.reopen(blockVals);
-                       
-                       // ------------- 2) cross the head of the spilling side with the first block ------------------
-                       while (this.blockIt.hasNext()) {
-                               final T2 nextBlockRec = this.blockIt.next();
-                               this.copy1 = this.serializer1.copy(firstV1, this.copy1);
-                               matchFunction.join(this.copy1, nextBlockRec, collector);
-                       }
-                       this.blockIt.reset();
-                       
-                       // spilling is required if the blocked input has data beyond the current block.
-                       // in that case, create the spilling iterator
-                       final Iterator<T1> leftSideIter;
-                       final boolean spillingRequired = this.blockIt.hasFurtherInput();
-                       if (spillingRequired)
-                       {
-                               // more data than would fit into one block. we need to wrap the other side in a spilling iterator
-                               // create spilling iterator on first input
-                               spillIt = new SpillingResettableIterator<T1>(spillVals, this.serializer1,
-                                               this.memoryManager, this.ioManager, this.memoryForSpillingIterator);
-                               leftSideIter = spillIt;
-                               spillIt.open();
-                               
-                               this.spillHeadCopy = this.serializer1.copy(firstV1, this.spillHeadCopy);
-                       }
-                       else {
-                               leftSideIter = spillVals;
-                       }
-                       
-                       // cross the values in the v1 iterator against the current block
-                       
-                       while (leftSideIter.hasNext()) {
-                               final T1 nextSpillVal = leftSideIter.next();
-                               this.copy1 = this.serializer1.copy(nextSpillVal, this.copy1);
-                               
-                               
-                               // -------- 3) cross the iterator of the spilling side with the head of the block side --------
-                               this.copy2 = this.serializer2.copy(this.blockHeadCopy, this.copy2);
-                               matchFunction.join(this.copy1, this.copy2, collector);
-                               
-                               // -------- 4) cross the iterator of the spilling side with the first block --------
-                               while (this.blockIt.hasNext()) {
-                                       T2 nextBlockRec = this.blockIt.next();
-                                       
-                                       // get instances of key and block value
-                                       this.copy1 = this.serializer1.copy(nextSpillVal, this.copy1);
-                                       matchFunction.join(this.copy1, nextBlockRec, collector);
-                               }
-                               // reset block iterator
-                               this.blockIt.reset();
-                       }
-                       
-                       // if everything from the block-side fit into a single block, we are done.
-                       // note that in this special case, we did not create a spilling iterator at all
-                       if (!spillingRequired) {
-                               return;
-                       }
-                       
-                       // here we are, because we have more blocks on the block side
-                       // loop as long as there are blocks from the blocked input
-                       while (this.blockIt.nextBlock())
-                       {
-                               // rewind the spilling iterator
-                               spillIt.reset();
-                               
-                               // ------------- 5) cross the head of the spilling side with the next block ------------
-                               while (this.blockIt.hasNext()) {
-                                       this.copy1 = this.serializer1.copy(this.spillHeadCopy, this.copy1);
-                                       final T2 nextBlockVal = blockIt.next();
-                                       matchFunction.join(this.copy1, nextBlockVal, collector);
-                               }
-                               this.blockIt.reset();
-                               
-                               // -------- 6) cross the spilling iterator with the next block. ------------------
-                               while (spillIt.hasNext())
-                               {
-                                       // get value from resettable iterator
-                                       final T1 nextSpillVal = spillIt.next();
-                                       // cross value with block values
-                                       while (this.blockIt.hasNext()) {
-                                               // get instances of key and block value
-                                               final T2 nextBlockVal = this.blockIt.next();
-                                               this.copy1 = this.serializer1.copy(nextSpillVal, this.copy1);
-                                               matchFunction.join(this.copy1, nextBlockVal, collector);
-                                       }
-                                       
-                                       // reset block iterator
-                                       this.blockIt.reset();
-                               }
-                               // reset v1 iterator
-                               spillIt.reset();
-                       }
-               }
-               finally {
-                       if (spillIt != null) {
-                               this.memoryForSpillingIterator.addAll(spillIt.close());
-                       }
-               }
-       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0dc6849a/flink-runtime/src/main/java/org/apache/flink/runtime/util/KeyGroupedIterator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/KeyGroupedIterator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/KeyGroupedIterator.java
new file mode 100644
index 0000000..64e8298
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/KeyGroupedIterator.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.util;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+public interface KeyGroupedIterator<E> {
+
+       boolean nextKey() throws IOException;
+
+       E getCurrent();
+
+       Iterator<E> getValues();
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0dc6849a/flink-runtime/src/main/java/org/apache/flink/runtime/util/NonReusingKeyGroupedIterator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/NonReusingKeyGroupedIterator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/NonReusingKeyGroupedIterator.java
index 3f28cfc..6f4448c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/NonReusingKeyGroupedIterator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/NonReusingKeyGroupedIterator.java
@@ -29,7 +29,7 @@ import org.apache.flink.util.TraversableOnceException;
 /**
  * The key grouped iterator returns a key and all values that share the same key.
  */
-public final class NonReusingKeyGroupedIterator<E> {
+public final class NonReusingKeyGroupedIterator<E> implements KeyGroupedIterator<E> {
        
        private final MutableObjectIterator<E> iterator;
        

http://git-wip-us.apache.org/repos/asf/flink/blob/0dc6849a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ReusingKeyGroupedIterator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ReusingKeyGroupedIterator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ReusingKeyGroupedIterator.java
index 4dc9dd3..1477f10 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ReusingKeyGroupedIterator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ReusingKeyGroupedIterator.java
@@ -31,7 +31,7 @@ import org.apache.flink.util.TraversableOnceException;
  * The KeyValueIterator returns a key and all values that belong to the key (share the same key).
  * 
  */
-public final class ReusingKeyGroupedIterator<E> {
+public final class ReusingKeyGroupedIterator<E> implements KeyGroupedIterator<E> {
        
        private final MutableObjectIterator<E> iterator;
 
@@ -78,6 +78,7 @@ public final class ReusingKeyGroupedIterator<E> {
         * 
         * @return true if the input iterator has an other group of key-value pairs that share the same key.
         */
+       @Override
        public boolean nextKey() throws IOException
        {
                // first element (or empty)
@@ -139,6 +140,7 @@ public final class ReusingKeyGroupedIterator<E> {
                return this.comparator;
        }
        
+       @Override
        public E getCurrent() {
                return this.current;
        }
@@ -150,6 +152,7 @@ public final class ReusingKeyGroupedIterator<E> {
         * 
         * @return Iterator over all values that belong to the current key.
         */
+       @Override
        public ValuesIterator getValues() {
                return this.valuesIterator;
        }

Reply via email to