[FLINK-1285] Make Merge-Join aware of object-reuse setting

This closes #259


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d529749c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d529749c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d529749c

Branch: refs/heads/master
Commit: d529749c8f45af693efffe1f69860dae0bfe70bf
Parents: b7b32a0
Author: Aljoscha Krettek <[email protected]>
Authored: Thu Dec 11 14:58:23 2014 +0100
Committer: Stephan Ewen <[email protected]>
Committed: Wed Jan 7 19:16:10 2015 +0100

----------------------------------------------------------------------
 .../flink/runtime/operators/MatchDriver.java    |   7 +-
 .../resettable/BlockResettableIterator.java     | 210 ---------
 .../NonReusingBlockResettableIterator.java      | 204 +++++++++
 .../ReusingBlockResettableIterator.java         | 100 +++++
 .../operators/sort/MergeMatchIterator.java      | 429 ------------------
 .../sort/NonReusingMergeMatchIterator.java      | 424 ++++++++++++++++++
 .../sort/ReusingMergeMatchIterator.java         | 435 +++++++++++++++++++
 .../resettable/BlockResettableIteratorTest.java | 202 ---------
 .../NonReusingBlockResettableIteratorTest.java  | 201 +++++++++
 .../ReusingBlockResettableIteratorTest.java     | 201 +++++++++
 .../NonReusingSortMergeMatchIteratorITCase.java | 371 ++++++++++++++++
 .../ReusingSortMergeMatchIteratorITCase.java    | 371 ++++++++++++++++
 .../sort/SortMergeMatchIteratorITCase.java      | 373 ----------------
 .../operators/util/HashVsSortMiniBenchmark.java |   6 +-
 14 files changed, 2314 insertions(+), 1220 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/d529749c/flink-runtime/src/main/java/org/apache/flink/runtime/operators/MatchDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/MatchDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/MatchDriver.java
index 2d051ad..f8e4a29 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/MatchDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/MatchDriver.java
@@ -22,6 +22,7 @@ package org.apache.flink.runtime.operators;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.runtime.operators.hash.NonReusingBuildFirstHashMatchIterator;
 import org.apache.flink.runtime.operators.hash.NonReusingBuildSecondHashMatchIterator;
+import org.apache.flink.runtime.operators.sort.NonReusingMergeMatchIterator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.flink.api.common.functions.FlatJoinFunction;
@@ -32,7 +33,7 @@ import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.memorymanager.MemoryManager;
 import org.apache.flink.runtime.operators.hash.ReusingBuildFirstHashMatchIterator;
 import org.apache.flink.runtime.operators.hash.ReusingBuildSecondHashMatchIterator;
-import org.apache.flink.runtime.operators.sort.MergeMatchIterator;
+import org.apache.flink.runtime.operators.sort.ReusingMergeMatchIterator;
 import org.apache.flink.runtime.operators.util.JoinTaskIterator;
 import org.apache.flink.runtime.operators.util.TaskConfig;
 import org.apache.flink.util.Collector;
@@ -125,7 +126,7 @@ public class MatchDriver<IT1, IT2, OT> implements PactDriver<FlatJoinFunction<IT
                if (this.objectReuseEnabled) {
                        switch (ls) {
                                case MERGE:
-                                       this.matchIterator = new MergeMatchIterator<IT1, IT2, OT>(in1, in2, serializer1, comparator1, serializer2, comparator2, pairComparatorFactory.createComparator12(comparator1, comparator2), memoryManager, ioManager, numPages, this.taskContext.getOwningNepheleTask());
+                                       this.matchIterator = new ReusingMergeMatchIterator<IT1, IT2, OT>(in1, in2, serializer1, comparator1, serializer2, comparator2, pairComparatorFactory.createComparator12(comparator1, comparator2), memoryManager, ioManager, numPages, this.taskContext.getOwningNepheleTask());
 
                                        break;
                                case HYBRIDHASH_BUILD_FIRST:
@@ -140,7 +141,7 @@ public class MatchDriver<IT1, IT2, OT> implements PactDriver<FlatJoinFunction<IT
                } else {
                        switch (ls) {
                                case MERGE:
-                                       this.matchIterator = new MergeMatchIterator<IT1, IT2, OT>(in1, in2, serializer1, comparator1, serializer2, comparator2, pairComparatorFactory.createComparator12(comparator1, comparator2), memoryManager, ioManager, numPages, this.taskContext.getOwningNepheleTask());
+                                       this.matchIterator = new NonReusingMergeMatchIterator<IT1, IT2, OT>(in1, in2, serializer1, comparator1, serializer2, comparator2, pairComparatorFactory.createComparator12(comparator1, comparator2), memoryManager, ioManager, numPages, this.taskContext.getOwningNepheleTask());
 
                                        break;
                                case HYBRIDHASH_BUILD_FIRST:

http://git-wip-us.apache.org/repos/asf/flink/blob/d529749c/flink-runtime/src/main/java/org/apache/flink/runtime/operators/resettable/BlockResettableIterator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/resettable/BlockResettableIterator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/resettable/BlockResettableIterator.java
deleted file mode 100644
index 0019c8c..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/resettable/BlockResettableIterator.java
+++ /dev/null
@@ -1,210 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.operators.resettable;
-
-import java.io.IOException;
-import java.util.Iterator;
-import java.util.NoSuchElementException;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.memorymanager.MemoryAllocationException;
-import org.apache.flink.runtime.memorymanager.MemoryManager;
-import org.apache.flink.runtime.util.ResettableIterator;
-
-/**
- * Implementation of an iterator that fetches a block of data into main memory 
and offers resettable
- * access to the data in that block.
- * 
- */
-public class BlockResettableIterator<T> extends 
AbstractBlockResettableIterator<T> implements ResettableIterator<T> {
-       
-       public static final Logger LOG = 
LoggerFactory.getLogger(BlockResettableIterator.class);
-       
-       // 
------------------------------------------------------------------------
-       
-       protected Iterator<T> input;
-       
-       private T nextElement;
-
-       private final T reuseElement;
-       
-       private T leftOverElement;
-       
-       private boolean readPhase;
-       
-       private boolean noMoreBlocks;
-       
-       // 
------------------------------------------------------------------------
-       
-       public BlockResettableIterator(MemoryManager memoryManager, Iterator<T> 
input,
-                       TypeSerializer<T> serializer, int numPages, 
AbstractInvokable ownerTask)
-       throws MemoryAllocationException
-       {
-               this(memoryManager, serializer, numPages, ownerTask);
-               this.input = input;
-       }
-       
-       public BlockResettableIterator(MemoryManager memoryManager,
-                       TypeSerializer<T> serializer, int numPages, 
AbstractInvokable ownerTask)
-       throws MemoryAllocationException
-       {
-               super(serializer, memoryManager, numPages, ownerTask);
-               
-               this.reuseElement = serializer.createInstance();
-       }
-       
-       // 
------------------------------------------------------------------------
-       
-       public void reopen(Iterator<T> input) throws IOException {
-               this.input = input;
-               
-               this.noMoreBlocks = false;
-               this.closed = false;
-               
-               nextBlock();
-       }
-       
-       
-
-       @Override
-       public boolean hasNext() {
-               try {
-                       if (this.nextElement == null) {
-                               if (this.readPhase) {
-                                       // read phase, get next element from 
buffer
-                                       T tmp = 
getNextRecord(this.reuseElement);
-                                       if (tmp != null) {
-                                               this.nextElement = tmp;
-                                               return true;
-                                       } else {
-                                               return false;
-                                       }
-                               } else {
-                                       if (this.input.hasNext()) {
-                                               final T next = 
this.input.next();
-                                               if (writeNextRecord(next)) {
-                                                       this.nextElement = next;
-                                                       return true;
-                                               } else {
-                                                       this.leftOverElement = 
next;
-                                                       return false;
-                                               }
-                                       } else {
-                                               this.noMoreBlocks = true;
-                                               return false;
-                                       }
-                               }
-                       } else {
-                               return true;
-                       }
-               } catch (IOException ioex) {
-                       throw new RuntimeException("Error (de)serializing 
record in block resettable iterator.", ioex);
-               }
-       }
-       
-
-       @Override
-       public T next() {
-               if (this.nextElement == null) {
-                       if (!hasNext()) {
-                               throw new NoSuchElementException();
-                       }
-               }
-               
-               T out = this.nextElement;
-               this.nextElement = null;
-               return out;
-       }
-       
-
-       @Override
-       public void remove() {
-               throw new UnsupportedOperationException();
-       }
-       
-
-       public void reset() {
-               // a reset always goes to the read phase
-               this.readPhase = true;
-               super.reset();
-       }
-       
-
-       @Override
-       public boolean nextBlock() throws IOException {
-               // check the state
-               if (this.closed) {
-                       throw new IllegalStateException("Iterator has been 
closed.");
-               }
-               
-               // check whether more blocks are available
-               if (this.noMoreBlocks) {
-                       return false;
-               }
-               
-               // reset the views in the superclass
-               super.nextBlock();
-               
-               T next = this.leftOverElement;
-               this.leftOverElement = null;
-               if (next == null) {
-                       if (this.input.hasNext()) {
-                               next = this.input.next();
-                       }
-                       else {
-                               this.noMoreBlocks = true;
-                               return false;
-                       }
-               }
-               
-               // write the leftover record
-               if (!writeNextRecord(next)) {
-                       throw new IOException("BlockResettableIterator could 
not serialize record into fresh memory block: " +
-                                       "Record is too large.");
-               }
-               
-               this.nextElement = next;
-               this.readPhase = false;
-               
-               return true;
-       }
-       
-       /**
-        * Checks, whether the input that is blocked by this iterator, has 
further elements
-        * available. This method may be used to forecast (for example at the 
point where a
-        * block is full) whether there will be more data (possibly in another 
block).
-        * 
-        * @return True, if there will be more data, false otherwise.
-        */
-       public boolean hasFurtherInput() {
-               return !this.noMoreBlocks; 
-       }
-       
-
-       public void close() {
-               // suggest that we are in the read phase. because nothing is in 
the current block,
-               // read requests will fail
-               this.readPhase = true;
-               super.close();
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/d529749c/flink-runtime/src/main/java/org/apache/flink/runtime/operators/resettable/NonReusingBlockResettableIterator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/resettable/NonReusingBlockResettableIterator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/resettable/NonReusingBlockResettableIterator.java
new file mode 100644
index 0000000..9d581ce
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/resettable/NonReusingBlockResettableIterator.java
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.runtime.operators.resettable;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.memorymanager.MemoryAllocationException;
+import org.apache.flink.runtime.memorymanager.MemoryManager;
+import org.apache.flink.runtime.util.ResettableIterator;
+
+/**
+ * Implementation of an iterator that fetches a block of data into main memory 
and offers resettable
+ * access to the data in that block.
+ * 
+ */
+public class NonReusingBlockResettableIterator<T> extends 
AbstractBlockResettableIterator<T> implements ResettableIterator<T> {
+       
+       public static final Logger LOG = 
LoggerFactory.getLogger(NonReusingBlockResettableIterator.class);
+       
+       // 
------------------------------------------------------------------------
+       
+       protected Iterator<T> input;
+       
+       protected T nextElement;
+
+       protected T leftOverElement;
+       
+       protected boolean readPhase;
+       
+       protected boolean noMoreBlocks;
+       
+       // 
------------------------------------------------------------------------
+       
+       public NonReusingBlockResettableIterator(MemoryManager memoryManager, 
Iterator<T> input,
+                       TypeSerializer<T> serializer, int numPages,
+                       AbstractInvokable ownerTask)
+       throws MemoryAllocationException
+       {
+               this(memoryManager, serializer, numPages, ownerTask);
+               this.input = input;
+       }
+       
+       public NonReusingBlockResettableIterator(MemoryManager memoryManager, 
TypeSerializer<T> serializer, int numPages, AbstractInvokable ownerTask)
+       throws MemoryAllocationException
+       {
+               super(serializer, memoryManager, numPages, ownerTask);
+       }
+       
+       // 
------------------------------------------------------------------------
+       
+       public void reopen(Iterator<T> input) throws IOException {
+               this.input = input;
+               
+               this.noMoreBlocks = false;
+               this.closed = false;
+               
+               nextBlock();
+       }
+
+       @Override
+       public boolean hasNext() {
+               try {
+                       if (this.nextElement == null) {
+                               if (this.readPhase) {
+                                       // read phase, get next element from 
buffer
+                                       T tmp = getNextRecord();
+                                       if (tmp != null) {
+                                               this.nextElement = tmp;
+                                               return true;
+                                       } else {
+                                               return false;
+                                       }
+                               } else {
+                                       if (this.input.hasNext()) {
+                                               final T next = 
this.input.next();
+                                               if (writeNextRecord(next)) {
+                                                       this.nextElement = next;
+                                                       return true;
+                                               } else {
+                                                       this.leftOverElement = 
next;
+                                                       return false;
+                                               }
+                                       } else {
+                                               this.noMoreBlocks = true;
+                                               return false;
+                                       }
+                               }
+                       } else {
+                               return true;
+                       }
+               } catch (IOException ioex) {
+                       throw new RuntimeException("Error (de)serializing 
record in block resettable iterator.", ioex);
+               }
+       }
+       
+
+       @Override
+       public T next() {
+               if (this.nextElement == null) {
+                       if (!hasNext()) {
+                               throw new NoSuchElementException();
+                       }
+               }
+               
+               T out = this.nextElement;
+               this.nextElement = null;
+               return out;
+       }
+       
+
+       @Override
+       public void remove() {
+               throw new UnsupportedOperationException();
+       }
+       
+
+       public void reset() {
+               // a reset always goes to the read phase
+               this.readPhase = true;
+               super.reset();
+       }
+       
+
+       @Override
+       public boolean nextBlock() throws IOException {
+               // check the state
+               if (this.closed) {
+                       throw new IllegalStateException("Iterator has been 
closed.");
+               }
+               
+               // check whether more blocks are available
+               if (this.noMoreBlocks) {
+                       return false;
+               }
+               
+               // reset the views in the superclass
+               super.nextBlock();
+               
+               T next = this.leftOverElement;
+               this.leftOverElement = null;
+               if (next == null) {
+                       if (this.input.hasNext()) {
+                               next = this.input.next();
+                       }
+                       else {
+                               this.noMoreBlocks = true;
+                               return false;
+                       }
+               }
+               
+               // write the leftover record
+               if (!writeNextRecord(next)) {
+                       throw new IOException("BlockResettableIterator could 
not serialize record into fresh memory block: " +
+                                       "Record is too large.");
+               }
+               
+               this.nextElement = next;
+               this.readPhase = false;
+               
+               return true;
+       }
+       
+       /**
+        * Checks, whether the input that is blocked by this iterator, has 
further elements
+        * available. This method may be used to forecast (for example at the 
point where a
+        * block is full) whether there will be more data (possibly in another 
block).
+        * 
+        * @return True, if there will be more data, false otherwise.
+        */
+       public boolean hasFurtherInput() {
+               return !this.noMoreBlocks; 
+       }
+       
+
+       public void close() {
+               // suggest that we are in the read phase. because nothing is in 
the current block,
+               // read requests will fail
+               this.readPhase = true;
+               super.close();
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d529749c/flink-runtime/src/main/java/org/apache/flink/runtime/operators/resettable/ReusingBlockResettableIterator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/resettable/ReusingBlockResettableIterator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/resettable/ReusingBlockResettableIterator.java
new file mode 100644
index 0000000..baa0fb2
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/resettable/ReusingBlockResettableIterator.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.runtime.operators.resettable;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.memorymanager.MemoryAllocationException;
+import org.apache.flink.runtime.memorymanager.MemoryManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+/**
+ * Implementation of an iterator that fetches a block of data into main memory 
and offers resettable
+ * access to the data in that block.
+ * 
+ */
+public class ReusingBlockResettableIterator<T> extends 
NonReusingBlockResettableIterator<T> {
+
+       public static final Logger LOG = 
LoggerFactory.getLogger(ReusingBlockResettableIterator.class);
+
+       private final T reuseElement;
+
+       // 
------------------------------------------------------------------------
+
+       public ReusingBlockResettableIterator(MemoryManager memoryManager, 
Iterator<T> input,
+                       TypeSerializer<T> serializer, int numPages,
+                       AbstractInvokable ownerTask)
+       throws MemoryAllocationException
+       {
+               this(memoryManager, serializer, numPages, ownerTask);
+               this.input = input;
+       }
+
+       public ReusingBlockResettableIterator(MemoryManager memoryManager, 
TypeSerializer<T>
+                       serializer, int numPages, AbstractInvokable ownerTask)
+       throws MemoryAllocationException
+       {
+               super(memoryManager, serializer, numPages, ownerTask);
+               
+               this.reuseElement = serializer.createInstance();
+       }
+       
+       // 
------------------------------------------------------------------------
+       
+       @Override
+       public boolean hasNext() {
+               try {
+                       if (this.nextElement == null) {
+                               if (this.readPhase) {
+                                       // read phase, get next element from 
buffer
+                                       T tmp = 
getNextRecord(this.reuseElement);
+                                       if (tmp != null) {
+                                               this.nextElement = tmp;
+                                               return true;
+                                       } else {
+                                               return false;
+                                       }
+                               } else {
+                                       if (this.input.hasNext()) {
+                                               final T next = 
this.input.next();
+                                               if (writeNextRecord(next)) {
+                                                       this.nextElement = next;
+                                                       return true;
+                                               } else {
+                                                       this.leftOverElement = 
next;
+                                                       return false;
+                                               }
+                                       } else {
+                                               this.noMoreBlocks = true;
+                                               return false;
+                                       }
+                               }
+                       } else {
+                               return true;
+                       }
+               } catch (IOException ioex) {
+                       throw new RuntimeException("Error (de)serializing 
record in block resettable iterator.", ioex);
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d529749c/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/MergeMatchIterator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/MergeMatchIterator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/MergeMatchIterator.java
deleted file mode 100644
index 675758a..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/MergeMatchIterator.java
+++ /dev/null
@@ -1,429 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.operators.sort;
-
-import java.io.IOException;
-import java.util.Iterator;
-import java.util.List;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.flink.api.common.functions.FlatJoinFunction;
-import org.apache.flink.api.common.typeutils.TypeComparator;
-import org.apache.flink.api.common.typeutils.TypePairComparator;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.runtime.io.disk.iomanager.IOManager;
-import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.memorymanager.MemoryAllocationException;
-import org.apache.flink.runtime.memorymanager.MemoryManager;
-import org.apache.flink.runtime.operators.resettable.BlockResettableIterator;
-import 
org.apache.flink.runtime.operators.resettable.SpillingResettableIterator;
-import org.apache.flink.runtime.operators.util.JoinTaskIterator;
-import org.apache.flink.runtime.util.ReusingKeyGroupedIterator;
-import org.apache.flink.util.Collector;
-import org.apache.flink.util.MutableObjectIterator;
-
-
-/**
- * An implementation of the {@link JoinTaskIterator} that realizes the
- * matching through a sort-merge join strategy.
- */
-public class MergeMatchIterator<T1, T2, O> implements JoinTaskIterator<T1, T2, 
O> {
-       
-       /**
-        * The log used by this iterator to log messages.
-        */
-       private static final Logger LOG = 
LoggerFactory.getLogger(MergeMatchIterator.class);
-       
-       // 
--------------------------------------------------------------------------------------------
-       
-       private TypePairComparator<T1, T2> comp;
-       
-       private ReusingKeyGroupedIterator<T1> iterator1;
-
-       private ReusingKeyGroupedIterator<T2> iterator2;
-       
-       private final TypeSerializer<T1> serializer1;
-       
-       private final TypeSerializer<T2> serializer2;
-       
-       private T1 copy1;
-       
-       private T1 spillHeadCopy;
-       
-       private T2 copy2;
-       
-       private T2 blockHeadCopy;
-       
-       private final BlockResettableIterator<T2> blockIt;                      
        // for N:M cross products with same key
-       
-       private final List<MemorySegment> memoryForSpillingIterator;
-       
-       private final MemoryManager memoryManager;
-
-       private final IOManager ioManager;
-       
-       // 
--------------------------------------------------------------------------------------------
-       
-       public MergeMatchIterator(MutableObjectIterator<T1> input1, 
MutableObjectIterator<T2> input2,
-                       TypeSerializer<T1> serializer1, TypeComparator<T1> 
comparator1,
-                       TypeSerializer<T2> serializer2, TypeComparator<T2> 
comparator2, TypePairComparator<T1, T2> pairComparator,
-                       MemoryManager memoryManager, IOManager ioManager, int 
numMemoryPages, AbstractInvokable parentTask)
-       throws MemoryAllocationException
-       {
-               if (numMemoryPages < 2) {
-                       throw new IllegalArgumentException("Merger needs at 
least 2 memory pages.");
-               }
-               
-               this.comp = pairComparator;
-               this.serializer1 = serializer1;
-               this.serializer2 = serializer2;
-               
-               this.copy1 = serializer1.createInstance();
-               this.spillHeadCopy = serializer1.createInstance();
-               this.copy2 = serializer2.createInstance();
-               this.blockHeadCopy = serializer2.createInstance();
-               
-               this.memoryManager = memoryManager;
-               this.ioManager = ioManager;
-               
-               this.iterator1 = new ReusingKeyGroupedIterator<T1>(input1, 
this.serializer1, comparator1.duplicate());
-               this.iterator2 = new ReusingKeyGroupedIterator<T2>(input2, 
this.serializer2, comparator2.duplicate());
-               
-               final int numPagesForSpiller = numMemoryPages > 20 ? 2 : 1;
-               this.blockIt = new 
BlockResettableIterator<T2>(this.memoryManager, this.serializer2,
-                       (numMemoryPages - numPagesForSpiller), parentTask);
-               this.memoryForSpillingIterator = 
memoryManager.allocatePages(parentTask, numPagesForSpiller);
-       }
-
-
-       @Override
-       public void open() throws IOException {}
-
-
-       @Override
-       public void close() {
-               if (this.blockIt != null) {
-                       try {
-                               this.blockIt.close();
-                       }
-                       catch (Throwable t) {
-                               LOG.error("Error closing block memory iterator: 
" + t.getMessage(), t);
-                       }
-               }
-               
-               this.memoryManager.release(this.memoryForSpillingIterator);
-       }
-       
-
-       @Override
-       public void abort() {
-               close();
-       }
-
-       /**
-        * Calls the <code>JoinFunction#match()</code> method for all two 
key-value pairs that share the same key and come 
-        * from different inputs. The output of the <code>match()</code> method 
is forwarded.
-        * <p>
-        * This method first zig-zags between the two sorted inputs in order to 
find a common
-        * key, and then calls the match stub with the cross product of the 
values.
-        * 
-        * @throws Exception Forwards all exceptions from the user code and the 
I/O system.
-        * 
-        * @see 
org.apache.flink.runtime.operators.util.JoinTaskIterator#callWithNextKey(FlatJoinFunction,
 Collector)
-        */
-       @Override
-       public boolean callWithNextKey(final FlatJoinFunction<T1, T2, O> 
matchFunction, final Collector<O> collector)
-       throws Exception
-       {
-               if (!this.iterator1.nextKey() || !this.iterator2.nextKey()) {
-                       // consume all remaining keys (hack to prevent 
remaining inputs during iterations, lets get rid of this soon)
-                       while (this.iterator1.nextKey());
-                       while (this.iterator2.nextKey());
-                       
-                       return false;
-               }
-
-               final TypePairComparator<T1, T2> comparator = this.comp;
-               comparator.setReference(this.iterator1.getCurrent());
-               T2 current2 = this.iterator2.getCurrent();
-                               
-               // zig zag
-               while (true) {
-                       // determine the relation between the (possibly 
composite) keys
-                       final int comp = 
comparator.compareToReference(current2);
-                       
-                       if (comp == 0) {
-                               break;
-                       }
-                       
-                       if (comp < 0) {
-                               if (!this.iterator2.nextKey()) {
-                                       return false;
-                               }
-                               current2 = this.iterator2.getCurrent();
-                       }
-                       else {
-                               if (!this.iterator1.nextKey()) {
-                                       return false;
-                               }
-                               
comparator.setReference(this.iterator1.getCurrent());
-                       }
-               }
-               
-               // here, we have a common key! call the match function with the 
cross product of the
-               // values
-               final ReusingKeyGroupedIterator<T1>.ValuesIterator values1 = 
this.iterator1.getValues();
-               final ReusingKeyGroupedIterator<T2>.ValuesIterator values2 = 
this.iterator2.getValues();
-               
-               final T1 firstV1 = values1.next();
-               final T2 firstV2 = values2.next();      
-                       
-               final boolean v1HasNext = values1.hasNext();
-               final boolean v2HasNext = values2.hasNext();
-
-               // check if one side is already empty
-               // this check could be omitted if we put this in MatchTask.
-               // then we can derive the local strategy (with build side).
-               
-               if (v1HasNext) {
-                       if (v2HasNext) {
-                               // both sides contain more than one value
-                               // TODO: Decide which side to spill and which 
to block!
-                               crossMwithNValues(firstV1, values1, firstV2, 
values2, matchFunction, collector);
-                       } else {
-                               crossSecond1withNValues(firstV2, firstV1, 
values1, matchFunction, collector);
-                       }
-               } else {
-                       if (v2HasNext) {
-                               crossFirst1withNValues(firstV1, firstV2, 
values2, matchFunction, collector);
-                       } else {
-                               // both sides contain only one value
-                               matchFunction.join(firstV1, firstV2, collector);
-                       }
-               }
-               return true;
-       }
-
-       /**
-        * Crosses a single value from the first input with N values, all 
sharing a common key.
-        * Effectively realizes a <i>1:N</i> match (join).
-        * 
-        * @param val1 The value form the <i>1</i> side.
-        * @param firstValN The first of the values from the <i>N</i> side.
-        * @param valsN Iterator over remaining <i>N</i> side values.
-        *          
-        * @throws Exception Forwards all exceptions thrown by the stub.
-        */
-       private void crossFirst1withNValues(final T1 val1, final T2 firstValN,
-                       final Iterator<T2> valsN, final FlatJoinFunction<T1, 
T2, O> matchFunction, final Collector<O> collector)
-       throws Exception
-       {
-               this.copy1 = this.serializer1.copy(val1, this.copy1);
-               matchFunction.join(this.copy1, firstValN, collector);
-               
-               // set copy and match first element
-               boolean more = true;
-               do {
-                       final T2 nRec = valsN.next();
-                       
-                       if (valsN.hasNext()) {
-                               this.copy1 = this.serializer1.copy(val1, 
this.copy1);
-                               matchFunction.join(this.copy1, nRec, collector);
-                       } else {
-                               matchFunction.join(val1, nRec, collector);
-                               more = false;
-                       }
-               }
-               while (more);
-       }
-       
-       /**
-        * Crosses a single value from the second side with N values, all 
sharing a common key.
-        * Effectively realizes a <i>N:1</i> match (join).
-        * 
-        * @param val1 The value form the <i>1</i> side.
-        * @param firstValN The first of the values from the <i>N</i> side.
-        * @param valsN Iterator over remaining <i>N</i> side values.
-        *          
-        * @throws Exception Forwards all exceptions thrown by the stub.
-        */
-       private void crossSecond1withNValues(T2 val1, T1 firstValN,
-                       Iterator<T1> valsN, FlatJoinFunction<T1, T2, O> 
matchFunction, Collector<O> collector)
-       throws Exception
-       {
-               this.copy2 = this.serializer2.copy(val1, this.copy2);
-               matchFunction.join(firstValN, this.copy2, collector);
-               
-               // set copy and match first element
-               boolean more = true;
-               do {
-                       final T1 nRec = valsN.next();
-                       
-                       if (valsN.hasNext()) {
-                               this.copy2 = this.serializer2.copy(val1, 
this.copy2);
-                               matchFunction.join(nRec,this.copy2,collector);
-                       } else {
-                               matchFunction.join(nRec, val1, collector);
-                               more = false;
-                       }
-               }
-               while (more);
-       }
-       
-       /**
-        * @param firstV1
-        * @param spillVals
-        * @param firstV2
-        * @param blockVals
-        */
-       private void crossMwithNValues(final T1 firstV1, Iterator<T1> spillVals,
-                       final T2 firstV2, final Iterator<T2> blockVals,
-                       final FlatJoinFunction<T1, T2, O> matchFunction, final 
Collector<O> collector)
-       throws Exception
-       {
-               // ==================================================
-               // We have one first (head) element from both inputs (firstV1 
and firstV2)
-               // We have an iterator for both inputs.
-               // we make the V1 side the spilling side and the V2 side the 
blocking side.
-               // In order to get the full cross product without unnecessary 
spilling, we do the
-               // following:
-               // 1) cross the heads
-               // 2) cross the head of the spilling side against the first 
block of the blocking side
-               // 3) cross the iterator of the spilling side with the head of 
the block side
-               // 4) cross the iterator of the spilling side with the first 
block
-               // ---------------------------------------------------
-               // If the blocking side has more than one block, we really need 
to make the spilling side fully
-               // resettable. For each further block on the block side, we do:
-               // 5) cross the head of the spilling side with the next block
-               // 6) cross the spilling iterator with the next block.
-               
-               // match the first values first
-               this.copy1 = this.serializer1.copy(firstV1, this.copy1);
-               this.blockHeadCopy = this.serializer2.copy(firstV2, 
this.blockHeadCopy);
-               
-               // --------------- 1) Cross the heads -------------------
-               matchFunction.join(this.copy1, firstV2, collector);
-               
-               // for the remaining values, we do a block-nested-loops join
-               SpillingResettableIterator<T1> spillIt = null;
-               
-               try {
-                       // create block iterator on the second input
-                       this.blockIt.reopen(blockVals);
-                       
-                       // ------------- 2) cross the head of the spilling side 
with the first block ------------------
-                       while (this.blockIt.hasNext()) {
-                               final T2 nextBlockRec = this.blockIt.next();
-                               this.copy1 = this.serializer1.copy(firstV1, 
this.copy1);
-                               matchFunction.join(this.copy1, nextBlockRec, 
collector);
-                       }
-                       this.blockIt.reset();
-                       
-                       // spilling is required if the blocked input has data 
beyond the current block.
-                       // in that case, create the spilling iterator
-                       final Iterator<T1> leftSideIter;
-                       final boolean spillingRequired = 
this.blockIt.hasFurtherInput();
-                       if (spillingRequired)
-                       {
-                               // more data than would fit into one block. we 
need to wrap the other side in a spilling iterator
-                               // create spilling iterator on first input
-                               spillIt = new 
SpillingResettableIterator<T1>(spillVals, this.serializer1,
-                                               this.memoryManager, 
this.ioManager, this.memoryForSpillingIterator);
-                               leftSideIter = spillIt;
-                               spillIt.open();
-                               
-                               this.spillHeadCopy = 
this.serializer1.copy(firstV1, this.spillHeadCopy);
-                       }
-                       else {
-                               leftSideIter = spillVals;
-                       }
-                       
-                       // cross the values in the v1 iterator against the 
current block
-                       
-                       while (leftSideIter.hasNext()) {
-                               final T1 nextSpillVal = leftSideIter.next();
-                               this.copy1 = 
this.serializer1.copy(nextSpillVal, this.copy1);
-                               
-                               
-                               // -------- 3) cross the iterator of the 
spilling side with the head of the block side --------
-                               this.copy2 = 
this.serializer2.copy(this.blockHeadCopy, this.copy2);
-                               matchFunction.join(this.copy1, this.copy2, 
collector);
-                               
-                               // -------- 4) cross the iterator of the 
spilling side with the first block --------
-                               while (this.blockIt.hasNext()) {
-                                       T2 nextBlockRec = this.blockIt.next();
-                                       
-                                       // get instances of key and block value
-                                       this.copy1 = 
this.serializer1.copy(nextSpillVal, this.copy1);
-                                       matchFunction.join(this.copy1, 
nextBlockRec, collector);
-                               }
-                               // reset block iterator
-                               this.blockIt.reset();
-                       }
-                       
-                       // if everything from the block-side fit into a single 
block, we are done.
-                       // note that in this special case, we did not create a 
spilling iterator at all
-                       if (!spillingRequired) {
-                               return;
-                       }
-                       
-                       // here we are, because we have more blocks on the 
block side
-                       // loop as long as there are blocks from the blocked 
input
-                       while (this.blockIt.nextBlock())
-                       {
-                               // rewind the spilling iterator
-                               spillIt.reset();
-                               
-                               // ------------- 5) cross the head of the 
spilling side with the next block ------------
-                               while (this.blockIt.hasNext()) {
-                                       this.copy1 = 
this.serializer1.copy(this.spillHeadCopy, this.copy1);
-                                       final T2 nextBlockVal = blockIt.next();
-                                       matchFunction.join(this.copy1, 
nextBlockVal, collector);
-                               }
-                               this.blockIt.reset();
-                               
-                               // -------- 6) cross the spilling iterator with 
the next block. ------------------
-                               while (spillIt.hasNext())
-                               {
-                                       // get value from resettable iterator
-                                       final T1 nextSpillVal = spillIt.next();
-                                       // cross value with block values
-                                       while (this.blockIt.hasNext()) {
-                                               // get instances of key and 
block value
-                                               final T2 nextBlockVal = 
this.blockIt.next();
-                                               this.copy1 = 
this.serializer1.copy(nextSpillVal, this.copy1);
-                                               matchFunction.join(this.copy1, 
nextBlockVal, collector);
-                                       }
-                                       
-                                       // reset block iterator
-                                       this.blockIt.reset();
-                               }
-                               // reset v1 iterator
-                               spillIt.reset();
-                       }
-               }
-               finally {
-                       if (spillIt != null) {
-                               
this.memoryForSpillingIterator.addAll(spillIt.close());
-                       }
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/d529749c/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/NonReusingMergeMatchIterator.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/NonReusingMergeMatchIterator.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/NonReusingMergeMatchIterator.java
new file mode 100644
index 0000000..70b6f9a
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/NonReusingMergeMatchIterator.java
@@ -0,0 +1,424 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.sort;
+
+import org.apache.flink.api.common.functions.FlatJoinFunction;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypePairComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.memorymanager.MemoryAllocationException;
+import org.apache.flink.runtime.memorymanager.MemoryManager;
+import 
org.apache.flink.runtime.operators.resettable.NonReusingBlockResettableIterator;
+import 
org.apache.flink.runtime.operators.resettable.SpillingResettableIterator;
+import org.apache.flink.runtime.operators.util.JoinTaskIterator;
+import org.apache.flink.runtime.util.NonReusingKeyGroupedIterator;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.MutableObjectIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+
+
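+/**
+ * An implementation of the {@link org.apache.flink.runtime.operators.util.JoinTaskIterator}
+ * that realizes the matching through a sort-merge join strategy.
+ * <p>
+ * This is the variant that works without object reuse: it reads both inputs through a
+ * {@link NonReusingKeyGroupedIterator} and uses a {@link NonReusingBlockResettableIterator}
+ * on the second input for N:M cross products.
+ */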
+public class NonReusingMergeMatchIterator<T1, T2, O> implements 
JoinTaskIterator<T1, T2, O> {
+
+       /**
+        * The log used by this iterator to log messages.
+        */
+       private static final Logger LOG = 
LoggerFactory.getLogger(NonReusingMergeMatchIterator.class);
+
+       // 
--------------------------------------------------------------------------------------------
+
+       private TypePairComparator<T1, T2> comp;
+
+       private NonReusingKeyGroupedIterator<T1> iterator1;
+
+       private NonReusingKeyGroupedIterator<T2> iterator2;
+
+       private final TypeSerializer<T1> serializer1;
+
+       private final TypeSerializer<T2> serializer2;
+
+       private final NonReusingBlockResettableIterator<T2> blockIt;            
                // for N:M cross products with same key
+
+       private final List<MemorySegment> memoryForSpillingIterator;
+
+       private final MemoryManager memoryManager;
+
+       private final IOManager ioManager;
+
+       // 
--------------------------------------------------------------------------------------------
+       /**
+        * Creates a sort-merge join iterator over the two given inputs.
+        * <p>
+        * Each input is wrapped in a NonReusingKeyGroupedIterator that works on a duplicate of
+        * the respective comparator. One memory page (two, if more than 20 pages are given) is
+        * allocated here for the spilling iterator; the block-resettable iterator for the
+        * second input is constructed with the remaining page count.
+        * <p>
+        * NOTE(review): if the block-resettable iterator acquires its pages on construction, a
+        * failure in the subsequent allocatePages call leaves them unreleased by this
+        * constructor. Confirm that cleanup at the task level covers this case.
+        *
+        * @param input1 The first input (called "sorted" by the callWithNextKey documentation).
+        * @param input2 The second input (called "sorted" by the callWithNextKey documentation).
+        * @param serializer1 The serializer for the records of the first input.
+        * @param comparator1 The comparator for the first input; a duplicate of it is used.
+        * @param serializer2 The serializer for the records of the second input.
+        * @param comparator2 The comparator for the second input; a duplicate of it is used.
+        * @param pairComparator The comparator that takes a record of the first input as
+        *                       reference and compares records of the second input against it.
+        * @param memoryManager The memory manager from which the pages are requested and to
+        *                      which the spilling pages are released on close.
+        * @param ioManager The I/O manager; stored in a field for later use.
+        * @param numMemoryPages The total number of memory pages to use; must be at least 2.
+        * @param parentTask The task that is passed along with every memory request.
+        *
+        * @throws MemoryAllocationException Declared for the page allocation; presumably thrown
+        *                                   when the requested pages are not available.
+        * @throws IllegalArgumentException Thrown, if numMemoryPages is smaller than 2.
+        */
+       public NonReusingMergeMatchIterator(
+                       MutableObjectIterator<T1> input1,
+                       MutableObjectIterator<T2> input2,
+                       TypeSerializer<T1> serializer1, TypeComparator<T1> 
comparator1,
+                       TypeSerializer<T2> serializer2, TypeComparator<T2> 
comparator2,
+                       TypePairComparator<T1, T2> pairComparator,
+                       MemoryManager memoryManager,
+                       IOManager ioManager,
+                       int numMemoryPages,
+                       AbstractInvokable parentTask)
+       throws MemoryAllocationException
+       {
+               if (numMemoryPages < 2) {
+                       throw new IllegalArgumentException("Merger needs at 
least 2 memory pages.");
+               }
+
+               this.comp = pairComparator;
+               this.serializer1 = serializer1;
+               this.serializer2 = serializer2;
+
+               this.memoryManager = memoryManager;
+               this.ioManager = ioManager;
+
+               this.iterator1 = new NonReusingKeyGroupedIterator<T1>(input1, 
this.serializer1, comparator1.duplicate());
+               this.iterator2 = new NonReusingKeyGroupedIterator<T2>(input2, 
this.serializer2, comparator2.duplicate());
+
+               final int numPagesForSpiller = numMemoryPages > 20 ? 2 : 1; // the rest goes to the block iterator
+               this.blockIt = new 
NonReusingBlockResettableIterator<T2>(this.memoryManager, this.serializer2,
+                       (numMemoryPages - numPagesForSpiller), parentTask);
+               this.memoryForSpillingIterator = 
memoryManager.allocatePages(parentTask, numPagesForSpiller);
+       }
+
+
+       @Override
+       public void open() throws IOException {} // intentionally empty: this method performs no setup
+
+
+       @Override
+       public void close() { // closes the block iterator, then returns the spilling pages to the memory manager
+               if (this.blockIt != null) { // NOTE(review): blockIt is final and set in the constructor; guard looks redundant
+                       try {
+                               this.blockIt.close();
+                       }
+                       catch (Throwable t) { // best effort: log and fall through so the release below still runs
+                               LOG.error("Error closing block memory iterator: 
" + t.getMessage(), t);
+                       }
+               }
+
+               this.memoryManager.release(this.memoryForSpillingIterator);
+       }
+
+
+       @Override
+       public void abort() {
+               close(); // aborting does nothing beyond what close() does
+       }
+
+       /**
+        * Advances both inputs to the next key they have in common and calls
+        * <code>FlatJoinFunction#join()</code> for every pair in the cross product of the
+        * records that carry this key. The given collector is passed on to each call.
+        * <p>
+        * This method first zig-zags between the two sorted inputs in order to find a common
+        * key, and then calls the join function with the cross product of the values.
+        * Depending on whether each side holds one or several records for the key, the work
+        * is delegated to <code>crossFirst1withNValues</code>,
+        * <code>crossSecond1withNValues</code> or <code>crossMwithNValues</code>; for a 1:1
+        * match the join function is called directly with the two records.
+        *
+        * @param matchFunction The join function that is called for each pair of matching records.
+        * @param collector The collector that is handed to the join function.
+        * @return <code>true</code>, if a common key was found and its records were joined;
+        *         <code>false</code>, if one of the inputs ran out of keys first. When that
+        *         happens on the initial advance, the remaining keys of both inputs are
+        *         consumed before returning.
+        *
+        * @throws Exception Forwards all exceptions from the user code and the 
I/O system.
+        *
+        * @see 
org.apache.flink.runtime.operators.util.JoinTaskIterator#callWithNextKey(org.apache.flink.api.common.functions.FlatJoinFunction,
 org.apache.flink.util.Collector)
+        */
+       @Override
+       public boolean callWithNextKey(final FlatJoinFunction<T1, T2, O> 
matchFunction, final Collector<O> collector)
+       throws Exception
+       {
+               if (!this.iterator1.nextKey() || !this.iterator2.nextKey()) {
+                       // consume all remaining keys (hack to prevent 
remaining inputs during iterations, lets get rid of this soon)
+                       while (this.iterator1.nextKey());
+                       while (this.iterator2.nextKey());
+                       
+                       return false;
+               }
+
+               final TypePairComparator<T1, T2> comparator = this.comp;
+               comparator.setReference(this.iterator1.getCurrent());
+               T2 current2 = this.iterator2.getCurrent();
+                               
+               // zig zag: advance one side at a time until the pair comparator reports equal keys
+               while (true) {
+                       // determine the relation between the (possibly 
composite) keys
+                       final int comp = 
comparator.compareToReference(current2);
+                       
+                       if (comp == 0) {
+                               break;
+                       }
+                       
+                       if (comp < 0) {
+                               if (!this.iterator2.nextKey()) {
+                                       return false; // second input has no more keys, so no further match is possible
+                               }
+                               current2 = this.iterator2.getCurrent();
+                       }
+                       else {
+                               if (!this.iterator1.nextKey()) {
+                                       return false; // first input has no more keys, so no further match is possible
+                               }
+                               
comparator.setReference(this.iterator1.getCurrent());
+                       }
+               }
+               
+               // here, we have a common key! call the match function with the 
cross product of the
+               // values
+               final NonReusingKeyGroupedIterator<T1>.ValuesIterator values1 = 
this.iterator1.getValues();
+               final NonReusingKeyGroupedIterator<T2>.ValuesIterator values2 = 
this.iterator2.getValues();
+               
+               final T1 firstV1 = values1.next();
+               final T2 firstV2 = values2.next();      
+                       
+               final boolean v1HasNext = values1.hasNext();
+               final boolean v2HasNext = values2.hasNext();
+
+               // check if one side is already empty
+               // this check could be omitted if we put this in MatchTask.
+               // then we can derive the local strategy (with build side).
+               
+               if (v1HasNext) {
+                       if (v2HasNext) {
+                               // both sides contain more than one value
+                               // TODO: Decide which side to spill and which 
to block!
+                               crossMwithNValues(firstV1, values1, firstV2, 
values2, matchFunction, collector);
+                       } else {
+                               crossSecond1withNValues(firstV2, firstV1, 
values1, matchFunction, collector);
+                       }
+               } else {
+                       if (v2HasNext) {
+                               crossFirst1withNValues(firstV1, firstV2, 
values2, matchFunction, collector);
+                       } else {
+                               // both sides contain only one value
+                               matchFunction.join(firstV1, firstV2, collector);
+                       }
+               }
+               return true;
+       }
+
+       /**
+        * Joins the single value of the first input with every value of the second input
+        * that shares its key. Effectively realizes a <i>1:N</i> match (join).
+        * <p>
+        * Every call to the join function except the last one receives a fresh copy of
+        * {@code val1}; the last call receives {@code val1} itself.
+        *
+        * @param val1 The value from the <i>1</i> side.
+        * @param firstValN The first of the values from the <i>N</i> side.
+        * @param valsN Iterator over the remaining <i>N</i> side values. At least one value is read from it.
+        * @param matchFunction The join function that is called for each pair.
+        * @param collector The collector that is handed to the join function.
+        *
+        * @throws Exception Forwards all exceptions thrown by the stub.
+        */
+       private void crossFirst1withNValues(final T1 val1, final T2 firstValN,
+                       final Iterator<T2> valsN, final FlatJoinFunction<T1, T2, O> matchFunction, final Collector<O> collector)
+       throws Exception
+       {
+               // pair the single value with the head of the N side
+               matchFunction.join(this.serializer1.copy(val1), firstValN, collector);
+
+               // pair it with the remaining values of the N side
+               while (true) {
+                       final T2 nextN = valsN.next();
+
+                       if (!valsN.hasNext()) {
+                               // last pair: val1 is not used again afterwards, so it is passed without a copy
+                               matchFunction.join(val1, nextN, collector);
+                               return;
+                       }
+
+                       matchFunction.join(this.serializer1.copy(val1), nextN, collector);
+               }
+       }
+       
+       /**
+        * Joins the single value of the second input with every value of the first input
+        * that shares its key. Effectively realizes a <i>N:1</i> match (join).
+        * <p>
+        * Every call to the join function except the last one receives a fresh copy of
+        * {@code val1}; the last call receives {@code val1} itself.
+        *
+        * @param val1 The value from the <i>1</i> side (the second input).
+        * @param firstValN The first of the values from the <i>N</i> side (the first input).
+        * @param valsN Iterator over the remaining <i>N</i> side values. At least one value is read from it.
+        * @param matchFunction The join function that is called for each pair.
+        * @param collector The collector that is handed to the join function.
+        *
+        * @throws Exception Forwards all exceptions thrown by the stub.
+        */
+       private void crossSecond1withNValues(T2 val1, T1 firstValN,
+                       Iterator<T1> valsN, FlatJoinFunction<T1, T2, O> matchFunction, Collector<O> collector)
+       throws Exception
+       {
+               // pair the head of the N side with the single value
+               matchFunction.join(firstValN, this.serializer2.copy(val1), collector);
+
+               // pair the remaining values of the N side with it
+               while (true) {
+                       final T1 nextN = valsN.next();
+
+                       if (!valsN.hasNext()) {
+                               // last pair: val1 is not used again afterwards, so it is passed without a copy
+                               matchFunction.join(nextN, val1, collector);
+                               return;
+                       }
+
+                       matchFunction.join(nextN, this.serializer2.copy(val1), collector);
+               }
+       }
+       
+       /**
+        * Crosses M values from the first input with N values from the second input, all
+        * sharing a common key, and calls the join function for every pair.
+        * <p>
+        * The second input (blocking side) is read through {@code blockIt}, one block at a time.
+        * If it has data beyond the first block, the remaining values of the first input
+        * (spilling side) are wrapped in a {@link SpillingResettableIterator} so that they can
+        * be replayed for every further block. The first-input value handed to the join function
+        * is always a copy. The memory segments returned when the spilling iterator is closed
+        * are added back to {@code memoryForSpillingIterator}.
+        *
+        * @param firstV1 The first (head) value of the first input.
+        * @param spillVals Iterator over the remaining values of the first input (spilling side).
+        * @param firstV2 The first (head) value of the second input.
+        * @param blockVals Iterator over the remaining values of the second input (blocking side).
+        * @param matchFunction The join function that is called for each pair.
+        * @param collector The collector that is handed to the join function.
+        *
+        * @throws Exception Forwards all exceptions thrown by the join function and the iterators.
+        */
+       private void crossMwithNValues(final T1 firstV1, Iterator<T1> spillVals,
+                       final T2 firstV2, final Iterator<T2> blockVals,
+                       final FlatJoinFunction<T1, T2, O> matchFunction, final 
Collector<O> collector)
+       throws Exception
+       {
+               // ==================================================
+               // We have one first (head) element from both inputs (firstV1 
and firstV2)
+               // We have an iterator for both inputs.
+               // we make the V1 side the spilling side and the V2 side the 
blocking side.
+               // In order to get the full cross product without unnecessary 
spilling, we do the
+               // following:
+               // 1) cross the heads
+               // 2) cross the head of the spilling side against the first 
block of the blocking side
+               // 3) cross the iterator of the spilling side with the head of 
the block side
+               // 4) cross the iterator of the spilling side with the first 
block
+               // ---------------------------------------------------
+               // If the blocking side has more than one block, we really need 
to make the spilling side fully
+               // resettable. For each further block on the block side, we do:
+               // 5) cross the head of the spilling side with the next block
+               // 6) cross the spilling iterator with the next block.
+               
+               // match the first values first
+               T1 copy1 = this.serializer1.copy(firstV1);
+               T2 blockHeadCopy = this.serializer2.copy(firstV2);
+               T1 spillHeadCopy = null;
+
+
+               // --------------- 1) Cross the heads -------------------
+               matchFunction.join(copy1, firstV2, collector);
+               
+               // for the remaining values, we do a block-nested-loops join
+               SpillingResettableIterator<T1> spillIt = null;
+               
+               try {
+                       // create block iterator on the second input
+                       this.blockIt.reopen(blockVals);
+                       
+                       // ------------- 2) cross the head of the spilling side 
with the first block ------------------
+                       while (this.blockIt.hasNext()) {
+                               final T2 nextBlockRec = this.blockIt.next();
+                               copy1 = this.serializer1.copy(firstV1);
+                               matchFunction.join(copy1, nextBlockRec, 
collector);
+                       }
+                       this.blockIt.reset();
+                       
+                       // spilling is required if the blocked input has data 
beyond the current block.
+                       // in that case, create the spilling iterator
+                       final Iterator<T1> leftSideIter;
+                       final boolean spillingRequired = 
this.blockIt.hasFurtherInput();
+                       if (spillingRequired)
+                       {
+                               // more data than would fit into one block. we 
need to wrap the other side in a spilling iterator
+                               // create spilling iterator on first input
+                               spillIt = new 
SpillingResettableIterator<T1>(spillVals, this.serializer1,
+                                               this.memoryManager, 
this.ioManager, this.memoryForSpillingIterator);
+                               leftSideIter = spillIt;
+                               spillIt.open();
+                               
+                               spillHeadCopy = this.serializer1.copy(firstV1);
+                       }
+                       else {
+                               leftSideIter = spillVals;
+                       }
+                       
+                       // cross the values in the v1 iterator against the 
current block
+                       
+                       while (leftSideIter.hasNext()) {
+                               final T1 nextSpillVal = leftSideIter.next();
+                               copy1 = this.serializer1.copy(nextSpillVal);
+                               
+                               
+                               // -------- 3) cross the iterator of the 
spilling side with the head of the block side --------
+                               T2 copy2 = this.serializer2.copy(blockHeadCopy);
+                               matchFunction.join(copy1, copy2, collector);
+                               
+                               // -------- 4) cross the iterator of the 
spilling side with the first block --------
+                               while (this.blockIt.hasNext()) {
+                                       T2 nextBlockRec = this.blockIt.next();
+                                       
+                                       // get instances of key and block value
+                                       copy1 = 
this.serializer1.copy(nextSpillVal);
+                                       matchFunction.join(copy1, nextBlockRec, 
collector);
+                               }
+                               // reset block iterator
+                               this.blockIt.reset();
+                       }
+                       
+                       // if everything from the block-side fit into a single 
block, we are done.
+                       // note that in this special case, we did not create a 
spilling iterator at all
+                       if (!spillingRequired) {
+                               return;
+                       }
+                       
+                       // here we are, because we have more blocks on the 
block side
+                       // loop as long as there are blocks from the blocked 
input
+                       while (this.blockIt.nextBlock())
+                       {
+                               // rewind the spilling iterator
+                               spillIt.reset();
+                               
+                               // ------------- 5) cross the head of the 
spilling side with the next block ------------
+                               while (this.blockIt.hasNext()) {
+                                       copy1 = 
this.serializer1.copy(spillHeadCopy);
+                                       final T2 nextBlockVal = blockIt.next();
+                                       matchFunction.join(copy1, nextBlockVal, 
collector);
+                               }
+                               this.blockIt.reset();
+                               
+                               // -------- 6) cross the spilling iterator with 
the next block. ------------------
+                               while (spillIt.hasNext())
+                               {
+                                       // get value from resettable iterator
+                                       final T1 nextSpillVal = spillIt.next();
+                                       // cross value with block values
+                                       while (this.blockIt.hasNext()) {
+                                               // get instances of key and 
block value
+                                               final T2 nextBlockVal = 
this.blockIt.next();
+                                               copy1 = 
this.serializer1.copy(nextSpillVal);
+                                               matchFunction.join(copy1, 
nextBlockVal, collector);
+                                       }
+                                       
+                                       // reset block iterator
+                                       this.blockIt.reset();
+                               }
+                               // reset v1 iterator
+                               spillIt.reset();
+                       }
+               }
+               finally {
+                       if (spillIt != null) {
+                               
this.memoryForSpillingIterator.addAll(spillIt.close());
+                       }
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d529749c/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/ReusingMergeMatchIterator.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/ReusingMergeMatchIterator.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/ReusingMergeMatchIterator.java
new file mode 100644
index 0000000..66beee1
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/ReusingMergeMatchIterator.java
@@ -0,0 +1,435 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.sort;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.flink.api.common.functions.FlatJoinFunction;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypePairComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.memorymanager.MemoryAllocationException;
+import org.apache.flink.runtime.memorymanager.MemoryManager;
+import 
org.apache.flink.runtime.operators.resettable.NonReusingBlockResettableIterator;
+import 
org.apache.flink.runtime.operators.resettable.SpillingResettableIterator;
+import org.apache.flink.runtime.operators.util.JoinTaskIterator;
+import org.apache.flink.runtime.util.ReusingKeyGroupedIterator;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.MutableObjectIterator;
+
+
+/**
+ * An implementation of the {@link JoinTaskIterator} that realizes the
+ * matching through a sort-merge join strategy.
+ */
+public class ReusingMergeMatchIterator<T1, T2, O> implements 
JoinTaskIterator<T1, T2, O> {
+       
+       /**
+        * The log used by this iterator to log messages.
+        */
+       private static final Logger LOG = 
LoggerFactory.getLogger(ReusingMergeMatchIterator.class);
+       
+       // 
--------------------------------------------------------------------------------------------
+       
+       private TypePairComparator<T1, T2> comp;
+       
+       private ReusingKeyGroupedIterator<T1> iterator1;
+
+       private ReusingKeyGroupedIterator<T2> iterator2;
+       
+       private final TypeSerializer<T1> serializer1;
+       
+       private final TypeSerializer<T2> serializer2;
+       
+       private T1 copy1;
+       
+       private T1 spillHeadCopy;
+       
+       private T2 copy2;
+       
+       private T2 blockHeadCopy;
+       
+       private final NonReusingBlockResettableIterator<T2> blockIt;            
                // for N:M cross products with same key
+       
+       private final List<MemorySegment> memoryForSpillingIterator;
+       
+       private final MemoryManager memoryManager;
+
+       private final IOManager ioManager;
+       
+       // 
--------------------------------------------------------------------------------------------
+       
+       /**
+        * Creates a merge-join iterator over the two given inputs.
+        * <p>
+        * Of the {@code numMemoryPages} pages, one page (two pages, if more than 20 pages are
+        * given) is allocated for the spilling iterator. The remaining number of pages is handed
+        * to the block resettable iterator.
+        *
+        * @param input1 The first input.
+        * @param input2 The second input.
+        * @param serializer1 The serializer for the records of the first input.
+        * @param comparator1 The comparator for the first input; a duplicate of it is used.
+        * @param serializer2 The serializer for the records of the second input.
+        * @param comparator2 The comparator for the second input; a duplicate of it is used.
+        * @param pairComparator The comparator that compares records of the first input with records of the second input.
+        * @param memoryManager The memory manager from which the pages are taken.
+        * @param ioManager The I/O manager that is passed to the spilling iterator.
+        * @param numMemoryPages The number of memory pages available to this iterator; at least two.
+        * @param parentTask The task that is passed as the owner of the memory.
+        *
+        * @throws MemoryAllocationException Declared by the page allocation; presumably thrown if the pages cannot be obtained.
+        * @throws IllegalArgumentException Thrown, if fewer than two memory pages are given.
+        */
+       public ReusingMergeMatchIterator(
+                       MutableObjectIterator<T1> input1,
+                       MutableObjectIterator<T2> input2,
+                       TypeSerializer<T1> serializer1, TypeComparator<T1> comparator1,
+                       TypeSerializer<T2> serializer2, TypeComparator<T2> comparator2,
+                       TypePairComparator<T1, T2> pairComparator,
+                       MemoryManager memoryManager,
+                       IOManager ioManager,
+                       int numMemoryPages,
+                       AbstractInvokable parentTask)
+       throws MemoryAllocationException
+       {
+               if (numMemoryPages < 2) {
+                       throw new IllegalArgumentException("Merger needs at least 2 memory pages.");
+               }
+
+               this.memoryManager = memoryManager;
+               this.ioManager = ioManager;
+               this.comp = pairComparator;
+               this.serializer1 = serializer1;
+               this.serializer2 = serializer2;
+
+               // instances that are reused as copy targets while crossing values
+               this.copy1 = serializer1.createInstance();
+               this.spillHeadCopy = serializer1.createInstance();
+               this.copy2 = serializer2.createInstance();
+               this.blockHeadCopy = serializer2.createInstance();
+
+               this.iterator1 = new ReusingKeyGroupedIterator<T1>(input1, serializer1, comparator1.duplicate());
+               this.iterator2 = new ReusingKeyGroupedIterator<T2>(input2, serializer2, comparator2.duplicate());
+
+               // split the memory budget between the spilling iterator and the block iterator
+               final int numPagesForSpiller;
+               if (numMemoryPages > 20) {
+                       numPagesForSpiller = 2;
+               } else {
+                       numPagesForSpiller = 1;
+               }
+               final int numPagesForBlock = numMemoryPages - numPagesForSpiller;
+
+               this.blockIt = new NonReusingBlockResettableIterator<T2>(memoryManager, serializer2,
+                       numPagesForBlock, parentTask);
+               this.memoryForSpillingIterator = memoryManager.allocatePages(parentTask, numPagesForSpiller);
+       }
+
+
+       @Override
+       public void open() throws IOException {}
+
+
+       /**
+        * Closes the block iterator and releases the memory held for the spilling iterator.
+        * A failure while closing the block iterator is logged and does not prevent the
+        * release of the spilling memory.
+        */
+       @Override
+       public void close() {
+               try {
+                       if (this.blockIt != null) {
+                               this.blockIt.close();
+                       }
+               }
+               catch (Throwable t) {
+                       LOG.error("Error closing block memory iterator: " + t.getMessage(), t);
+               }
+
+               this.memoryManager.release(this.memoryForSpillingIterator);
+       }
+       
+
+       /**
+        * Aborts this iterator by delegating to {@link #close()}.
+        */
+       @Override
+       public void abort() {
+               this.close();
+       }
+
+       /**
+        * Advances both inputs to the next key they have in common and calls the
+        * <code>join()</code> method of the given function for the cross product of the
+        * values that the two inputs hold for that key.
+        * <p>
+        * The common key is found by zig-zagging between the two sorted inputs: as long as
+        * the pair comparator reports a difference between the current keys, one of the two
+        * inputs (chosen by the sign of the comparison) is moved to its next key.
+        *
+        * @param matchFunction The join function that is called for each pair of values.
+        * @param collector The collector that is handed to the join function.
+        * @return True, if a common key was found and its values were joined; false, if one of
+        *         the inputs has no further key.
+        *
+        * @throws Exception Forwards all exceptions from the user code and the I/O system.
+        *
+        * @see org.apache.flink.runtime.operators.util.JoinTaskIterator#callWithNextKey(FlatJoinFunction, Collector)
+        */
+       @Override
+       public boolean callWithNextKey(final FlatJoinFunction<T1, T2, O> matchFunction, final Collector<O> collector)
+       throws Exception
+       {
+               if (!this.iterator1.nextKey() || !this.iterator2.nextKey()) {
+                       // consume all remaining keys (hack to prevent remaining inputs during iterations, lets get rid of this soon)
+                       while (this.iterator1.nextKey()) {
+                               // skip
+                       }
+                       while (this.iterator2.nextKey()) {
+                               // skip
+                       }
+
+                       return false;
+               }
+
+               final TypePairComparator<T1, T2> pairComparator = this.comp;
+               pairComparator.setReference(this.iterator1.getCurrent());
+               T2 candidate2 = this.iterator2.getCurrent();
+
+               // zig zag until the (possibly composite) keys of both sides are equal
+               int cmp;
+               while ((cmp = pairComparator.compareToReference(candidate2)) != 0) {
+                       if (cmp < 0) {
+                               if (!this.iterator2.nextKey()) {
+                                       return false;
+                               }
+                               candidate2 = this.iterator2.getCurrent();
+                       }
+                       else {
+                               if (!this.iterator1.nextKey()) {
+                                       return false;
+                               }
+                               pairComparator.setReference(this.iterator1.getCurrent());
+                       }
+               }
+
+               // here, we have a common key! call the match function with the cross product of the values
+               final ReusingKeyGroupedIterator<T1>.ValuesIterator values1 = this.iterator1.getValues();
+               final ReusingKeyGroupedIterator<T2>.ValuesIterator values2 = this.iterator2.getValues();
+
+               final T1 head1 = values1.next();
+               final T2 head2 = values2.next();
+
+               final boolean more1 = values1.hasNext();
+               final boolean more2 = values2.hasNext();
+
+               // pick the cheapest strategy for the number of values on each side
+               if (more1 && more2) {
+                       // both sides contain more than one value
+                       // TODO: Decide which side to spill and which to block!
+                       crossMwithNValues(head1, values1, head2, values2, matchFunction, collector);
+               }
+               else if (more1) {
+                       // only the first side has more than one value
+                       crossSecond1withNValues(head2, head1, values1, matchFunction, collector);
+               }
+               else if (more2) {
+                       // only the second side has more than one value
+                       crossFirst1withNValues(head1, head2, values2, matchFunction, collector);
+               }
+               else {
+                       // both sides contain only one value
+                       matchFunction.join(head1, head2, collector);
+               }
+               return true;
+       }
+
+       /**
+        * Joins the single value of the first input with every value of the second input
+        * that shares its key. Effectively realizes a <i>1:N</i> match (join).
+        * <p>
+        * Before every call to the join function except the last one, {@code val1} is copied
+        * into the reusable {@code copy1} instance and that copy is passed on; the last call
+        * receives {@code val1} itself.
+        *
+        * @param val1 The value from the <i>1</i> side.
+        * @param firstValN The first of the values from the <i>N</i> side.
+        * @param valsN Iterator over the remaining <i>N</i> side values. At least one value is read from it.
+        * @param matchFunction The join function that is called for each pair.
+        * @param collector The collector that is handed to the join function.
+        *
+        * @throws Exception Forwards all exceptions thrown by the stub.
+        */
+       private void crossFirst1withNValues(final T1 val1, final T2 firstValN,
+                       final Iterator<T2> valsN, final FlatJoinFunction<T1, T2, O> matchFunction, final Collector<O> collector)
+       throws Exception
+       {
+               // pair the single value with the head of the N side
+               this.copy1 = this.serializer1.copy(val1, this.copy1);
+               matchFunction.join(this.copy1, firstValN, collector);
+
+               // pair it with the remaining values of the N side
+               while (true) {
+                       final T2 nextN = valsN.next();
+
+                       if (!valsN.hasNext()) {
+                               // last pair: val1 is not used again afterwards, so it is passed without a copy
+                               matchFunction.join(val1, nextN, collector);
+                               return;
+                       }
+
+                       this.copy1 = this.serializer1.copy(val1, this.copy1);
+                       matchFunction.join(this.copy1, nextN, collector);
+               }
+       }
+       
+       /**
+        * Joins the single value of the second input with every value of the first input
+        * that shares its key. Effectively realizes a <i>N:1</i> match (join).
+        * <p>
+        * Before every call to the join function except the last one, {@code val1} is copied
+        * into the reusable {@code copy2} instance and that copy is passed on; the last call
+        * receives {@code val1} itself.
+        *
+        * @param val1 The value from the <i>1</i> side (the second input).
+        * @param firstValN The first of the values from the <i>N</i> side (the first input).
+        * @param valsN Iterator over the remaining <i>N</i> side values. At least one value is read from it.
+        * @param matchFunction The join function that is called for each pair.
+        * @param collector The collector that is handed to the join function.
+        *
+        * @throws Exception Forwards all exceptions thrown by the stub.
+        */
+       private void crossSecond1withNValues(T2 val1, T1 firstValN,
+                       Iterator<T1> valsN, FlatJoinFunction<T1, T2, O> matchFunction, Collector<O> collector)
+       throws Exception
+       {
+               // pair the head of the N side with the single value
+               this.copy2 = this.serializer2.copy(val1, this.copy2);
+               matchFunction.join(firstValN, this.copy2, collector);
+
+               // pair the remaining values of the N side with it
+               while (true) {
+                       final T1 nextN = valsN.next();
+
+                       if (!valsN.hasNext()) {
+                               // last pair: val1 is not used again afterwards, so it is passed without a copy
+                               matchFunction.join(nextN, val1, collector);
+                               return;
+                       }
+
+                       this.copy2 = this.serializer2.copy(val1, this.copy2);
+                       matchFunction.join(nextN, this.copy2, collector);
+               }
+       }
+       
+       /**
+        * Crosses M values from the first input with N values from the second input, all
+        * sharing a common key, and calls the join function for every pair.
+        * <p>
+        * The second input (blocking side) is read through {@code blockIt}, one block at a time.
+        * If it has data beyond the first block, the remaining values of the first input
+        * (spilling side) are wrapped in a {@link SpillingResettableIterator} so that they can
+        * be replayed for every further block. The first-input value handed to the join function
+        * is always the reusable {@code copy1} instance, which is overwritten before each call.
+        * The memory segments returned when the spilling iterator is closed are added back to
+        * {@code memoryForSpillingIterator}.
+        *
+        * @param firstV1 The first (head) value of the first input.
+        * @param spillVals Iterator over the remaining values of the first input (spilling side).
+        * @param firstV2 The first (head) value of the second input.
+        * @param blockVals Iterator over the remaining values of the second input (blocking side).
+        * @param matchFunction The join function that is called for each pair.
+        * @param collector The collector that is handed to the join function.
+        *
+        * @throws Exception Forwards all exceptions thrown by the join function and the iterators.
+        */
+       private void crossMwithNValues(final T1 firstV1, Iterator<T1> spillVals,
+                       final T2 firstV2, final Iterator<T2> blockVals,
+                       final FlatJoinFunction<T1, T2, O> matchFunction, final 
Collector<O> collector)
+       throws Exception
+       {
+               // ==================================================
+               // We have one first (head) element from both inputs (firstV1 
and firstV2)
+               // We have an iterator for both inputs.
+               // we make the V1 side the spilling side and the V2 side the 
blocking side.
+               // In order to get the full cross product without unnecessary 
spilling, we do the
+               // following:
+               // 1) cross the heads
+               // 2) cross the head of the spilling side against the first 
block of the blocking side
+               // 3) cross the iterator of the spilling side with the head of 
the block side
+               // 4) cross the iterator of the spilling side with the first 
block
+               // ---------------------------------------------------
+               // If the blocking side has more than one block, we really need 
to make the spilling side fully
+               // resettable. For each further block on the block side, we do:
+               // 5) cross the head of the spilling side with the next block
+               // 6) cross the spilling iterator with the next block.
+               
+               // match the first values first
+               this.copy1 = this.serializer1.copy(firstV1, this.copy1);
+               this.blockHeadCopy = this.serializer2.copy(firstV2, 
this.blockHeadCopy);
+               
+               // --------------- 1) Cross the heads -------------------
+               matchFunction.join(this.copy1, firstV2, collector);
+               
+               // for the remaining values, we do a block-nested-loops join
+               SpillingResettableIterator<T1> spillIt = null;
+               
+               try {
+                       // create block iterator on the second input
+                       this.blockIt.reopen(blockVals);
+                       
+                       // ------------- 2) cross the head of the spilling side 
with the first block ------------------
+                       while (this.blockIt.hasNext()) {
+                               final T2 nextBlockRec = this.blockIt.next();
+                               this.copy1 = this.serializer1.copy(firstV1, 
this.copy1);
+                               matchFunction.join(this.copy1, nextBlockRec, 
collector);
+                       }
+                       this.blockIt.reset();
+                       
+                       // spilling is required if the blocked input has data 
beyond the current block.
+                       // in that case, create the spilling iterator
+                       final Iterator<T1> leftSideIter;
+                       final boolean spillingRequired = 
this.blockIt.hasFurtherInput();
+                       if (spillingRequired)
+                       {
+                               // more data than would fit into one block. we 
need to wrap the other side in a spilling iterator
+                               // create spilling iterator on first input
+                               spillIt = new 
SpillingResettableIterator<T1>(spillVals, this.serializer1,
+                                               this.memoryManager, 
this.ioManager, this.memoryForSpillingIterator);
+                               leftSideIter = spillIt;
+                               spillIt.open();
+                               
+                               this.spillHeadCopy = 
this.serializer1.copy(firstV1, this.spillHeadCopy);
+                       }
+                       else {
+                               leftSideIter = spillVals;
+                       }
+                       
+                       // cross the values in the v1 iterator against the 
current block
+                       
+                       while (leftSideIter.hasNext()) {
+                               final T1 nextSpillVal = leftSideIter.next();
+                               this.copy1 = 
this.serializer1.copy(nextSpillVal, this.copy1);
+                               
+                               
+                               // -------- 3) cross the iterator of the 
spilling side with the head of the block side --------
+                               this.copy2 = 
this.serializer2.copy(this.blockHeadCopy, this.copy2);
+                               matchFunction.join(this.copy1, this.copy2, 
collector);
+                               
+                               // -------- 4) cross the iterator of the 
spilling side with the first block --------
+                               while (this.blockIt.hasNext()) {
+                                       T2 nextBlockRec = this.blockIt.next();
+                                       
+                                       // get instances of key and block value
+                                       this.copy1 = 
this.serializer1.copy(nextSpillVal, this.copy1);
+                                       matchFunction.join(this.copy1, 
nextBlockRec, collector);
+                               }
+                               // reset block iterator
+                               this.blockIt.reset();
+                       }
+                       
+                       // if everything from the block-side fit into a single 
block, we are done.
+                       // note that in this special case, we did not create a 
spilling iterator at all
+                       if (!spillingRequired) {
+                               return;
+                       }
+                       
+                       // here we are, because we have more blocks on the 
block side
+                       // loop as long as there are blocks from the blocked 
input
+                       while (this.blockIt.nextBlock())
+                       {
+                               // rewind the spilling iterator
+                               spillIt.reset();
+                               
+                               // ------------- 5) cross the head of the 
spilling side with the next block ------------
+                               while (this.blockIt.hasNext()) {
+                                       this.copy1 = 
this.serializer1.copy(this.spillHeadCopy, this.copy1);
+                                       final T2 nextBlockVal = blockIt.next();
+                                       matchFunction.join(this.copy1, 
nextBlockVal, collector);
+                               }
+                               this.blockIt.reset();
+                               
+                               // -------- 6) cross the spilling iterator with 
the next block. ------------------
+                               while (spillIt.hasNext())
+                               {
+                                       // get value from resettable iterator
+                                       final T1 nextSpillVal = spillIt.next();
+                                       // cross value with block values
+                                       while (this.blockIt.hasNext()) {
+                                               // get instances of key and 
block value
+                                               final T2 nextBlockVal = 
this.blockIt.next();
+                                               this.copy1 = 
this.serializer1.copy(nextSpillVal, this.copy1);
+                                               matchFunction.join(this.copy1, 
nextBlockVal, collector);
+                                       }
+                                       
+                                       // reset block iterator
+                                       this.blockIt.reset();
+                               }
+                               // reset v1 iterator
+                               spillIt.reset();
+                       }
+               }
+               finally {
+                       if (spillIt != null) {
+                               
this.memoryForSpillingIterator.addAll(spillIt.close());
+                       }
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d529749c/flink-runtime/src/test/java/org/apache/flink/runtime/operators/resettable/BlockResettableIteratorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/resettable/BlockResettableIteratorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/resettable/BlockResettableIteratorTest.java
deleted file mode 100644
index c51e53a..0000000
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/resettable/BlockResettableIteratorTest.java
+++ /dev/null
@@ -1,202 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.operators.resettable;
-
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-
-import org.junit.Assert;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.record.RecordSerializer;
-import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.memorymanager.DefaultMemoryManager;
-import org.apache.flink.runtime.memorymanager.MemoryManager;
-import org.apache.flink.runtime.operators.resettable.BlockResettableIterator;
-import org.apache.flink.runtime.operators.testutils.DummyInvokable;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.types.Record;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-
-public class BlockResettableIteratorTest
-{
-       private static final int MEMORY_CAPACITY = 3 * 128 * 1024;
-       
-       private static final int NUM_VALUES = 20000;
-       
-       private MemoryManager memman;
-
-       private Iterator<Record> reader;
-
-       private List<Record> objects;
-       
-       private final TypeSerializer<Record> serializer = 
RecordSerializer.get();
-
-       @Before
-       public void startup() {
-               // set up IO and memory manager
-               this.memman = new DefaultMemoryManager(MEMORY_CAPACITY, 1);
-               
-               // create test objects
-               this.objects = new ArrayList<Record>(20000);
-               for (int i = 0; i < NUM_VALUES; ++i) {
-                       this.objects.add(new Record(new IntValue(i)));
-               }
-               
-               // create the reader
-               this.reader = objects.iterator();
-       }
-       
-       @After
-       public void shutdown() {
-               this.objects = null;
-               
-               // check that the memory manager got all segments back
-               if (!this.memman.verifyEmpty()) {
-                       Assert.fail("A memory leak has occurred: Not all memory 
was properly returned to the memory manager.");
-               }
-               
-               this.memman.shutdown();
-               this.memman = null;
-       }
-
-       @Test
-       public void testSerialBlockResettableIterator() throws Exception
-       {
-               final AbstractInvokable memOwner = new DummyInvokable();
-               // create the resettable Iterator
-               final BlockResettableIterator<Record> iterator = new 
BlockResettableIterator<Record>(
-                               this.memman, this.reader, this.serializer, 1, 
memOwner);
-               // open the iterator
-               iterator.open();
-               
-               // now test walking through the iterator
-               int lower = 0;
-               int upper = 0;
-               do {
-                       lower = upper;
-                       upper = lower;
-                       // find the upper bound
-                       while (iterator.hasNext()) {
-                               Record target = iterator.next();
-                               int val = target.getField(0, 
IntValue.class).getValue();
-                               Assert.assertEquals(upper++, val);
-                       }
-                       // now reset the buffer a few times
-                       for (int i = 0; i < 5; ++i) {
-                               iterator.reset();
-                               int count = 0;
-                               while (iterator.hasNext()) {
-                                       Record target = iterator.next();
-                                       int val = target.getField(0, 
IntValue.class).getValue();
-                                       Assert.assertEquals(lower + (count++), 
val);
-                               }
-                               Assert.assertEquals(upper - lower, count);
-                       }
-               } while (iterator.nextBlock());
-               Assert.assertEquals(NUM_VALUES, upper);
-               // close the iterator
-               iterator.close();
-       }
-
-       @Test
-       public void testDoubleBufferedBlockResettableIterator() throws Exception
-       {
-               final AbstractInvokable memOwner = new DummyInvokable();
-               // create the resettable Iterator
-               final BlockResettableIterator<Record> iterator = new 
BlockResettableIterator<Record>(
-                               this.memman, this.reader, this.serializer, 2, 
memOwner);
-               // open the iterator
-               iterator.open();
-               
-               // now test walking through the iterator
-               int lower = 0;
-               int upper = 0;
-               do {
-                       lower = upper;
-                       upper = lower;
-                       // find the upper bound
-                       while (iterator.hasNext()) {
-                               Record target = iterator.next();
-                               int val = target.getField(0, 
IntValue.class).getValue();
-                               Assert.assertEquals(upper++, val);
-                       }
-                       // now reset the buffer a few times
-                       for (int i = 0; i < 5; ++i) {
-                               iterator.reset();
-                               int count = 0;
-                               while (iterator.hasNext()) {
-                                       Record target = iterator.next();
-                                       int val = target.getField(0, 
IntValue.class).getValue();
-                                       Assert.assertEquals(lower + (count++), 
val);
-                               }
-                               Assert.assertEquals(upper - lower, count);
-                       }
-               } while (iterator.nextBlock());
-               Assert.assertEquals(NUM_VALUES, upper);
-               
-               // close the iterator
-               iterator.close();
-       }
-
-       @Test
-       public void testTwelveFoldBufferedBlockResettableIterator() throws 
Exception
-       {
-               final AbstractInvokable memOwner = new DummyInvokable();
-               // create the resettable Iterator
-               final BlockResettableIterator<Record> iterator = new 
BlockResettableIterator<Record>(
-                               this.memman, this.reader, this.serializer, 12, 
memOwner);
-               // open the iterator
-               iterator.open();
-               
-               // now test walking through the iterator
-               int lower = 0;
-               int upper = 0;
-               do {
-                       lower = upper;
-                       upper = lower;
-                       // find the upper bound
-                       while (iterator.hasNext()) {
-                               Record target = iterator.next();
-                               int val = target.getField(0, 
IntValue.class).getValue();
-                               Assert.assertEquals(upper++, val);
-                       }
-                       // now reset the buffer a few times
-                       for (int i = 0; i < 5; ++i) {
-                               iterator.reset();
-                               int count = 0;
-                               while (iterator.hasNext()) {
-                                       Record target = iterator.next();
-                                       int val = target.getField(0, 
IntValue.class).getValue();
-                                       Assert.assertEquals(lower + (count++), 
val);
-                               }
-                               Assert.assertEquals(upper - lower, count);
-                       }
-               } while (iterator.nextBlock());
-               Assert.assertEquals(NUM_VALUES, upper);
-               
-               // close the iterator
-               iterator.close();
-       }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/d529749c/flink-runtime/src/test/java/org/apache/flink/runtime/operators/resettable/NonReusingBlockResettableIteratorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/resettable/NonReusingBlockResettableIteratorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/resettable/NonReusingBlockResettableIteratorTest.java
new file mode 100644
index 0000000..5641f29
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/resettable/NonReusingBlockResettableIteratorTest.java
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.runtime.operators.resettable;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import org.junit.Assert;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.record.RecordSerializer;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.memorymanager.DefaultMemoryManager;
+import org.apache.flink.runtime.memorymanager.MemoryManager;
+import org.apache.flink.runtime.operators.testutils.DummyInvokable;
+import org.apache.flink.types.IntValue;
+import org.apache.flink.types.Record;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+
+/**
+ * Tests {@link NonReusingBlockResettableIterator}: each block of the input must be readable
+ * once from front to back and must then replay exactly the same records after every
+ * {@code reset()}, until {@code nextBlock()} reports that no further block is available.
+ */
+public class NonReusingBlockResettableIteratorTest
+{
+       /** Capacity handed to the memory manager that backs the iterators under test. */
+       private static final int MEMORY_CAPACITY = 3 * 128 * 1024;
+
+       /** Number of records in the test input. */
+       private static final int NUM_VALUES = 20000;
+
+       /** Number of times each block is reset and read again. */
+       private static final int NUM_RESETS = 5;
+
+       private MemoryManager memman;
+
+       private Iterator<Record> reader;
+
+       private List<Record> objects;
+
+       private final TypeSerializer<Record> serializer = RecordSerializer.get();
+
+       /**
+        * Creates the memory manager and an input of {@code NUM_VALUES} records that carry the
+        * ascending values {@code 0} to {@code NUM_VALUES - 1} in their first field.
+        */
+       @Before
+       public void startup() {
+               // set up the memory manager
+               this.memman = new DefaultMemoryManager(MEMORY_CAPACITY, 1);
+
+               // create test objects; the list is presized from the same constant the loop uses
+               this.objects = new ArrayList<Record>(NUM_VALUES);
+               for (int i = 0; i < NUM_VALUES; ++i) {
+                       this.objects.add(new Record(new IntValue(i)));
+               }
+
+               // create the reader
+               this.reader = objects.iterator();
+       }
+
+       /**
+        * Drops the test data, fails the test if the memory manager does not report itself as
+        * empty, and otherwise shuts the memory manager down.
+        */
+       @After
+       public void shutdown() {
+               this.objects = null;
+
+               // check that the memory manager got all segments back
+               if (!this.memman.verifyEmpty()) {
+                       Assert.fail("A memory leak has occurred: Not all memory was properly returned to the memory manager.");
+               }
+
+               this.memman.shutdown();
+               this.memman = null;
+       }
+
+       /** Runs the block-wise check with {@code 1} as the iterator's fourth constructor argument. */
+       @Test
+       public void testSerialBlockResettableIterator() throws Exception
+       {
+               verifyBlockwiseIteration(1);
+       }
+
+       /** Runs the block-wise check with {@code 2} as the iterator's fourth constructor argument. */
+       @Test
+       public void testDoubleBufferedBlockResettableIterator() throws Exception
+       {
+               verifyBlockwiseIteration(2);
+       }
+
+       /** Runs the block-wise check with {@code 12} as the iterator's fourth constructor argument. */
+       @Test
+       public void testTwelveFoldBufferedBlockResettableIterator() throws Exception
+       {
+               verifyBlockwiseIteration(12);
+       }
+
+       /**
+        * Shared body of the three tests. Walks the input block by block and asserts that
+        * the first pass over a block continues the ascending sequence where the previous block
+        * stopped, that each of the {@code NUM_RESETS} replays after {@code reset()} returns the
+        * same values in the same number, and that all blocks together contain
+        * {@code NUM_VALUES} records.
+        *
+        * @param numBuffers value passed as the fourth constructor argument of the iterator.
+        *        NOTE(review): judging by the test names this is the number of buffers the
+        *        iterator works with; confirm against the constructor.
+        * @throws Exception if creating, opening, reading or closing the iterator fails
+        */
+       private void verifyBlockwiseIteration(int numBuffers) throws Exception
+       {
+               final AbstractInvokable memOwner = new DummyInvokable();
+
+               // create the resettable iterator
+               final NonReusingBlockResettableIterator<Record> iterator =
+                               new NonReusingBlockResettableIterator<Record>(
+                                               this.memman, this.reader, this.serializer, numBuffers, memOwner);
+               try {
+                       // open the iterator
+                       iterator.open();
+
+                       // walk through the input block by block
+                       int upper = 0;
+                       do {
+                               // a block starts where the previous one ended
+                               final int lower = upper;
+
+                               // first pass over the block: find the upper bound
+                               while (iterator.hasNext()) {
+                                       Record target = iterator.next();
+                                       int val = target.getField(0, IntValue.class).getValue();
+                                       Assert.assertEquals(upper++, val);
+                               }
+
+                               // now reset the buffer a few times
+                               for (int i = 0; i < NUM_RESETS; ++i) {
+                                       iterator.reset();
+                                       int count = 0;
+                                       while (iterator.hasNext()) {
+                                               Record target = iterator.next();
+                                               int val = target.getField(0, IntValue.class).getValue();
+                                               Assert.assertEquals(lower + (count++), val);
+                                       }
+                                       // every replay must return as many records as the first pass
+                                       Assert.assertEquals(upper - lower, count);
+                               }
+                       } while (iterator.nextBlock());
+
+                       // all blocks together must cover the complete input
+                       Assert.assertEquals(NUM_VALUES, upper);
+               }
+               finally {
+                       // close the iterator even when an assertion above fails; presumably this is
+                       // what hands its memory back to the manager that shutdown() checks
+                       iterator.close();
+               }
+       }
+
+}

Reply via email to