[FLINK-1285] Make execution mode configurable

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b7b32a05
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b7b32a05
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b7b32a05

Branch: refs/heads/master
Commit: b7b32a050e4b77012b9b70bb98b2a22d293dfbad
Parents: 3832d7b
Author: Aljoscha Krettek <[email protected]>
Authored: Mon Dec 15 18:40:11 2014 +0100
Committer: Stephan Ewen <[email protected]>
Committed: Wed Jan 7 19:16:10 2015 +0100

----------------------------------------------------------------------
 .../plantranslate/NepheleJobGraphGenerator.java |  14 +-
 .../flink/api/common/ExecutionConfig.java       |  41 +-
 .../java/org/apache/flink/api/common/Plan.java  |  42 +-
 .../flink/api/java/CollectionEnvironment.java   |   7 +-
 .../flink/api/java/ExecutionEnvironment.java    |   1 +
 .../java/typeutils/runtime/TupleSerializer.java |   1 +
 .../AbstractCachedBuildSideMatchDriver.java     | 141 ++--
 .../runtime/operators/AllGroupReduceDriver.java |  56 +-
 .../runtime/operators/AllReduceDriver.java      |  50 +-
 .../flink/runtime/operators/CoGroupDriver.java  |  34 +-
 .../CoGroupWithSolutionSetFirstDriver.java      | 121 ++-
 .../CoGroupWithSolutionSetSecondDriver.java     | 122 ++-
 .../runtime/operators/CollectorMapDriver.java   |  34 +-
 .../flink/runtime/operators/CrossDriver.java    | 211 +++--
 .../flink/runtime/operators/DataSinkTask.java   |  44 +-
 .../flink/runtime/operators/DataSourceTask.java |  88 ++-
 .../flink/runtime/operators/FlatMapDriver.java  |  35 +-
 .../operators/GroupReduceCombineDriver.java     |  49 +-
 .../runtime/operators/GroupReduceDriver.java    |  26 +-
 .../JoinWithSolutionSetFirstDriver.java         |  93 ++-
 .../JoinWithSolutionSetSecondDriver.java        |  90 ++-
 .../flink/runtime/operators/MapDriver.java      |   6 +
 .../runtime/operators/MapPartitionDriver.java   |  28 +-
 .../flink/runtime/operators/MatchDriver.java    |  68 +-
 .../flink/runtime/operators/NoOpDriver.java     |  38 +-
 .../runtime/operators/PactTaskContext.java      |   5 +-
 .../runtime/operators/ReduceCombineDriver.java  |  85 +-
 .../flink/runtime/operators/ReduceDriver.java   |  80 +-
 .../runtime/operators/RegularPactTask.java      |  26 +-
 .../operators/chaining/ChainedDriver.java       |  10 +-
 .../SynchronousChainedCombineDriver.java        |  52 +-
 .../hash/BuildFirstHashMatchIterator.java       | 168 ----
 .../BuildFirstReOpenableHashMatchIterator.java  |  78 --
 .../hash/BuildSecondHashMatchIterator.java      | 166 ----
 .../BuildSecondReOpenableHashMatchIterator.java |  77 --
 .../operators/hash/CompactingHashTable.java     |  83 +-
 .../operators/hash/HashMatchIteratorBase.java   |  51 ++
 .../operators/hash/InMemoryPartition.java       |   5 +
 .../NonReusingBuildFirstHashMatchIterator.java  | 152 ++++
 ...ngBuildFirstReOpenableHashMatchIterator.java |  80 ++
 .../NonReusingBuildSecondHashMatchIterator.java | 150 ++++
 ...gBuildSecondReOpenableHashMatchIterator.java |  81 ++
 .../ReusingBuildFirstHashMatchIterator.java     | 154 ++++
 ...ngBuildFirstReOpenableHashMatchIterator.java |  80 ++
 .../ReusingBuildSecondHashMatchIterator.java    | 152 ++++
 ...gBuildSecondReOpenableHashMatchIterator.java |  79 ++
 .../sort/CombiningUnilateralSortMerger.java     |   4 +-
 .../operators/sort/FixedLengthRecordSorter.java |   4 +-
 .../operators/sort/MergeMatchIterator.java      |  14 +-
 .../NonReusingSortMergeCoGroupIterator.java     | 157 ++++
 .../sort/ReusingSortMergeCoGroupIterator.java   | 158 ++++
 .../sort/SortMergeCoGroupIterator.java          | 155 ----
 .../flink/runtime/util/KeyGroupedIterator.java  | 239 ------
 .../util/KeyGroupedIteratorImmutable.java       | 222 ------
 .../util/MutableToRegularIteratorWrapper.java   | 102 ---
 .../util/NonReusingKeyGroupedIterator.java      | 222 ++++++
 ...nReusingMutableToRegularIteratorWrapper.java |  96 +++
 .../runtime/util/ReusingKeyGroupedIterator.java | 242 ++++++
 .../ReusingMutableToRegularIteratorWrapper.java | 103 +++
 .../runtime/operators/CachedMatchTaskTest.java  |   5 +-
 .../operators/CoGroupTaskExternalITCase.java    |   5 +-
 .../runtime/operators/CoGroupTaskTest.java      |   5 +-
 .../operators/CombineTaskExternalITCase.java    |   5 +-
 .../runtime/operators/CombineTaskTest.java      |   5 +-
 .../operators/CrossTaskExternalITCase.java      |   5 +-
 .../flink/runtime/operators/CrossTaskTest.java  |   5 +-
 .../flink/runtime/operators/MapTaskTest.java    |   5 +-
 .../operators/MatchTaskExternalITCase.java      |   5 +-
 .../flink/runtime/operators/MatchTaskTest.java  |   5 +-
 .../operators/ReduceTaskExternalITCase.java     |   5 +-
 .../flink/runtime/operators/ReduceTaskTest.java |   5 +-
 .../operators/drivers/TestTaskContext.java      |   8 +
 .../operators/hash/HashMatchIteratorITCase.java | 778 -------------------
 .../hash/NonReusingHashMatchIteratorITCase.java | 778 +++++++++++++++++++
 .../NonReusingReOpenableHashTableITCase.java    | 533 +++++++++++++
 .../hash/ReOpenableHashTableITCase.java         | 534 -------------
 .../hash/ReusingHashMatchIteratorITCase.java    | 778 +++++++++++++++++++
 .../hash/ReusingReOpenableHashTableITCase.java  | 532 +++++++++++++
 .../CombiningUnilateralSortMergerITCase.java    |   4 +-
 .../sort/MassiveStringValueSortingITCase.java   |  10 +
 ...onReusingSortMergeCoGroupIteratorITCase.java | 227 ++++++
 .../ReusingSortMergeCoGroupIteratorITCase.java  | 227 ++++++
 .../sort/SortMergeCoGroupIteratorITCase.java    | 227 ------
 .../operators/testutils/DriverTestBase.java     |  73 +-
 .../operators/testutils/MockEnvironment.java    |  10 +-
 .../operators/util/HashVsSortMiniBenchmark.java |  12 +-
 .../util/KeyGroupedIteratorImmutableTest.java   | 371 ---------
 .../runtime/util/KeyGroupedIteratorTest.java    | 401 ----------
 .../util/NonReusingKeyGroupedIteratorTest.java  | 371 +++++++++
 .../util/ReusingKeyGroupedIteratorTest.java     | 400 ++++++++++
 .../flink/test/util/JavaProgramTestBase.java    |  51 +-
 .../CoGroupConnectedComponentsITCase.java       |   4 +-
 .../DependencyConnectedComponentsITCase.java    |   3 +
 .../test/javaApiOperators/ReduceITCase.java     |  37 -
 .../flink/test/operators/ObjectReuseITCase.java | 247 ++++++
 95 files changed, 7532 insertions(+), 4116 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/b7b32a05/flink-compiler/src/main/java/org/apache/flink/compiler/plantranslate/NepheleJobGraphGenerator.java
----------------------------------------------------------------------
diff --git 
a/flink-compiler/src/main/java/org/apache/flink/compiler/plantranslate/NepheleJobGraphGenerator.java
 
b/flink-compiler/src/main/java/org/apache/flink/compiler/plantranslate/NepheleJobGraphGenerator.java
index 9c2efb3..260b30c 100644
--- 
a/flink-compiler/src/main/java/org/apache/flink/compiler/plantranslate/NepheleJobGraphGenerator.java
+++ 
b/flink-compiler/src/main/java/org/apache/flink/compiler/plantranslate/NepheleJobGraphGenerator.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.compiler.plantranslate;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -27,6 +28,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.aggregators.AggregatorRegistry;
 import org.apache.flink.api.common.aggregators.AggregatorWithName;
 import org.apache.flink.api.common.aggregators.ConvergenceCriterion;
@@ -82,6 +84,7 @@ import 
org.apache.flink.runtime.operators.chaining.ChainedDriver;
 import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
 import org.apache.flink.runtime.operators.util.LocalStrategy;
 import org.apache.flink.runtime.operators.util.TaskConfig;
+import org.apache.flink.util.InstantiationUtil;
 import org.apache.flink.util.Visitor;
 
 /**
@@ -209,7 +212,16 @@ public class NepheleJobGraphGenerator implements 
Visitor<PlanNode> {
                for (Entry<String, DistributedCacheEntry> e : 
program.getOriginalPactPlan().getCachedFiles()) {
                        DistributedCache.writeFileInfoToConfig(e.getKey(), 
e.getValue(), graph.getJobConfiguration());
                }
-               
+
+               try {
+                       InstantiationUtil.writeObjectToConfig(
+                                       
program.getOriginalPactPlan().getExecutionConfig(),
+                                       graph.getJobConfiguration(),
+                                       ExecutionConfig.CONFIG_KEY);
+               } catch (IOException e) {
+                       throw new RuntimeException("Config object could not be 
written to Job Configuration: " + e);
+               }
+
                // release all references again
                this.vertices = null;
                this.chainedTasks = null;

http://git-wip-us.apache.org/repos/asf/flink/blob/b7b32a05/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java 
b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
index 03d5e3a..17e683f 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
@@ -37,7 +37,7 @@ public class ExecutionConfig implements Serializable {
 
        // For future use...
 //     private boolean forceGenericSerializer = false;
-//     private boolean objectReuse = false;
+       private boolean objectReuse = false;
 
        /**
         * Enables the ClosureCleaner. This analyzes user code functions and 
sets fields to null
@@ -143,17 +143,30 @@ public class ExecutionConfig implements Serializable {
 //             return forceGenericSerializer;
 //     }
 //
-//     public ExecutionConfig enableObjectReuse() {
-//             objectReuse = true;
-//             return this;
-//     }
-//
-//     public ExecutionConfig disableObjectReuse() {
-//             objectReuse = false;
-//             return this;
-//     }
-//
-//     public boolean isObjectReuseEnabled() {
-//             return objectReuse;
-//     }
+
+       /**
+        * Enables reusing objects that Flink internally uses for 
deserialization and passing
+        * data to user-code functions. Keep in mind that this can lead to bugs 
when the
+        * user-code function of an operation is not aware of this behaviour.
+        */
+       public ExecutionConfig enableObjectReuse() {
+               objectReuse = true;
+               return this;
+       }
+
+       /**
+        * Disables reusing objects that Flink internally uses for 
deserialization and passing
+        * data to user-code functions. @see #enableObjectReuse()
+        */
+       public ExecutionConfig disableObjectReuse() {
+               objectReuse = false;
+               return this;
+       }
+
+       /**
+        * Returns whether object reuse has been enabled or disabled. @see 
#enableObjectReuse()
+        */
+       public boolean isObjectReuseEnabled() {
+               return objectReuse;
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/b7b32a05/flink-core/src/main/java/org/apache/flink/api/common/Plan.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/Plan.java 
b/flink-core/src/main/java/org/apache/flink/api/common/Plan.java
index f299ef4..4a975d2 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/Plan.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/Plan.java
@@ -69,14 +69,14 @@ public class Plan implements Visitable<Operator<?>> {
        protected int defaultParallelism = DEFAULT_PARALELLISM;
        
        /**
-        * The number of times failed tasks are re-executed.
+        * Hash map for files in the distributed cache: registered name to 
cache entry.
         */
-       protected int numberOfExecutionRetries;
+       protected HashMap<String, DistributedCacheEntry> cacheFile = new 
HashMap<String, DistributedCacheEntry>();
 
        /**
-        * Hash map for files in the distributed cache: registered name to 
cache entry.
+        * Config object for runtime execution parameters.
         */
-       protected HashMap<String, DistributedCacheEntry> cacheFile = new 
HashMap<String, DistributedCacheEntry>();
+       protected ExecutionConfig executionConfig = new ExecutionConfig();
 
        // 
------------------------------------------------------------------------
 
@@ -264,20 +264,6 @@ public class Plan implements Visitable<Operator<?>> {
        }
        
        /**
-        * Sets the number of times that failed tasks are re-executed. A value 
of zero
-        * effectively disables fault tolerance. A value of {@code -1} 
indicates that the system
-        * default value (as defined in the configuration) should be used.
-        * 
-        * @param numberOfExecutionRetries The number of times the system will 
try to re-execute failed tasks.
-        */
-       public void setNumberOfExecutionRetries(int numberOfExecutionRetries) {
-               if (numberOfExecutionRetries < -1) {
-                       throw new IllegalArgumentException("The number of 
execution retries must be non-negative, or -1 (use system default)");
-               }
-               this.numberOfExecutionRetries = numberOfExecutionRetries;
-       }
-       
-       /**
         * Gets the number of times the system will try to re-execute failed 
tasks. A value
         * of {@code -1} indicates that the system default value (as defined in 
the configuration)
         * should be used.
@@ -285,7 +271,7 @@ public class Plan implements Visitable<Operator<?>> {
         * @return The number of times the system will try to re-execute failed 
tasks.
         */
        public int getNumberOfExecutionRetries() {
-               return numberOfExecutionRetries;
+               return executionConfig.getNumberOfExecutionRetries();
        }
        
        /**
@@ -297,7 +283,23 @@ public class Plan implements Visitable<Operator<?>> {
        public String getPostPassClassName() {
                return "org.apache.flink.compiler.postpass.RecordModelPostPass";
        }
-       
+
+       /**
+        * Gets the runtime config object.
+        * @return The execution config for this plan.
+        */
+       public ExecutionConfig getExecutionConfig() {
+               return executionConfig;
+       }
+
+       /**
+        * Sets the runtime config object.
+        * @param executionConfig The execution config to use for this plan.
+        */
+       public void setExecutionConfig(ExecutionConfig executionConfig) {
+               this.executionConfig = executionConfig;
+       }
+
        // 
------------------------------------------------------------------------
 
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/b7b32a05/flink-java/src/main/java/org/apache/flink/api/java/CollectionEnvironment.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/CollectionEnvironment.java 
b/flink-java/src/main/java/org/apache/flink/api/java/CollectionEnvironment.java
index 3e9ff66..2d57490 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/CollectionEnvironment.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/CollectionEnvironment.java
@@ -24,13 +24,12 @@ import 
org.apache.flink.api.common.operators.CollectionExecutor;
 
 public class CollectionEnvironment extends ExecutionEnvironment {
 
-       private boolean mutableObjectSafeMode = true;
-       
        @Override
        public JobExecutionResult execute(String jobName) throws Exception {
                Plan p = createProgramPlan(jobName);
-               
-               CollectionExecutor exec = new 
CollectionExecutor(mutableObjectSafeMode);
+
+               // We need to invert the flag here: object reuse being enabled means that safe mode is disabled.
+               CollectionExecutor exec = new 
CollectionExecutor(!getConfig().isObjectReuseEnabled());
                return exec.execute(p);
        }
        

http://git-wip-us.apache.org/repos/asf/flink/blob/b7b32a05/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java 
b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
index c19e9aa..f7057ef 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
@@ -749,6 +749,7 @@ public abstract class ExecutionEnvironment {
                if (getDegreeOfParallelism() > 0) {
                        plan.setDefaultParallelism(getDegreeOfParallelism());
                }
+               plan.setExecutionConfig(getConfig());
 
                try {
                        registerCachedFilesWithPlan(plan);

http://git-wip-us.apache.org/repos/asf/flink/blob/b7b32a05/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializer.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializer.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializer.java
index ae429a7..9564c01 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializer.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializer.java
@@ -53,6 +53,7 @@ public final class TupleSerializer<T extends Tuple> extends 
TupleSerializerBase<
 
        @Override
        public T createInstance(Object[] fields) {
+
                try {
                        T t = tupleClass.newInstance();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/b7b32a05/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AbstractCachedBuildSideMatchDriver.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AbstractCachedBuildSideMatchDriver.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AbstractCachedBuildSideMatchDriver.java
index 286d830..f3b2dfd 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AbstractCachedBuildSideMatchDriver.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AbstractCachedBuildSideMatchDriver.java
@@ -19,12 +19,15 @@
 
 package org.apache.flink.runtime.operators;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.FlatJoinFunction;
 import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.api.common.typeutils.TypePairComparatorFactory;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import 
org.apache.flink.runtime.operators.hash.BuildFirstReOpenableHashMatchIterator;
-import 
org.apache.flink.runtime.operators.hash.BuildSecondReOpenableHashMatchIterator;
+import 
org.apache.flink.runtime.operators.hash.NonReusingBuildFirstReOpenableHashMatchIterator;
+import 
org.apache.flink.runtime.operators.hash.NonReusingBuildSecondReOpenableHashMatchIterator;
+import 
org.apache.flink.runtime.operators.hash.ReusingBuildFirstReOpenableHashMatchIterator;
+import 
org.apache.flink.runtime.operators.hash.ReusingBuildSecondReOpenableHashMatchIterator;
 import org.apache.flink.runtime.operators.util.JoinTaskIterator;
 import org.apache.flink.runtime.operators.util.TaskConfig;
 import org.apache.flink.util.Collector;
@@ -37,7 +40,10 @@ public abstract class 
AbstractCachedBuildSideMatchDriver<IT1, IT2, OT> extends M
        private final int buildSideIndex;
        
        private final int probeSideIndex;
-       
+
+       private boolean objectReuseEnabled = false;
+
+
        protected AbstractCachedBuildSideMatchDriver(int buildSideIndex, int 
probeSideIndex) {
                this.buildSideIndex = buildSideIndex;
                this.probeSideIndex = probeSideIndex;
@@ -69,34 +75,67 @@ public abstract class 
AbstractCachedBuildSideMatchDriver<IT1, IT2, OT> extends M
 
                double availableMemory = config.getRelativeMemoryDriver();
 
-               if (buildSideIndex == 0 && probeSideIndex == 1) {
-                       
-                       matchIterator = 
-                                       new 
BuildFirstReOpenableHashMatchIterator<IT1, IT2, OT>(input1, input2, 
-                                                       serializer1, 
comparator1, 
-                                                       serializer2, 
comparator2, 
-                                                       
pairComparatorFactory.createComparator21(comparator1, comparator2), 
-                                                       
this.taskContext.getMemoryManager(),
-                                                       
this.taskContext.getIOManager(),
-                                                       
this.taskContext.getOwningNepheleTask(),
-                                                       availableMemory
-                                                       );
-                       
-               } else if (buildSideIndex == 1 && probeSideIndex == 0) {
-
-                       matchIterator = 
-                                       new 
BuildSecondReOpenableHashMatchIterator<IT1, IT2, OT>(input1, input2, 
-                                                       serializer1, 
comparator1, 
-                                                       serializer2, 
comparator2, 
-                                                       
pairComparatorFactory.createComparator12(comparator1, comparator2), 
-                                                       
this.taskContext.getMemoryManager(),
-                                                       
this.taskContext.getIOManager(),
-                                                       
this.taskContext.getOwningNepheleTask(),
-                                                       availableMemory
-                                                       );
-                       
+               ExecutionConfig executionConfig = 
taskContext.getExecutionConfig();
+               objectReuseEnabled = executionConfig.isObjectReuseEnabled();
+
+               if (objectReuseEnabled) {
+                       if (buildSideIndex == 0 && probeSideIndex == 1) {
+
+                               matchIterator = new 
ReusingBuildFirstReOpenableHashMatchIterator<IT1, IT2, OT>(
+                                               input1, input2,
+                                               serializer1, comparator1,
+                                               serializer2, comparator2,
+                                               
pairComparatorFactory.createComparator21(comparator1, comparator2),
+                                               
this.taskContext.getMemoryManager(),
+                                               this.taskContext.getIOManager(),
+                                               
this.taskContext.getOwningNepheleTask(),
+                                               availableMemory);
+
+
+                       } else if (buildSideIndex == 1 && probeSideIndex == 0) {
+
+                               matchIterator = new 
ReusingBuildSecondReOpenableHashMatchIterator<IT1, IT2, OT>(
+                                               input1, input2,
+                                               serializer1, comparator1,
+                                               serializer2, comparator2,
+                                               
pairComparatorFactory.createComparator12(comparator1, comparator2),
+                                               
this.taskContext.getMemoryManager(),
+                                               this.taskContext.getIOManager(),
+                                               
this.taskContext.getOwningNepheleTask(),
+                                               availableMemory);
+
+                       } else {
+                               throw new Exception("Error: Inconsistent setup 
for repeatable hash join driver.");
+                       }
                } else {
-                       throw new Exception("Error: Inconcistent setup for 
repeatable hash join driver.");
+                       if (buildSideIndex == 0 && probeSideIndex == 1) {
+
+                               matchIterator = new 
NonReusingBuildFirstReOpenableHashMatchIterator<IT1, IT2, OT>(
+                                               input1, input2,
+                                               serializer1, comparator1,
+                                               serializer2, comparator2,
+                                               
pairComparatorFactory.createComparator21(comparator1, comparator2),
+                                               
this.taskContext.getMemoryManager(),
+                                               this.taskContext.getIOManager(),
+                                               
this.taskContext.getOwningNepheleTask(),
+                                               availableMemory);
+
+
+                       } else if (buildSideIndex == 1 && probeSideIndex == 0) {
+
+                               matchIterator = new 
NonReusingBuildSecondReOpenableHashMatchIterator<IT1, IT2, OT>(
+                                               input1, input2,
+                                               serializer1, comparator1,
+                                               serializer2, comparator2,
+                                               
pairComparatorFactory.createComparator12(comparator1, comparator2),
+                                               
this.taskContext.getMemoryManager(),
+                                               this.taskContext.getIOManager(),
+                                               
this.taskContext.getOwningNepheleTask(),
+                                               availableMemory);
+
+                       } else {
+                               throw new Exception("Error: Inconsistent setup 
for repeatable hash join driver.");
+                       }
                }
                
                this.matchIterator.open();
@@ -113,21 +152,8 @@ public abstract class 
AbstractCachedBuildSideMatchDriver<IT1, IT2, OT> extends M
                final FlatJoinFunction<IT1, IT2, OT> matchStub = 
this.taskContext.getStub();
                final Collector<OT> collector = 
this.taskContext.getOutputCollector();
                
-               if (buildSideIndex == 0) {
-                       
-                       final BuildFirstReOpenableHashMatchIterator<IT1, IT2, 
OT> matchIterator = (BuildFirstReOpenableHashMatchIterator<IT1, IT2, OT>) 
this.matchIterator;
-                       
-                       while (this.running && matchIterator != null && 
matchIterator.callWithNextKey(matchStub, collector));
-                       
-               } else if (buildSideIndex == 1) {
+               while (this.running && matchIterator != null && 
matchIterator.callWithNextKey(matchStub, collector));
                        
-                       final BuildSecondReOpenableHashMatchIterator<IT1, IT2, 
OT> matchIterator = (BuildSecondReOpenableHashMatchIterator<IT1, IT2, OT>) 
this.matchIterator;
-                       
-                       while (this.running && matchIterator != null && 
matchIterator.callWithNextKey(matchStub, collector));
-                       
-               } else {
-                       throw new Exception();
-               }
        }
 
        @Override
@@ -138,14 +164,25 @@ public abstract class 
AbstractCachedBuildSideMatchDriver<IT1, IT2, OT> extends M
                
                MutableObjectIterator<IT1> input1 = 
this.taskContext.getInput(0);
                MutableObjectIterator<IT2> input2 = 
this.taskContext.getInput(1);
-               
-               if (buildSideIndex == 0 && probeSideIndex == 1) {
-                       final BuildFirstReOpenableHashMatchIterator<IT1, IT2, 
OT> matchIterator = (BuildFirstReOpenableHashMatchIterator<IT1, IT2, OT>) 
this.matchIterator;
-                       matchIterator.reopenProbe(input2);
-               }
-               else {
-                       final BuildSecondReOpenableHashMatchIterator<IT1, IT2, 
OT> matchIterator = (BuildSecondReOpenableHashMatchIterator<IT1, IT2, OT>) 
this.matchIterator;
-                       matchIterator.reopenProbe(input1);
+
+               if (objectReuseEnabled) {
+                       if (buildSideIndex == 0 && probeSideIndex == 1) {
+                               final 
ReusingBuildFirstReOpenableHashMatchIterator<IT1, IT2, OT> matchIterator = 
(ReusingBuildFirstReOpenableHashMatchIterator<IT1, IT2, OT>) this.matchIterator;
+
+                               matchIterator.reopenProbe(input2);
+                       } else {
+                               final 
ReusingBuildSecondReOpenableHashMatchIterator<IT1, IT2, OT> matchIterator = 
(ReusingBuildSecondReOpenableHashMatchIterator<IT1, IT2, OT>) 
this.matchIterator;
+                               matchIterator.reopenProbe(input1);
+                       }
+               } else {
+                       if (buildSideIndex == 0 && probeSideIndex == 1) {
+                               final 
NonReusingBuildFirstReOpenableHashMatchIterator<IT1, IT2, OT> matchIterator = 
(NonReusingBuildFirstReOpenableHashMatchIterator<IT1, IT2, OT>) 
this.matchIterator;
+
+                               matchIterator.reopenProbe(input2);
+                       } else {
+                               final 
NonReusingBuildSecondReOpenableHashMatchIterator<IT1, IT2, OT> matchIterator = 
(NonReusingBuildSecondReOpenableHashMatchIterator<IT1, IT2, OT>) 
this.matchIterator;
+                               matchIterator.reopenProbe(input1);
+                       }
                }
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/b7b32a05/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllGroupReduceDriver.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllGroupReduceDriver.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllGroupReduceDriver.java
index 5e68bab..854dbd5 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllGroupReduceDriver.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllGroupReduceDriver.java
@@ -19,13 +19,15 @@
 
 package org.apache.flink.runtime.operators;
 
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.runtime.util.NonReusingMutableToRegularIteratorWrapper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.flink.api.common.functions.FlatCombineFunction;
 import org.apache.flink.api.common.functions.GroupReduceFunction;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.runtime.operators.util.TaskConfig;
-import org.apache.flink.runtime.util.MutableToRegularIteratorWrapper;
+import org.apache.flink.runtime.util.ReusingMutableToRegularIteratorWrapper;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.MutableObjectIterator;
 
@@ -51,6 +53,8 @@ public class AllGroupReduceDriver<IT, OT> implements 
PactDriver<GroupReduceFunct
        
        private DriverStrategy strategy;
 
+       private boolean objectReuseEnabled = false;
+
        // 
------------------------------------------------------------------------
 
        @Override
@@ -92,6 +96,13 @@ public class AllGroupReduceDriver<IT, OT> implements 
PactDriver<GroupReduceFunct
                }
                this.serializer = 
this.taskContext.<IT>getInputSerializer(0).getSerializer();
                this.input = this.taskContext.getInput(0);
+
+               ExecutionConfig executionConfig = 
taskContext.getExecutionConfig();
+               this.objectReuseEnabled = 
executionConfig.isObjectReuseEnabled();
+
+               if (LOG.isDebugEnabled()) {
+                       LOG.debug("AllGroupReduceDriver object reuse: " + 
(this.objectReuseEnabled ? "ENABLED" : "DISABLED") + ".");
+               }
        }
 
        @Override
@@ -100,21 +111,36 @@ public class AllGroupReduceDriver<IT, OT> implements 
PactDriver<GroupReduceFunct
                        
LOG.debug(this.taskContext.formatLogString("AllGroupReduce preprocessing done. 
Running Reducer code."));
                }
 
-               final MutableToRegularIteratorWrapper<IT> inIter = new 
MutableToRegularIteratorWrapper<IT>(this.input, this.serializer);
-
-               // single UDF call with the single group
-               if (inIter.hasNext()) {
-                       if (strategy == DriverStrategy.ALL_GROUP_REDUCE) {
-                               final GroupReduceFunction<IT, OT> reducer = 
this.taskContext.getStub();
-                               final Collector<OT> output = 
this.taskContext.getOutputCollector();
-                               reducer.reduce(inIter, output);
+               if (objectReuseEnabled) {
+                       final ReusingMutableToRegularIteratorWrapper<IT> inIter 
= new ReusingMutableToRegularIteratorWrapper<IT>(this.input, this.serializer);
+
+                       // single UDF call with the single group
+                       if (inIter.hasNext()) {
+                               if (strategy == 
DriverStrategy.ALL_GROUP_REDUCE) {
+                                       final GroupReduceFunction<IT, OT> 
reducer = this.taskContext.getStub();
+                                       final Collector<OT> output = 
this.taskContext.getOutputCollector();
+                                       reducer.reduce(inIter, output);
+                               } else {
+                                       @SuppressWarnings("unchecked") final 
FlatCombineFunction<IT> combiner = (FlatCombineFunction<IT>) 
this.taskContext.getStub();
+                                       @SuppressWarnings("unchecked") final 
Collector<IT> output = (Collector<IT>) this.taskContext.getOutputCollector();
+                                       combiner.combine(inIter, output);
+                               }
                        }
-                       else {
-                               @SuppressWarnings("unchecked")
-                               final FlatCombineFunction<IT> combiner = 
(FlatCombineFunction<IT>) this.taskContext.getStub();
-                               @SuppressWarnings("unchecked")
-                               final Collector<IT> output = (Collector<IT>) 
this.taskContext.getOutputCollector();
-                               combiner.combine(inIter, output);
+
+               } else {
+                       final NonReusingMutableToRegularIteratorWrapper<IT> 
inIter = new NonReusingMutableToRegularIteratorWrapper<IT>(this.input, 
this.serializer);
+
+                       // single UDF call with the single group
+                       if (inIter.hasNext()) {
+                               if (strategy == 
DriverStrategy.ALL_GROUP_REDUCE) {
+                                       final GroupReduceFunction<IT, OT> 
reducer = this.taskContext.getStub();
+                                       final Collector<OT> output = 
this.taskContext.getOutputCollector();
+                                       reducer.reduce(inIter, output);
+                               } else {
+                                       @SuppressWarnings("unchecked") final 
FlatCombineFunction<IT> combiner = (FlatCombineFunction<IT>) 
this.taskContext.getStub();
+                                       @SuppressWarnings("unchecked") final 
Collector<IT> output = (Collector<IT>) this.taskContext.getOutputCollector();
+                                       combiner.combine(inIter, output);
+                               }
                        }
                }
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/b7b32a05/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllReduceDriver.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllReduceDriver.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllReduceDriver.java
index e26f4eb..dff2dbd 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllReduceDriver.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllReduceDriver.java
@@ -19,6 +19,7 @@
 
 package org.apache.flink.runtime.operators;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.flink.api.common.functions.ReduceFunction;
@@ -49,6 +50,8 @@ public class AllReduceDriver<T> implements 
PactDriver<ReduceFunction<T>, T> {
        
        private boolean running;
 
+       private boolean objectReuseEnabled = false;
+
        // 
------------------------------------------------------------------------
 
        @Override
@@ -86,6 +89,13 @@ public class AllReduceDriver<T> implements 
PactDriver<ReduceFunction<T>, T> {
                TypeSerializerFactory<T> serializerFactory = 
this.taskContext.getInputSerializer(0);
                this.serializer = serializerFactory.getSerializer();
                this.input = this.taskContext.getInput(0);
+
+               ExecutionConfig executionConfig = 
taskContext.getExecutionConfig();
+               this.objectReuseEnabled = 
executionConfig.isObjectReuseEnabled();
+
+               if (LOG.isDebugEnabled()) {
+                       LOG.debug("AllReduceDriver object reuse: " + 
(this.objectReuseEnabled ? "ENABLED" : "DISABLED") + ".");
+               }
        }
 
        @Override
@@ -97,19 +107,35 @@ public class AllReduceDriver<T> implements 
PactDriver<ReduceFunction<T>, T> {
                final ReduceFunction<T> stub = this.taskContext.getStub();
                final MutableObjectIterator<T> input = this.input;
                final TypeSerializer<T> serializer = this.serializer;
-               
-               T val1 = serializer.createInstance();
-               
-               if ((val1 = input.next(val1)) == null) {
-                       return;
-               }
-               
-               T val2;
-               while (running && (val2 = 
input.next(serializer.createInstance())) != null) {
-                       val1 = stub.reduce(val1, val2);
+
+
+               if (objectReuseEnabled) {
+                       T val1 = serializer.createInstance();
+
+                       if ((val1 = input.next(val1)) == null) {
+                               return;
+                       }
+
+                       T val2 = serializer.createInstance();
+                       while (running && (val2 = input.next(val2)) != null) {
+                               val1 = stub.reduce(val1, val2);
+                       }
+
+                       this.taskContext.getOutputCollector().collect(val1);
+               } else {
+                       T val1 = serializer.createInstance();
+
+                       if ((val1 = input.next(val1)) == null) {
+                               return;
+                       }
+
+                       T val2;
+                       while (running && (val2 = 
input.next(serializer.createInstance())) != null) {
+                               val1 = stub.reduce(val1, val2);
+                       }
+
+                       this.taskContext.getOutputCollector().collect(val1);
                }
-               
-               this.taskContext.getOutputCollector().collect(val1);
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/b7b32a05/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupDriver.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupDriver.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupDriver.java
index 55498fb..6ace918 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupDriver.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupDriver.java
@@ -19,13 +19,15 @@
 
 package org.apache.flink.runtime.operators;
 
+import org.apache.flink.api.common.ExecutionConfig;
+import 
org.apache.flink.runtime.operators.sort.NonReusingSortMergeCoGroupIterator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.flink.api.common.functions.CoGroupFunction;
 import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.api.common.typeutils.TypePairComparatorFactory;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.runtime.operators.sort.SortMergeCoGroupIterator;
+import org.apache.flink.runtime.operators.sort.ReusingSortMergeCoGroupIterator;
 import org.apache.flink.runtime.operators.util.CoGroupTaskIterator;
 import org.apache.flink.runtime.operators.util.TaskConfig;
 import org.apache.flink.util.Collector;
@@ -39,7 +41,7 @@ import org.apache.flink.util.MutableObjectIterator;
  * The CoGroupTask group all pairs that share the same key from both inputs. 
Each for each key, the sets of values that
  * were pair with that key of both inputs are handed to the 
<code>coGroup()</code> method of the CoGroupFunction.
  * 
- * @see org.apache.flink.api.java.record.functions.CoGroupFunction
+ * @see org.apache.flink.api.common.functions.CoGroupFunction
  */
 public class CoGroupDriver<IT1, IT2, OT> implements 
PactDriver<CoGroupFunction<IT1, IT2, OT>, OT> {
        
@@ -52,6 +54,8 @@ public class CoGroupDriver<IT1, IT2, OT> implements 
PactDriver<CoGroupFunction<I
        
        private volatile boolean running;
 
+       private boolean objectReuseEnabled = false;
+
        // 
------------------------------------------------------------------------
 
 
@@ -105,10 +109,28 @@ public class CoGroupDriver<IT1, IT2, OT> implements 
PactDriver<CoGroupFunction<I
                        throw new Exception("Missing pair comparator factory 
for CoGroup driver");
                }
 
-               // create CoGropuTaskIterator according to provided local 
strategy.
-               this.coGroupIterator = new SortMergeCoGroupIterator<IT1, 
IT2>(in1, in2,
-                               serializer1, groupComparator1,  serializer2, 
groupComparator2,
-                               
pairComparatorFactory.createComparator12(groupComparator1, groupComparator2));
+               ExecutionConfig executionConfig = 
taskContext.getExecutionConfig();
+               this.objectReuseEnabled = 
executionConfig.isObjectReuseEnabled();
+
+               if (LOG.isDebugEnabled()) {
+                       LOG.debug("CoGroupDriver object reuse: " + 
(this.objectReuseEnabled ? "ENABLED" : "DISABLED") + ".");
+               }
+
+               if (objectReuseEnabled) {
+                       // create CoGropuTaskIterator according to provided 
local strategy.
+                       this.coGroupIterator = new 
ReusingSortMergeCoGroupIterator<IT1, IT2>(
+                                       in1, in2,
+                                       serializer1, groupComparator1,
+                                       serializer2, groupComparator2,
+                                       
pairComparatorFactory.createComparator12(groupComparator1, groupComparator2));
+               } else {
+                       // create CoGropuTaskIterator according to provided 
local strategy.
+                       this.coGroupIterator = new 
NonReusingSortMergeCoGroupIterator<IT1, IT2>(
+                                       in1, in2,
+                                       serializer1, groupComparator1,
+                                       serializer2, groupComparator2,
+                                       
pairComparatorFactory.createComparator12(groupComparator1, groupComparator2));
+               }
                
                // open CoGroupTaskIterator - this triggers the sorting and 
blocks until the iterator is ready
                this.coGroupIterator.open();

http://git-wip-us.apache.org/repos/asf/flink/blob/b7b32a05/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupWithSolutionSetFirstDriver.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupWithSolutionSetFirstDriver.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupWithSolutionSetFirstDriver.java
index 5e0ca6d..a3c69a3 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupWithSolutionSetFirstDriver.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupWithSolutionSetFirstDriver.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.operators;
 
 import java.util.Collections;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.CoGroupFunction;
 import org.apache.flink.api.common.operators.util.JoinHashMap;
 import org.apache.flink.api.common.typeutils.TypeComparator;
@@ -31,7 +32,8 @@ import 
org.apache.flink.runtime.iterative.concurrent.SolutionSetBroker;
 import org.apache.flink.runtime.iterative.task.AbstractIterativePactTask;
 import org.apache.flink.runtime.operators.hash.CompactingHashTable;
 import org.apache.flink.runtime.operators.util.TaskConfig;
-import org.apache.flink.runtime.util.KeyGroupedIterator;
+import org.apache.flink.runtime.util.NonReusingKeyGroupedIterator;
+import org.apache.flink.runtime.util.ReusingKeyGroupedIterator;
 import org.apache.flink.runtime.util.SingleElementIterator;
 import org.apache.flink.util.Collector;
 
@@ -46,13 +48,18 @@ public class CoGroupWithSolutionSetFirstDriver<IT1, IT2, 
OT> implements Resettab
        private TypeSerializer<IT2> probeSideSerializer;
        
        private TypeComparator<IT2> probeSideComparator;
-       
+
+       private TypeSerializer<IT1> solutionSetSerializer;
+
+
        private TypePairComparator<IT2, IT1> pairComparator;
        
        private IT1 solutionSideRecord;
        
        protected volatile boolean running;
 
+       private boolean objectReuseEnabled = false;
+
        // 
--------------------------------------------------------------------------------------------
        
        @Override
@@ -96,7 +103,6 @@ public class CoGroupWithSolutionSetFirstDriver<IT1, IT2, OT> 
implements Resettab
        @SuppressWarnings("unchecked")
        public void initialize() {
                
-               final TypeSerializer<IT1> solutionSetSerializer;
                final TypeComparator<IT1> solutionSetComparator;
                
                // grab a handle to the hash table from the iteration broker
@@ -130,7 +136,12 @@ public class CoGroupWithSolutionSetFirstDriver<IT1, IT2, 
OT> implements Resettab
                this.probeSideSerializer = 
taskContext.<IT2>getInputSerializer(0).getSerializer();
                this.probeSideComparator = 
probeSideComparatorFactory.createComparator();
                
-               solutionSideRecord = solutionSetSerializer.createInstance();
+               ExecutionConfig executionConfig = 
taskContext.getExecutionConfig();
+               objectReuseEnabled = executionConfig.isObjectReuseEnabled();
+
+               if (objectReuseEnabled) {
+                       solutionSideRecord = 
solutionSetSerializer.createInstance();
+               }
                
                TypePairComparatorFactory<IT1, IT2> factory = 
taskContext.getTaskConfig().getPairComparatorFactory(taskContext.getUserCodeClassLoader());
                pairComparator = 
factory.createComparator21(solutionSetComparator, this.probeSideComparator);
@@ -149,46 +160,84 @@ public class CoGroupWithSolutionSetFirstDriver<IT1, IT2, 
OT> implements Resettab
                final CoGroupFunction<IT1, IT2, OT> coGroupStub = 
taskContext.getStub();
                final Collector<OT> collector = 
taskContext.getOutputCollector();
                
-               final KeyGroupedIterator<IT2> probeSideInput = new 
KeyGroupedIterator<IT2>(taskContext.<IT2>getInput(0), probeSideSerializer, 
probeSideComparator);
                final SingleElementIterator<IT1> siIter = new 
SingleElementIterator<IT1>();
                final Iterable<IT1> emptySolutionSide = Collections.emptySet();
-               
-               if (this.hashTable != null) {
-                       final CompactingHashTable<IT1> join = hashTable;
-                       final CompactingHashTable<IT1>.HashTableProber<IT2> 
prober = join.getProber(this.probeSideComparator, this.pairComparator);
-                       
-                       IT1 buildSideRecord = solutionSideRecord;
-                       
-                       while (this.running && probeSideInput.nextKey()) {
-                               IT2 current = probeSideInput.getCurrent();
-       
-                               buildSideRecord = prober.getMatchFor(current, 
buildSideRecord);
-                               if (buildSideRecord != null) {
-                                       siIter.set(buildSideRecord);
-                                       coGroupStub.coGroup(siIter, 
probeSideInput.getValues(), collector);
+
+               if (objectReuseEnabled) {
+                       final ReusingKeyGroupedIterator<IT2> probeSideInput = 
new ReusingKeyGroupedIterator<IT2>(taskContext.<IT2>getInput(0), 
probeSideSerializer, probeSideComparator);
+                       if (this.hashTable != null) {
+                               final CompactingHashTable<IT1> join = hashTable;
+                               final 
CompactingHashTable<IT1>.HashTableProber<IT2> prober = 
join.getProber(this.probeSideComparator, this.pairComparator);
+
+
+                               IT1 buildSideRecord = solutionSideRecord;
+
+                               while (this.running && 
probeSideInput.nextKey()) {
+                                       IT2 current = 
probeSideInput.getCurrent();
+
+                                       buildSideRecord = 
prober.getMatchFor(current, buildSideRecord);
+                                       if (buildSideRecord != null) {
+                                               siIter.set(buildSideRecord);
+                                               coGroupStub.coGroup(siIter, 
probeSideInput.getValues(), collector);
+                                       } else {
+                                               
coGroupStub.coGroup(emptySolutionSide, probeSideInput.getValues(), collector);
+                                       }
                                }
-                               else {
-                                       coGroupStub.coGroup(emptySolutionSide, 
probeSideInput.getValues(), collector);
+                       } else {
+                               final JoinHashMap<IT1> join = this.objectMap;
+                               final JoinHashMap<IT1>.Prober<IT2> prober = 
join.createProber(this.probeSideComparator, this.pairComparator);
+                               final TypeSerializer<IT1> serializer = 
join.getBuildSerializer();
+
+                               while (this.running && 
probeSideInput.nextKey()) {
+                                       IT2 current = 
probeSideInput.getCurrent();
+
+                                       IT1 buildSideRecord = 
prober.lookupMatch(current);
+                                       if (buildSideRecord != null) {
+                                               
siIter.set(serializer.copy(buildSideRecord));
+                                               coGroupStub.coGroup(siIter, 
probeSideInput.getValues(), collector);
+                                       } else {
+                                               
coGroupStub.coGroup(emptySolutionSide, probeSideInput.getValues(), collector);
+                                       }
                                }
                        }
-               }
-               else {
-                       final JoinHashMap<IT1> join = this.objectMap;
-                       final JoinHashMap<IT1>.Prober<IT2> prober = 
join.createProber(this.probeSideComparator, this.pairComparator);
-                       final TypeSerializer<IT1> serializer = 
join.getBuildSerializer();
-                       
-                       while (this.running && probeSideInput.nextKey()) {
-                               IT2 current = probeSideInput.getCurrent();
-       
-                               IT1 buildSideRecord = 
prober.lookupMatch(current);
-                               if (buildSideRecord != null) {
-                                       
siIter.set(serializer.copy(buildSideRecord));
-                                       coGroupStub.coGroup(siIter, 
probeSideInput.getValues(), collector);
+               } else {
+                       final NonReusingKeyGroupedIterator<IT2> probeSideInput 
= new NonReusingKeyGroupedIterator<IT2>(taskContext.<IT2>getInput(0), 
probeSideSerializer, probeSideComparator);
+                       if (this.hashTable != null) {
+                               final CompactingHashTable<IT1> join = hashTable;
+                               final 
CompactingHashTable<IT1>.HashTableProber<IT2> prober = join.getProber(this
+                                               .probeSideComparator, 
this.pairComparator);
+
+                               IT1 buildSideRecord;
+
+                               while (this.running && 
probeSideInput.nextKey()) {
+                                       IT2 current = 
probeSideInput.getCurrent();
+
+                                       buildSideRecord = 
prober.getMatchFor(current);
+                                       if (buildSideRecord != null) {
+                                               
siIter.set(solutionSetSerializer.copy(buildSideRecord));
+                                               coGroupStub.coGroup(siIter, 
probeSideInput.getValues(), collector);
+                                       } else {
+                                               
coGroupStub.coGroup(emptySolutionSide, probeSideInput.getValues(), collector);
+                                       }
                                }
-                               else {
-                                       coGroupStub.coGroup(emptySolutionSide, 
probeSideInput.getValues(), collector);
+                       } else {
+                               final JoinHashMap<IT1> join = this.objectMap;
+                               final JoinHashMap<IT1>.Prober<IT2> prober = 
join.createProber(this.probeSideComparator, this.pairComparator);
+                               final TypeSerializer<IT1> serializer = 
join.getBuildSerializer();
+
+                               while (this.running && 
probeSideInput.nextKey()) {
+                                       IT2 current = 
probeSideInput.getCurrent();
+
+                                       IT1 buildSideRecord = 
prober.lookupMatch(current);
+                                       if (buildSideRecord != null) {
+                                               
siIter.set(serializer.copy(buildSideRecord));
+                                               coGroupStub.coGroup(siIter, 
probeSideInput.getValues(), collector);
+                                       } else {
+                                               
coGroupStub.coGroup(emptySolutionSide, probeSideInput.getValues(), collector);
+                                       }
                                }
                        }
+
                }
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/b7b32a05/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupWithSolutionSetSecondDriver.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupWithSolutionSetSecondDriver.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupWithSolutionSetSecondDriver.java
index fb88505..17fc471 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupWithSolutionSetSecondDriver.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupWithSolutionSetSecondDriver.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.operators;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.CoGroupFunction;
 import org.apache.flink.api.common.operators.util.JoinHashMap;
 import org.apache.flink.api.common.typeutils.TypeComparator;
@@ -30,7 +31,8 @@ import 
org.apache.flink.runtime.iterative.task.AbstractIterativePactTask;
 import org.apache.flink.runtime.operators.hash.CompactingHashTable;
 import org.apache.flink.runtime.operators.util.TaskConfig;
 import org.apache.flink.runtime.util.EmptyIterator;
-import org.apache.flink.runtime.util.KeyGroupedIterator;
+import org.apache.flink.runtime.util.NonReusingKeyGroupedIterator;
+import org.apache.flink.runtime.util.ReusingKeyGroupedIterator;
 import org.apache.flink.runtime.util.SingleElementIterator;
 import org.apache.flink.util.Collector;
 
@@ -45,13 +47,17 @@ public class CoGroupWithSolutionSetSecondDriver<IT1, IT2, 
OT> implements Resetta
        private TypeSerializer<IT1> probeSideSerializer;
        
        private TypeComparator<IT1> probeSideComparator;
-       
+
+       private TypeSerializer<IT2> solutionSetSerializer;
+
        private TypePairComparator<IT1, IT2> pairComparator;
        
        private IT2 solutionSideRecord;
        
        protected volatile boolean running;
 
+       private boolean objectReuseEnabled = false;
+
        // 
--------------------------------------------------------------------------------------------
        
        @Override
@@ -95,7 +101,6 @@ public class CoGroupWithSolutionSetSecondDriver<IT1, IT2, 
OT> implements Resetta
        @SuppressWarnings("unchecked")
        public void initialize() throws Exception {
                
-               final TypeSerializer<IT2> solutionSetSerializer;
                final TypeComparator<IT2> solutionSetComparator;
                
                // grab a handle to the hash table from the iteration broker
@@ -129,8 +134,13 @@ public class CoGroupWithSolutionSetSecondDriver<IT1, IT2, 
OT> implements Resetta
                
                this.probeSideSerializer = 
taskContext.<IT1>getInputSerializer(0).getSerializer();
                this.probeSideComparator = 
probeSideComparatorFactory.createComparator();
-               
-               solutionSideRecord = solutionSetSerializer.createInstance();
+
+               ExecutionConfig executionConfig = 
taskContext.getExecutionConfig();
+               objectReuseEnabled = executionConfig.isObjectReuseEnabled();
+
+               if (objectReuseEnabled) {
+                       solutionSideRecord = 
solutionSetSerializer.createInstance();
+               };
                
                TypePairComparatorFactory<IT1, IT2> factory = 
taskContext.getTaskConfig().getPairComparatorFactory(taskContext.getUserCodeClassLoader());
                pairComparator = 
factory.createComparator12(this.probeSideComparator, solutionSetComparator);
@@ -149,46 +159,84 @@ public class CoGroupWithSolutionSetSecondDriver<IT1, IT2, 
OT> implements Resetta
                final CoGroupFunction<IT1, IT2, OT> coGroupStub = 
taskContext.getStub();
                final Collector<OT> collector = 
taskContext.getOutputCollector();
 
-               final KeyGroupedIterator<IT1> probeSideInput = new 
KeyGroupedIterator<IT1>(taskContext.<IT1>getInput(0), probeSideSerializer, 
probeSideComparator);
                final SingleElementIterator<IT2> siIter = new 
SingleElementIterator<IT2>();
                final Iterable<IT2> emptySolutionSide = 
EmptyIterator.<IT2>get();
-               
-               if (this.hashTable != null) {
-                       final CompactingHashTable<IT2> join = hashTable;
-                       final CompactingHashTable<IT2>.HashTableProber<IT1> 
prober = join.getProber(this.probeSideComparator, this.pairComparator);
-                       
-                       IT2 buildSideRecord = solutionSideRecord;
-                       
-                       while (this.running && probeSideInput.nextKey()) {
-                               IT1 current = probeSideInput.getCurrent();
-       
-                               buildSideRecord = prober.getMatchFor(current, 
buildSideRecord);
-                               if (buildSideRecord != null) {
-                                       siIter.set(buildSideRecord);
-                                       
coGroupStub.coGroup(probeSideInput.getValues(), siIter, collector);
+
+               if (objectReuseEnabled) {
+                       final ReusingKeyGroupedIterator<IT1> probeSideInput = 
new ReusingKeyGroupedIterator<IT1>(taskContext.<IT1>getInput(0), 
probeSideSerializer, probeSideComparator);
+
+                       if (this.hashTable != null) {
+                               final CompactingHashTable<IT2> join = hashTable;
+                               final 
CompactingHashTable<IT2>.HashTableProber<IT1> prober = 
join.getProber(this.probeSideComparator, this.pairComparator);
+
+                               IT2 buildSideRecord = solutionSideRecord;
+
+                               while (this.running && 
probeSideInput.nextKey()) {
+                                       IT1 current = 
probeSideInput.getCurrent();
+
+                                       buildSideRecord = 
prober.getMatchFor(current, buildSideRecord);
+                                       if (buildSideRecord != null) {
+                                               siIter.set(buildSideRecord);
+                                               
coGroupStub.coGroup(probeSideInput.getValues(), siIter, collector);
+                                       } else {
+                                               
coGroupStub.coGroup(probeSideInput.getValues(), emptySolutionSide, collector);
+                                       }
                                }
-                               else {
-                                       
coGroupStub.coGroup(probeSideInput.getValues(), emptySolutionSide, collector);
+                       } else {
+                               final JoinHashMap<IT2> join = this.objectMap;
+                               final JoinHashMap<IT2>.Prober<IT1> prober = 
join.createProber(this.probeSideComparator, this.pairComparator);
+                               final TypeSerializer<IT2> serializer = 
join.getBuildSerializer();
+
+                               while (this.running && 
probeSideInput.nextKey()) {
+                                       IT1 current = 
probeSideInput.getCurrent();
+
+                                       IT2 buildSideRecord = 
prober.lookupMatch(current);
+                                       if (buildSideRecord != null) {
+                                               
siIter.set(serializer.copy(buildSideRecord));
+                                               
coGroupStub.coGroup(probeSideInput.getValues(), siIter, collector);
+                                       } else {
+                                               
coGroupStub.coGroup(probeSideInput.getValues(), emptySolutionSide, collector);
+                                       }
                                }
                        }
-               }
-               else {
-                       final JoinHashMap<IT2> join = this.objectMap;
-                       final JoinHashMap<IT2>.Prober<IT1> prober = 
join.createProber(this.probeSideComparator, this.pairComparator);
-                       final TypeSerializer<IT2> serializer = 
join.getBuildSerializer();
-                       
-                       while (this.running && probeSideInput.nextKey()) {
-                               IT1 current = probeSideInput.getCurrent();
-       
-                               IT2 buildSideRecord = 
prober.lookupMatch(current);
-                               if (buildSideRecord != null) {
-                                       
siIter.set(serializer.copy(buildSideRecord));
-                                       
coGroupStub.coGroup(probeSideInput.getValues(), siIter, collector);
+               } else {
+                       final NonReusingKeyGroupedIterator<IT1> probeSideInput 
= new NonReusingKeyGroupedIterator<IT1>(taskContext.<IT1>getInput(0), 
probeSideSerializer, probeSideComparator);
+
+                       if (this.hashTable != null) {
+                               final CompactingHashTable<IT2> join = hashTable;
+                               final 
CompactingHashTable<IT2>.HashTableProber<IT1> prober = 
join.getProber(this.probeSideComparator, this.pairComparator);
+
+                               IT2 buildSideRecord;
+
+                               while (this.running && 
probeSideInput.nextKey()) {
+                                       IT1 current = 
probeSideInput.getCurrent();
+
+                                       buildSideRecord = 
prober.getMatchFor(current);
+                                       if (buildSideRecord != null) {
+                                               
siIter.set(solutionSetSerializer.copy(buildSideRecord));
+                                               
coGroupStub.coGroup(probeSideInput.getValues(), siIter, collector);
+                                       } else {
+                                               
coGroupStub.coGroup(probeSideInput.getValues(), emptySolutionSide, collector);
+                                       }
                                }
-                               else {
-                                       
coGroupStub.coGroup(probeSideInput.getValues(), emptySolutionSide, collector);
+                       } else {
+                               final JoinHashMap<IT2> join = this.objectMap;
+                               final JoinHashMap<IT2>.Prober<IT1> prober = 
join.createProber(this.probeSideComparator, this.pairComparator);
+                               final TypeSerializer<IT2> serializer = 
join.getBuildSerializer();
+
+                               while (this.running && 
probeSideInput.nextKey()) {
+                                       IT1 current = 
probeSideInput.getCurrent();
+
+                                       IT2 buildSideRecord = 
prober.lookupMatch(current);
+                                       if (buildSideRecord != null) {
+                                               
siIter.set(serializer.copy(buildSideRecord));
+                                               
coGroupStub.coGroup(probeSideInput.getValues(), siIter, collector);
+                                       } else {
+                                               
coGroupStub.coGroup(probeSideInput.getValues(), emptySolutionSide, collector);
+                                       }
                                }
                        }
+
                }
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/b7b32a05/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CollectorMapDriver.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CollectorMapDriver.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CollectorMapDriver.java
index 3858864..766a9d9 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CollectorMapDriver.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CollectorMapDriver.java
@@ -19,9 +19,12 @@
 
 package org.apache.flink.runtime.operators;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.GenericCollectorMap;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.MutableObjectIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Map task which is executed by a Nephele task manager. The task has a single
@@ -38,12 +41,16 @@ import org.apache.flink.util.MutableObjectIterator;
  */
 @SuppressWarnings("deprecation")
 public class CollectorMapDriver<IT, OT> implements 
PactDriver<GenericCollectorMap<IT, OT>, OT> {
-       
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(CollectorMapDriver.class);
+
+
        private PactTaskContext<GenericCollectorMap<IT, OT>, OT> taskContext;
        
        private volatile boolean running;
-       
-       
+
+       private boolean objectReuseEnabled = false;
+
        @Override
        public void setup(PactTaskContext<GenericCollectorMap<IT, OT>, OT> 
context) {
                this.taskContext = context;
@@ -69,7 +76,12 @@ public class CollectorMapDriver<IT, OT> implements 
PactDriver<GenericCollectorMa
 
        @Override
        public void prepare() {
-               // nothing, since a mapper does not need any preparation
+               ExecutionConfig executionConfig = 
taskContext.getExecutionConfig();
+               this.objectReuseEnabled = 
executionConfig.isObjectReuseEnabled();
+
+               if (LOG.isDebugEnabled()) {
+                       LOG.debug("CollectorMapDriver object reuse: " + 
(this.objectReuseEnabled ? "ENABLED" : "DISABLED") + ".");
+               }
        }
 
        @Override
@@ -79,10 +91,18 @@ public class CollectorMapDriver<IT, OT> implements 
PactDriver<GenericCollectorMa
                final GenericCollectorMap<IT, OT> stub = 
this.taskContext.getStub();
                final Collector<OT> output = 
this.taskContext.getOutputCollector();
 
-               IT record = 
this.taskContext.<IT>getInputSerializer(0).getSerializer().createInstance();
+               if (objectReuseEnabled) {
+                       IT record = 
this.taskContext.<IT>getInputSerializer(0).getSerializer().createInstance();
+
 
-               while (this.running && ((record = input.next(record)) != null)) 
{
-                       stub.map(record, output);
+                       while (this.running && ((record = input.next(record)) 
!= null)) {
+                               stub.map(record, output);
+                       }
+               } else {
+                       IT record;
+                       while (this.running && ((record = input.next()) != 
null)) {
+                               stub.map(record, output);
+                       }
                }
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/b7b32a05/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CrossDriver.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CrossDriver.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CrossDriver.java
index 4e6745a..197c08d 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CrossDriver.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CrossDriver.java
@@ -19,6 +19,7 @@
 
 package org.apache.flink.runtime.operators;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.flink.api.common.functions.CrossFunction;
@@ -62,7 +63,9 @@ public class CrossDriver<T1, T2, OT> implements 
PactDriver<CrossFunction<T1, T2,
        private boolean firstIsOuter;
        
        private volatile boolean running;
-       
+
+       private boolean objectReuseEnabled = false;
+
        // 
------------------------------------------------------------------------
 
 
@@ -140,6 +143,13 @@ public class CrossDriver<T1, T2, OT> implements 
PactDriver<CrossFunction<T1, T2,
                        }
                        this.memPagesForBlockSide = numPages - 
this.memPagesForSpillingSide;
                }
+
+               ExecutionConfig executionConfig = 
taskContext.getExecutionConfig();
+               this.objectReuseEnabled = 
executionConfig.isObjectReuseEnabled();
+
+               if (LOG.isDebugEnabled()) {
+                       LOG.debug("CrossDriver object reuse: " + 
(this.objectReuseEnabled ? "ENABLED" : "DISABLED") + ".");
+               }
        }
 
 
@@ -201,30 +211,47 @@ public class CrossDriver<T1, T2, OT> implements 
PactDriver<CrossFunction<T1, T2,
                                this.taskContext.getOwningNepheleTask());
                this.spillIter = spillVals;
                
-               T1 val1;
-               final T1 val1Reuse = serializer1.createInstance();
-               T2 val2;
-               final T2 val2Reuse = serializer2.createInstance();
-               T2 val2Copy = serializer2.createInstance();
-               
+
                final CrossFunction<T1, T2, OT> crosser = 
this.taskContext.getStub();
                final Collector<OT> collector = 
this.taskContext.getOutputCollector();
-               
-               // for all blocks
-               do {
-                       // for all values from the spilling side
-                       while (this.running && ((val2 = 
spillVals.next(val2Reuse)) != null)) {
-                               // for all values in the block
-                               while ((val1 = blockVals.next(val1Reuse)) != 
null) {
-                                       val2Copy = serializer2.copy(val2, 
val2Copy);
-                                       
collector.collect(crosser.cross(val1,val2Copy));
-                                       //crosser.cross(val1, val2Copy, 
collector);
+
+
+               if (objectReuseEnabled) {
+                       final T1 val1Reuse = serializer1.createInstance();
+                       final T2 val2Reuse = serializer2.createInstance();
+                       T1 val1;
+                       T2 val2;
+
+                       // for all blocks
+                       do {
+                               // for all values from the spilling side
+                               while (this.running && ((val2 = 
spillVals.next(val2Reuse)) != null)) {
+                                       // for all values in the block
+                                       while ((val1 = 
blockVals.next(val1Reuse)) != null) {
+                                               
collector.collect(crosser.cross(val1, val2));
+                                       }
+                                       blockVals.reset();
                                }
-                               blockVals.reset();
-                       }
-                       spillVals.reset();
+                               spillVals.reset();
+                       } while (this.running && blockVals.nextBlock());
+               } else {
+                       T1 val1;
+                       T2 val2;
+
+                       // for all blocks
+                       do {
+                               // for all values from the spilling side
+                               while (this.running && ((val2 = 
spillVals.next()) != null)) {
+                                       // for all values in the block
+                                       while ((val1 = blockVals.next()) != 
null) {
+                                               
collector.collect(crosser.cross(val1, serializer2.copy(val2)));
+                                       }
+                                       blockVals.reset();
+                               }
+                               spillVals.reset();
+                       } while (this.running && blockVals.nextBlock());
+
                }
-               while (this.running && blockVals.nextBlock());
        }
        
        private void runBlockedOuterSecond() throws Exception {
@@ -249,30 +276,45 @@ public class CrossDriver<T1, T2, OT> implements 
PactDriver<CrossFunction<T1, T2,
                                                
this.taskContext.getOwningNepheleTask());
                this.blockIter = blockVals;
                
-               T1 val1;
-               final T1 val1Reuse = serializer1.createInstance();
-               T1 val1Copy = serializer1.createInstance();
-               T2 val2;
-               final T2 val2Reuse = serializer2.createInstance();
-
                final CrossFunction<T1, T2, OT> crosser = 
this.taskContext.getStub();
                final Collector<OT> collector = 
this.taskContext.getOutputCollector();
-               
-               // for all blocks
-               do {
-                       // for all values from the spilling side
-                       while (this.running && ((val1 = 
spillVals.next(val1Reuse)) != null)) {
-                               // for all values in the block
-                               while (this.running && ((val2 = 
blockVals.next(val2Reuse)) != null)) {
-                                       val1Copy = serializer1.copy(val1, 
val1Copy);
-                                       
collector.collect(crosser.cross(val1Copy, val2));
-                                       //crosser.cross(val1Copy, val2, 
collector);
+
+               if (objectReuseEnabled) {
+                       final T1 val1Reuse = serializer1.createInstance();
+                       final T2 val2Reuse = serializer2.createInstance();
+                       T1 val1;
+                       T2 val2;
+
+                       // for all blocks
+                       do {
+                               // for all values from the spilling side
+                               while (this.running && ((val1 = 
spillVals.next(val1Reuse)) != null)) {
+                                       // for all values in the block
+                                       while (this.running && ((val2 = 
blockVals.next(val2Reuse)) != null)) {
+                                               
collector.collect(crosser.cross(val1, val2));
+                                       }
+                                       blockVals.reset();
                                }
-                               blockVals.reset();
-                       }
-                       spillVals.reset();
+                               spillVals.reset();
+                       } while (this.running && blockVals.nextBlock());
+               } else {
+                       T1 val1;
+                       T2 val2;
+
+                       // for all blocks
+                       do {
+                               // for all values from the spilling side
+                               while (this.running && ((val1 = 
spillVals.next()) != null)) {
+                                       // for all values in the block
+                                       while (this.running && ((val2 = 
blockVals.next()) != null)) {
+                                               
collector.collect(crosser.cross(serializer1.copy(val1), val2));
+                                       }
+                                       blockVals.reset();
+                               }
+                               spillVals.reset();
+                       } while (this.running && blockVals.nextBlock());
+
                }
-               while (this.running && blockVals.nextBlock());
        }
        
        private void runStreamedOuterFirst() throws Exception {
@@ -292,24 +334,36 @@ public class CrossDriver<T1, T2, OT> implements 
PactDriver<CrossFunction<T1, T2,
                                this.taskContext.getOwningNepheleTask());
                this.spillIter = spillVals;
                
-               T1 val1;
-               final T1 val1Reuse = serializer1.createInstance();
-               T1 val1Copy = serializer1.createInstance();
-               T2 val2;
-               final T2 val2Reuse = serializer2.createInstance();
-
                final CrossFunction<T1, T2, OT> crosser = 
this.taskContext.getStub();
                final Collector<OT> collector = 
this.taskContext.getOutputCollector();
-               
-               // for all blocks
-               while (this.running && ((val1 = in1.next(val1Reuse)) != null)) {
-                       // for all values from the spilling side
-                       while (this.running && ((val2 = 
spillVals.next(val2Reuse)) != null)) {
-                               val1Copy = serializer1.copy(val1, val1Copy);
-                               collector.collect(crosser.cross(val1Copy, 
val2));
-                               //crosser.cross(val1Copy, val2, collector);
+
+               if (objectReuseEnabled) {
+                       final T1 val1Reuse = serializer1.createInstance();
+                       final T2 val2Reuse = serializer2.createInstance();
+                       T1 val1;
+                       T2 val2;
+
+                       // for all blocks
+                       while (this.running && ((val1 = in1.next(val1Reuse)) != 
null)) {
+                               // for all values from the spilling side
+                               while (this.running && ((val2 = 
spillVals.next(val2Reuse)) != null)) {
+                                       collector.collect(crosser.cross(val1, 
val2));
+                               }
+                               spillVals.reset();
                        }
-                       spillVals.reset();
+               } else {
+                       T1 val1;
+                       T2 val2;
+
+                       // for all blocks
+                       while (this.running && ((val1 = in1.next()) != null)) {
+                               // for all values from the spilling side
+                               while (this.running && ((val2 = 
spillVals.next()) != null)) {
+                                       
collector.collect(crosser.cross(serializer1.copy(val1), val2));
+                               }
+                               spillVals.reset();
+                       }
+
                }
        }
        
@@ -328,25 +382,38 @@ public class CrossDriver<T1, T2, OT> implements 
PactDriver<CrossFunction<T1, T2,
                                in1, serializer1, this.memManager, 
this.taskContext.getIOManager(), this.memPagesForSpillingSide,
                                this.taskContext.getOwningNepheleTask());
                this.spillIter = spillVals;
-               
-               T1 val1;
-               final T1 val1Reuse = serializer1.createInstance();
-               T2 val2;
-               final T2 val2Reuse = serializer2.createInstance();
-               T2 val2Copy = serializer2.createInstance();
-               
+
                final CrossFunction<T1, T2, OT> crosser = 
this.taskContext.getStub();
                final Collector<OT> collector = 
this.taskContext.getOutputCollector();
-               
-               // for all blocks
-               while (this.running && (val2 = in2.next(val2Reuse)) != null) {
-                       // for all values from the spilling side
-                       while (this.running && (val1 = 
spillVals.next(val1Reuse)) != null) {
-                               val2Copy = serializer2.copy(val2, val2Copy);
-                               collector.collect(crosser.cross(val1, 
val2Copy));
-                               //crosser.cross(val1, val2Copy, collector);
+
+               if (objectReuseEnabled) {
+                       final T1 val1Reuse = serializer1.createInstance();
+                       final T2 val2Reuse = serializer2.createInstance();
+                       T1 val1;
+                       T2 val2;
+
+                       // for all blocks
+                       while (this.running && (val2 = in2.next(val2Reuse)) != 
null) {
+                               // for all values from the spilling side
+                               while (this.running && (val1 = 
spillVals.next(val1Reuse)) != null) {
+                                       collector.collect(crosser.cross(val1, 
val2));
+                                       //crosser.cross(val1, val2, 
collector);
+                               }
+                               spillVals.reset();
+                       }
+               } else {
+                       T1 val1;
+                       T2 val2;
+
+                       // for all blocks
+                       while (this.running && (val2 = in2.next()) != null) {
+                               // for all values from the spilling side
+                               while (this.running && (val1 = 
spillVals.next()) != null) {
+                                       collector.collect(crosser.cross(val1, 
serializer2.copy(val2)));
+                               }
+                               spillVals.reset();
                        }
-                       spillVals.reset();
+
                }
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/b7b32a05/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java
index 610ab06..e3ffdba 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java
@@ -19,7 +19,9 @@
 
 package org.apache.flink.runtime.operators;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.runtime.plugable.DeserializationDelegate;
+import org.apache.flink.util.InstantiationUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.flink.api.common.io.CleanupWhenUnsuccessful;
@@ -41,6 +43,8 @@ import org.apache.flink.runtime.operators.util.ReaderIterator;
 import org.apache.flink.runtime.operators.util.TaskConfig;
 import org.apache.flink.util.MutableObjectIterator;
 
+import java.io.IOException;
+
 /**
  * DataSinkTask which is executed by a task manager. The task hands the data 
to an output format.
  * 
@@ -75,7 +79,6 @@ public class DataSinkTask<IT> extends AbstractInvokable {
        private volatile boolean taskCanceled;
        
        private volatile boolean cleanupCalled;
-       
 
        @Override
        public void registerInputOutput() {
@@ -106,6 +109,22 @@ public class DataSinkTask<IT> extends AbstractInvokable {
                if (LOG.isDebugEnabled()) {
                        LOG.debug(getLogString("Starting data sink operator"));
                }
+
+               ExecutionConfig executionConfig = new ExecutionConfig();
+               try {
+                       ExecutionConfig c = (ExecutionConfig) 
InstantiationUtil.readObjectFromConfig(
+                                       getJobConfiguration(),
+                                       ExecutionConfig.CONFIG_KEY,
+                                       this.getClass().getClassLoader());
+                       if (c != null) {
+                               executionConfig = c;
+                       }
+               } catch (IOException e) {
+                       throw new RuntimeException("Could not load 
ExecutionConfig from Job Configuration: ", e);
+               } catch (ClassNotFoundException e) {
+                       throw new RuntimeException("Could not load 
ExecutionConfig from Job Configuration: ", e);
+               }
+               boolean objectReuseEnabled = 
executionConfig.isObjectReuseEnabled();
                
                try {
                        
@@ -150,10 +169,8 @@ public class DataSinkTask<IT> extends AbstractInvokable {
                        final TypeSerializer<IT> serializer = 
this.inputTypeSerializerFactory.getSerializer();
                        final MutableObjectIterator<IT> input = this.input;
                        final OutputFormat<IT> format = this.format;
-                       
-                       
-                       IT record = serializer.createInstance();
-                       
+
+
                        // check if task has been canceled
                        if (this.taskCanceled) {
                                return;
@@ -166,9 +183,20 @@ public class DataSinkTask<IT> extends AbstractInvokable {
                        // open
                        
format.open(this.getEnvironment().getIndexInSubtaskGroup(), 
this.getEnvironment().getCurrentNumberOfSubtasks());
 
-                       // work!
-                       while (!this.taskCanceled && ((record = 
input.next(record)) != null)) {
-                               format.writeRecord(record);
+                       if (objectReuseEnabled) {
+                               IT record = serializer.createInstance();
+
+                               // work!
+                               while (!this.taskCanceled && ((record = 
input.next(record)) != null)) {
+                                       format.writeRecord(record);
+                               }
+                       } else {
+                               IT record;
+
+                               // work!
+                               while (!this.taskCanceled && ((record = 
input.next()) != null)) {
+                                       format.writeRecord(record);
+                               }
                        }
                        
                        // close. We close here such that a regular close 
throwing an exception marks a task as failed.

http://git-wip-us.apache.org/repos/asf/flink/blob/b7b32a05/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java
index 2db652f..b50bf36 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java
@@ -18,12 +18,15 @@
 
 package org.apache.flink.runtime.operators;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.NoSuchElementException;
 
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.util.InstantiationUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.flink.api.common.accumulators.Accumulator;
@@ -36,10 +39,8 @@ import 
org.apache.flink.runtime.execution.CancelTaskException;
 import org.apache.flink.runtime.io.network.api.BufferWriter;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
-import org.apache.flink.runtime.operators.chaining.ChainedCollectorMapDriver;
 import org.apache.flink.runtime.operators.chaining.ChainedDriver;
 import 
org.apache.flink.runtime.operators.chaining.ExceptionInChainedStubException;
-import org.apache.flink.runtime.operators.shipping.OutputCollector;
 import org.apache.flink.runtime.operators.util.TaskConfig;
 import org.apache.flink.util.Collector;
 
@@ -74,6 +75,23 @@ public class DataSourceTask<OT> extends AbstractInvokable {
        // cancel flag
        private volatile boolean taskCanceled = false;
 
+       private ExecutionConfig getExecutionConfig() {
+               ExecutionConfig executionConfig = new ExecutionConfig();
+               try {
+                       ExecutionConfig c = (ExecutionConfig) 
InstantiationUtil.readObjectFromConfig(
+                                       getJobConfiguration(),
+                                       ExecutionConfig.CONFIG_KEY,
+                                       this.getClass().getClassLoader());
+                       if (c != null) {
+                               executionConfig = c;
+                       }
+               } catch (IOException e) {
+                       throw new RuntimeException("Could not load 
ExecutionConfig from Job Configuration: " + e);
+               } catch (ClassNotFoundException e) {
+                       throw new RuntimeException("Could not load 
ExecutionConfig from Job Configuration: " + e);
+               }
+               return executionConfig;
+       }
 
        @Override
        public void registerInputOutput() {
@@ -102,6 +120,27 @@ public class DataSourceTask<OT> extends AbstractInvokable {
                if (LOG.isDebugEnabled()) {
                        LOG.debug(getLogString("Starting data source 
operator"));
                }
+
+               ExecutionConfig executionConfig = new ExecutionConfig();
+               try {
+                       ExecutionConfig c = (ExecutionConfig) 
InstantiationUtil.readObjectFromConfig(
+                                       getJobConfiguration(),
+                                       ExecutionConfig.CONFIG_KEY,
+                                       this.getClass().getClassLoader());
+                       if (c != null) {
+                               executionConfig = c;
+                       }
+               } catch (IOException e) {
+                       throw new RuntimeException("Could not load 
ExecutionConfig from Job Configuration: " + e);
+               } catch (ClassNotFoundException e) {
+                       throw new RuntimeException("Could not load 
ExecutionConfig from Job Configuration: " + e);
+               }
+
+               boolean objectReuseEnabled = 
executionConfig.isObjectReuseEnabled();
+
+               if (LOG.isDebugEnabled()) {
+                       LOG.debug("DataSourceTask object reuse: " + 
(objectReuseEnabled ? "ENABLED" : "DISABLED") + ".");
+               }
                
                final TypeSerializer<OT> serializer = 
this.serializerFactory.getSerializer();
                
@@ -121,8 +160,6 @@ public class DataSourceTask<OT> extends AbstractInvokable {
                                // get start and end
                                final InputSplit split = splitIterator.next();
                                
-                               OT record = serializer.createInstance();
-       
                                if (LOG.isDebugEnabled()) {
                                        LOG.debug(getLogString("Opening input 
split " + split.toString()));
                                }
@@ -137,48 +174,31 @@ public class DataSourceTask<OT> extends AbstractInvokable 
{
                                }
                                
                                try {
-                                       // special case to make the loops 
tighter
-                                       if (this.output instanceof 
OutputCollector) {
-                                               final OutputCollector<OT> 
output = (OutputCollector<OT>) this.output;
-                                               
-                                               // as long as there is data to 
read
-                                               while (!this.taskCanceled && 
!format.reachedEnd()) {
-                                                       
-                                                       OT returned;
-                                                       if ((returned = 
format.nextRecord(record)) != null) {
-                                                               
output.collect(returned);
-                                                               record = 
returned;
-                                                       }
-                                               }
-                                       }
-                                       else if (this.output instanceof 
ChainedCollectorMapDriver) {
-                                               @SuppressWarnings("unchecked")
-                                               final 
ChainedCollectorMapDriver<OT, ?> output = (ChainedCollectorMapDriver<OT, ?>) 
this.output;
-                                               
+
+                                       final Collector<OT> output = 
this.output;
+
+                                       if (objectReuseEnabled) {
+                                               OT reuse = 
serializer.createInstance();
+
                                                // as long as there is data to 
read
                                                while (!this.taskCanceled && 
!format.reachedEnd()) {
-                                                       
+
                                                        OT returned;
-                                                       if ((returned = 
format.nextRecord(record)) != null) {
+                                                       if ((returned = 
format.nextRecord(reuse)) != null) {
                                                                
output.collect(returned);
-                                                               record = 
returned;
                                                        }
                                                }
-                                       }
-                                       else {
-                                               final Collector<OT> output = 
this.output;
-                                               
+                                       } else {
                                                // as long as there is data to 
read
                                                while (!this.taskCanceled && 
!format.reachedEnd()) {
-                                                       
+
                                                        OT returned;
-                                                       if ((returned = 
format.nextRecord(record)) != null) {
+                                                       if ((returned = 
format.nextRecord(serializer.createInstance())) != null) {
                                                                
output.collect(returned);
-                                                               record = 
returned;
                                                        }
                                                }
                                        }
-                                       
+
                                        if (LOG.isDebugEnabled() && 
!this.taskCanceled) {
                                                LOG.debug(getLogString("Closing 
input split " + split.toString()));
                                        }
@@ -285,7 +305,7 @@ public class DataSourceTask<OT> extends AbstractInvokable {
        private void initOutputs(ClassLoader cl) throws Exception {
                this.chainedTasks = new ArrayList<ChainedDriver<?, ?>>();
                this.eventualOutputs = new ArrayList<BufferWriter>();
-               this.output = RegularPactTask.initOutputs(this, cl, 
this.config, this.chainedTasks, this.eventualOutputs);
+               this.output = RegularPactTask.initOutputs(this, cl, 
this.config, this.chainedTasks, this.eventualOutputs, getExecutionConfig());
        }
 
        // 
------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/b7b32a05/flink-runtime/src/main/java/org/apache/flink/runtime/operators/FlatMapDriver.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/FlatMapDriver.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/FlatMapDriver.java
index f4ae62d..d63a3e3 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/FlatMapDriver.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/FlatMapDriver.java
@@ -19,9 +19,12 @@
 
 package org.apache.flink.runtime.operators;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.MutableObjectIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Map task which is executed by a Nephele task manager. The task has a single
@@ -37,12 +40,15 @@ import org.apache.flink.util.MutableObjectIterator;
  * @param <OT> The mapper's output data type.
  */
 public class FlatMapDriver<IT, OT> implements PactDriver<FlatMapFunction<IT, 
OT>, OT> {
-       
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(FlatMapDriver.class);
+
        private PactTaskContext<FlatMapFunction<IT, OT>, OT> taskContext;
        
        private volatile boolean running;
-       
-       
+
+       private boolean objectReuseEnabled = false;
+
        @Override
        public void setup(PactTaskContext<FlatMapFunction<IT, OT>, OT> context) 
{
                this.taskContext = context;
@@ -68,8 +74,12 @@ public class FlatMapDriver<IT, OT> implements 
PactDriver<FlatMapFunction<IT, OT>
 
        @Override
        public void prepare() {
-               // nothing, since a mapper does not need any preparation
-       }
+               ExecutionConfig executionConfig = 
taskContext.getExecutionConfig();
+               this.objectReuseEnabled = 
executionConfig.isObjectReuseEnabled();
+
+               if (LOG.isDebugEnabled()) {
+                       LOG.debug("FlatMapDriver object reuse: " + 
(this.objectReuseEnabled ? "ENABLED" : "DISABLED") + ".");
+               }       }
 
        @Override
        public void run() throws Exception {
@@ -78,10 +88,19 @@ public class FlatMapDriver<IT, OT> implements 
PactDriver<FlatMapFunction<IT, OT>
                final FlatMapFunction<IT, OT> function = 
this.taskContext.getStub();
                final Collector<OT> output = 
this.taskContext.getOutputCollector();
 
-               IT record = 
this.taskContext.<IT>getInputSerializer(0).getSerializer().createInstance();
+               if (objectReuseEnabled) {
+                       IT record = 
this.taskContext.<IT>getInputSerializer(0).getSerializer().createInstance();
+
+
+                       while (this.running && ((record = input.next(record)) 
!= null)) {
+                               function.flatMap(record, output);
+                       }
+               } else {
+                       IT record;
 
-               while (this.running && ((record = input.next(record)) != null)) 
{
-                       function.flatMap(record, output);
+                       while (this.running && ((record = input.next()) != 
null)) {
+                               function.flatMap(record, output);
+                       }
                }
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/b7b32a05/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceCombineDriver.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceCombineDriver.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceCombineDriver.java
index 4323eae..8d8d5dc 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceCombineDriver.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceCombineDriver.java
@@ -19,6 +19,8 @@
 
 package org.apache.flink.runtime.operators;
 
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.runtime.util.NonReusingKeyGroupedIterator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.flink.api.common.functions.FlatCombineFunction;
@@ -31,7 +33,7 @@ import 
org.apache.flink.runtime.operators.sort.FixedLengthRecordSorter;
 import org.apache.flink.runtime.operators.sort.InMemorySorter;
 import org.apache.flink.runtime.operators.sort.NormalizedKeySorter;
 import org.apache.flink.runtime.operators.sort.QuickSort;
-import org.apache.flink.runtime.util.KeyGroupedIterator;
+import org.apache.flink.runtime.util.ReusingKeyGroupedIterator;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.MutableObjectIterator;
 
@@ -72,6 +74,8 @@ public class GroupReduceCombineDriver<T> implements 
PactDriver<FlatCombineFuncti
 
        private volatile boolean running = true;
 
+       private boolean objectReuseEnabled = false;
+
        // 
------------------------------------------------------------------------
 
        @Override
@@ -125,6 +129,13 @@ public class GroupReduceCombineDriver<T> implements 
PactDriver<FlatCombineFuncti
                } else {
                        this.sorter = new 
NormalizedKeySorter<T>(this.serializer, this.sortingComparator.duplicate(), 
memory);
                }
+
+               ExecutionConfig executionConfig = 
taskContext.getExecutionConfig();
+               this.objectReuseEnabled = 
executionConfig.isObjectReuseEnabled();
+
+               if (LOG.isDebugEnabled()) {
+                       LOG.debug("GroupReduceCombineDriver object reuse: " + 
(this.objectReuseEnabled ? "ENABLED" : "DISABLED") + ".");
+               }
        }
 
        @Override
@@ -162,19 +173,37 @@ public class GroupReduceCombineDriver<T> implements 
PactDriver<FlatCombineFuncti
        private void sortAndCombine() throws Exception {
                final InMemorySorter<T> sorter = this.sorter;
 
-               if (!sorter.isEmpty()) {
-                       this.sortAlgo.sort(sorter);
+               if (objectReuseEnabled) {
+                       if (!sorter.isEmpty()) {
+                               this.sortAlgo.sort(sorter);
+
+                               final ReusingKeyGroupedIterator<T> keyIter = 
new ReusingKeyGroupedIterator<T>(sorter.getIterator(), this.serializer, 
this.groupingComparator);
 
-                       final KeyGroupedIterator<T> keyIter = new 
KeyGroupedIterator<T>(sorter.getIterator(), this.serializer,
-                                       this.groupingComparator);
 
-                       final FlatCombineFunction<T> combiner = this.combiner;
-                       final Collector<T> output = this.output;
+                               final FlatCombineFunction<T> combiner = 
this.combiner;
+                               final Collector<T> output = this.output;
 
-                       // iterate over key groups
-                       while (this.running && keyIter.nextKey()) {
-                               combiner.combine(keyIter.getValues(), output);
+                               // iterate over key groups
+                               while (this.running && keyIter.nextKey()) {
+                                       combiner.combine(keyIter.getValues(), 
output);
+                               }
                        }
+               } else {
+                       if (!sorter.isEmpty()) {
+                               this.sortAlgo.sort(sorter);
+
+                               final NonReusingKeyGroupedIterator<T> keyIter = 
new NonReusingKeyGroupedIterator<T>(sorter.getIterator(), this.serializer, 
this.groupingComparator);
+
+
+                               final FlatCombineFunction<T> combiner = 
this.combiner;
+                               final Collector<T> output = this.output;
+
+                               // iterate over key groups
+                               while (this.running && keyIter.nextKey()) {
+                                       combiner.combine(keyIter.getValues(), 
output);
+                               }
+                       }
+
                }
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/b7b32a05/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceDriver.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceDriver.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceDriver.java
index 2ec9873..9d9f994 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceDriver.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceDriver.java
@@ -19,14 +19,15 @@
 
 package org.apache.flink.runtime.operators;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.flink.api.common.functions.GroupReduceFunction;
 import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.runtime.operators.util.TaskConfig;
-import org.apache.flink.runtime.util.KeyGroupedIterator;
-import org.apache.flink.runtime.util.KeyGroupedIteratorImmutable;
+import org.apache.flink.runtime.util.ReusingKeyGroupedIterator;
+import org.apache.flink.runtime.util.NonReusingKeyGroupedIterator;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.MutableObjectIterator;
 
@@ -51,11 +52,11 @@ public class GroupReduceDriver<IT, OT> implements 
PactDriver<GroupReduceFunction
        private TypeSerializer<IT> serializer;
 
        private TypeComparator<IT> comparator;
-       
-       private boolean mutableObjectMode = false;
-       
+
        private volatile boolean running;
 
+       private boolean objectReuseEnabled = false;
+
        // 
------------------------------------------------------------------------
 
        @Override
@@ -92,11 +93,12 @@ public class GroupReduceDriver<IT, OT> implements 
PactDriver<GroupReduceFunction
                this.serializer = 
this.taskContext.<IT>getInputSerializer(0).getSerializer();
                this.comparator = this.taskContext.getDriverComparator(0);
                this.input = this.taskContext.getInput(0);
-               
-               this.mutableObjectMode = config.getMutableObjectMode();
-               
+
+               ExecutionConfig executionConfig = 
taskContext.getExecutionConfig();
+               this.objectReuseEnabled = 
executionConfig.isObjectReuseEnabled();
+
                if (LOG.isDebugEnabled()) {
-                       LOG.debug("GroupReduceDriver uses " + 
(this.mutableObjectMode ? "MUTABLE" : "IMMUTABLE") + " object mode.");
+                       LOG.debug("GroupReduceDriver object reuse: " + 
(this.objectReuseEnabled ? "ENABLED" : "DISABLED") + ".");
                }
        }
 
@@ -110,15 +112,15 @@ public class GroupReduceDriver<IT, OT> implements 
PactDriver<GroupReduceFunction
                final GroupReduceFunction<IT, OT> stub = 
this.taskContext.getStub();
                final Collector<OT> output = 
this.taskContext.getOutputCollector();
                
-               if (mutableObjectMode) {
-                       final KeyGroupedIterator<IT> iter = new 
KeyGroupedIterator<IT>(this.input, this.serializer, this.comparator);
+               if (objectReuseEnabled) {
+                       final ReusingKeyGroupedIterator<IT> iter = new 
ReusingKeyGroupedIterator<IT>(this.input, this.serializer, this.comparator);
                        // run stub implementation
                        while (this.running && iter.nextKey()) {
                                stub.reduce(iter.getValues(), output);
                        }
                }
                else {
-                       final KeyGroupedIteratorImmutable<IT> iter = new 
KeyGroupedIteratorImmutable<IT>(this.input, this.serializer, this.comparator);
+                       final NonReusingKeyGroupedIterator<IT> iter = new 
NonReusingKeyGroupedIterator<IT>(this.input, this.serializer, this.comparator);
                        // run stub implementation
                        while (this.running && iter.nextKey()) {
                                stub.reduce(iter.getValues(), output);

Reply via email to