http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/AllGroupReduceDriverTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/AllGroupReduceDriverTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/AllGroupReduceDriverTest.java
index ee23736..31707b1 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/AllGroupReduceDriverTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/AllGroupReduceDriverTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.operators.drivers;
 import java.util.Arrays;
 import java.util.List;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.GroupReduceFunction;
 import org.apache.flink.api.common.functions.RichGroupReduceFunction;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -52,7 +53,7 @@ public class AllGroupReduceDriverTest {
 			MutableObjectIterator<Tuple2<String, Integer>> input = EmptyMutableObjectIterator.get();
 			context.setDriverStrategy(DriverStrategy.ALL_GROUP_REDUCE);
 			
-			context.setInput1(input, typeInfo.createSerializer());
+			context.setInput1(input, typeInfo.createSerializer(new ExecutionConfig()));
 			context.setCollector(new DiscardingOutputCollector<Tuple2<String, Integer>>());
 			
 			AllGroupReduceDriver<Tuple2<String, Integer>, Tuple2<String, Integer>> driver = new AllGroupReduceDriver<Tuple2<String, Integer>, Tuple2<String, Integer>>();
@@ -75,12 +76,12 @@ public class AllGroupReduceDriverTest {
 			
 			List<Tuple2<String, Integer>> data = DriverTestData.createReduceImmutableData();
 			TypeInformation<Tuple2<String, Integer>> typeInfo = TypeExtractor.getForObject(data.get(0));
-			MutableObjectIterator<Tuple2<String, Integer>> input = new RegularToMutableObjectIterator<Tuple2<String, Integer>>(data.iterator(), typeInfo.createSerializer());
+			MutableObjectIterator<Tuple2<String, Integer>> input = new RegularToMutableObjectIterator<Tuple2<String, Integer>>(data.iterator(), typeInfo.createSerializer(new ExecutionConfig()));
 			
-			GatheringCollector<Tuple2<String, Integer>> result = new GatheringCollector<Tuple2<String,Integer>>(typeInfo.createSerializer());
+			GatheringCollector<Tuple2<String, Integer>> result = new GatheringCollector<Tuple2<String,Integer>>(typeInfo.createSerializer(new ExecutionConfig()));
 			
 			context.setDriverStrategy(DriverStrategy.ALL_GROUP_REDUCE);
-			context.setInput1(input, typeInfo.createSerializer());
+			context.setInput1(input, typeInfo.createSerializer(new ExecutionConfig()));
 			context.setCollector(result);
 			context.setUdf(new ConcatSumReducer());
 			
@@ -115,12 +116,12 @@ public class AllGroupReduceDriverTest {
 			
 			List<Tuple2<StringValue, IntValue>> data = DriverTestData.createReduceMutableData();
 			TypeInformation<Tuple2<StringValue, IntValue>> typeInfo = TypeExtractor.getForObject(data.get(0));
-			MutableObjectIterator<Tuple2<StringValue, IntValue>> input = new RegularToMutableObjectIterator<Tuple2<StringValue, IntValue>>(data.iterator(), typeInfo.createSerializer());
+			MutableObjectIterator<Tuple2<StringValue, IntValue>> input = new RegularToMutableObjectIterator<Tuple2<StringValue, IntValue>>(data.iterator(), typeInfo.createSerializer(new ExecutionConfig()));
 			
-			GatheringCollector<Tuple2<StringValue, IntValue>> result = new GatheringCollector<Tuple2<StringValue, IntValue>>(typeInfo.createSerializer());
+			GatheringCollector<Tuple2<StringValue, IntValue>> result = new GatheringCollector<Tuple2<StringValue, IntValue>>(typeInfo.createSerializer(new ExecutionConfig()));
 			
 			context.setDriverStrategy(DriverStrategy.ALL_GROUP_REDUCE);
-			context.setInput1(input, typeInfo.createSerializer());
+			context.setInput1(input, typeInfo.createSerializer(new ExecutionConfig()));
 			context.setCollector(result);
 			context.setUdf(new ConcatSumMutableReducer());
 			

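The change repeated throughout these test diffs is the new ExecutionConfig parameter on TypeInformation#createSerializer. A minimal, self-contained sketch of the new call shape; the sample tuple value is illustrative and not taken from the commit:

    import org.apache.flink.api.common.ExecutionConfig;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.common.typeutils.TypeSerializer;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.api.java.typeutils.TypeExtractor;

    public class SerializerMigrationSketch {
        public static void main(String[] args) {
            // Derive type information from a sample record, as the tests do.
            TypeInformation<Tuple2<String, Integer>> typeInfo =
                    TypeExtractor.getForObject(new Tuple2<String, Integer>("a", 1));

            // Before this commit: typeInfo.createSerializer();
            // After: serializer creation is parameterized by an ExecutionConfig,
            // so the tests construct a default config inline.
            TypeSerializer<Tuple2<String, Integer>> serializer =
                    typeInfo.createSerializer(new ExecutionConfig());
            System.out.println(serializer.getClass().getName());
        }
    }
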
http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/AllReduceDriverTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/AllReduceDriverTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/AllReduceDriverTest.java
index bfc9168..119ac5b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/AllReduceDriverTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/AllReduceDriverTest.java
@@ -22,6 +22,7 @@ package org.apache.flink.runtime.operators.drivers;
 import java.util.Arrays;
 import java.util.List;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.common.functions.RichReduceFunction;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -52,7 +53,7 @@ public class AllReduceDriverTest {
 			MutableObjectIterator<Tuple2<String, Integer>> input = EmptyMutableObjectIterator.get();
 			context.setDriverStrategy(DriverStrategy.ALL_REDUCE);
 			
-			context.setInput1(input, typeInfo.createSerializer());
+			context.setInput1(input, typeInfo.createSerializer(new ExecutionConfig()));
 			context.setCollector(new DiscardingOutputCollector<Tuple2<String, Integer>>());
 			
 			AllReduceDriver<Tuple2<String, Integer>> driver = new AllReduceDriver<Tuple2<String,Integer>>();
@@ -76,12 +77,12 @@ public class AllReduceDriverTest {
 				
 				List<Tuple2<String, Integer>> data = DriverTestData.createReduceImmutableData();
 				TypeInformation<Tuple2<String, Integer>> typeInfo = TypeExtractor.getForObject(data.get(0));
-				MutableObjectIterator<Tuple2<String, Integer>> input = new RegularToMutableObjectIterator<Tuple2<String, Integer>>(data.iterator(), typeInfo.createSerializer());
+				MutableObjectIterator<Tuple2<String, Integer>> input = new RegularToMutableObjectIterator<Tuple2<String, Integer>>(data.iterator(), typeInfo.createSerializer(new ExecutionConfig()));
 				
-				GatheringCollector<Tuple2<String, Integer>> result = new GatheringCollector<Tuple2<String,Integer>>(typeInfo.createSerializer());
+				GatheringCollector<Tuple2<String, Integer>> result = new GatheringCollector<Tuple2<String,Integer>>(typeInfo.createSerializer(new ExecutionConfig()));
 				
 				context.setDriverStrategy(DriverStrategy.ALL_REDUCE);
-				context.setInput1(input, typeInfo.createSerializer());
+				context.setInput1(input, typeInfo.createSerializer(new ExecutionConfig()));
 				context.setCollector(result);
 				context.setUdf(new ConcatSumFirstReducer());
 				
@@ -108,12 +109,12 @@ public class AllReduceDriverTest {
 				
 				List<Tuple2<String, Integer>> data = DriverTestData.createReduceImmutableData();
 				TypeInformation<Tuple2<String, Integer>> typeInfo = TypeExtractor.getForObject(data.get(0));
-				MutableObjectIterator<Tuple2<String, Integer>> input = new RegularToMutableObjectIterator<Tuple2<String, Integer>>(data.iterator(), typeInfo.createSerializer());
+				MutableObjectIterator<Tuple2<String, Integer>> input = new RegularToMutableObjectIterator<Tuple2<String, Integer>>(data.iterator(), typeInfo.createSerializer(new ExecutionConfig()));
 				
-				GatheringCollector<Tuple2<String, Integer>> result = new GatheringCollector<Tuple2<String,Integer>>(typeInfo.createSerializer());
+				GatheringCollector<Tuple2<String, Integer>> result = new GatheringCollector<Tuple2<String,Integer>>(typeInfo.createSerializer(new ExecutionConfig()));
 				
 				context.setDriverStrategy(DriverStrategy.ALL_REDUCE);
-				context.setInput1(input, typeInfo.createSerializer());
+				context.setInput1(input, typeInfo.createSerializer(new ExecutionConfig()));
 				context.setCollector(result);
 				context.setUdf(new ConcatSumSecondReducer());
 				
@@ -150,12 +151,12 @@ public class AllReduceDriverTest {
 				
 				List<Tuple2<StringValue, IntValue>> data = DriverTestData.createReduceMutableData();
 				TypeInformation<Tuple2<StringValue, IntValue>> typeInfo = TypeExtractor.getForObject(data.get(0));
-				MutableObjectIterator<Tuple2<StringValue, IntValue>> input = new RegularToMutableObjectIterator<Tuple2<StringValue, IntValue>>(data.iterator(), typeInfo.createSerializer());
+				MutableObjectIterator<Tuple2<StringValue, IntValue>> input = new RegularToMutableObjectIterator<Tuple2<StringValue, IntValue>>(data.iterator(), typeInfo.createSerializer(new ExecutionConfig()));
 				
-				GatheringCollector<Tuple2<StringValue, IntValue>> result = new GatheringCollector<Tuple2<StringValue, IntValue>>(typeInfo.createSerializer());
+				GatheringCollector<Tuple2<StringValue, IntValue>> result = new GatheringCollector<Tuple2<StringValue, IntValue>>(typeInfo.createSerializer(new ExecutionConfig()));
 				
 				context.setDriverStrategy(DriverStrategy.ALL_REDUCE);
-				context.setInput1(input, typeInfo.createSerializer());
+				context.setInput1(input, typeInfo.createSerializer(new ExecutionConfig()));
 				context.setCollector(result);
 				context.setUdf(new ConcatSumFirstMutableReducer());
 				
@@ -181,12 +182,12 @@ public class AllReduceDriverTest {
 				
 				List<Tuple2<StringValue, IntValue>> data = DriverTestData.createReduceMutableData();
 				TypeInformation<Tuple2<StringValue, IntValue>> typeInfo = TypeExtractor.getForObject(data.get(0));
-				MutableObjectIterator<Tuple2<StringValue, IntValue>> input = new RegularToMutableObjectIterator<Tuple2<StringValue, IntValue>>(data.iterator(), typeInfo.createSerializer());
+				MutableObjectIterator<Tuple2<StringValue, IntValue>> input = new RegularToMutableObjectIterator<Tuple2<StringValue, IntValue>>(data.iterator(), typeInfo.createSerializer(new ExecutionConfig()));
 				
-				GatheringCollector<Tuple2<StringValue, IntValue>> result = new GatheringCollector<Tuple2<StringValue, IntValue>>(typeInfo.createSerializer());
+				GatheringCollector<Tuple2<StringValue, IntValue>> result = new GatheringCollector<Tuple2<StringValue, IntValue>>(typeInfo.createSerializer(new ExecutionConfig()));
 				
 				context.setDriverStrategy(DriverStrategy.ALL_REDUCE);
-				context.setInput1(input, typeInfo.createSerializer());
+				context.setInput1(input, typeInfo.createSerializer(new ExecutionConfig()));
 				context.setCollector(result);
 				context.setUdf(new ConcatSumSecondMutableReducer());
 				

http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/GroupReduceDriverTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/GroupReduceDriverTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/GroupReduceDriverTest.java
index d249e9a..53ca6ac 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/GroupReduceDriverTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/GroupReduceDriverTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.operators.drivers;
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.GroupReduceFunction;
 import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.api.common.functions.RichGroupReduceFunction;
@@ -50,12 +51,12 @@ public class GroupReduceDriverTest {
 			List<Tuple2<String, Integer>> data = DriverTestData.createReduceImmutableData();
 			TupleTypeInfo<Tuple2<String, Integer>> typeInfo = (TupleTypeInfo<Tuple2<String, Integer>>) TypeExtractor.getForObject(data.get(0));
 			MutableObjectIterator<Tuple2<String, Integer>> input = EmptyMutableObjectIterator.get();
-			TypeComparator<Tuple2<String, Integer>> comparator = typeInfo.createComparator(new int[]{0}, new boolean[] {true}, 0);
+			TypeComparator<Tuple2<String, Integer>> comparator = typeInfo.createComparator(new int[]{0}, new boolean[] {true}, 0, new ExecutionConfig());
 			context.setDriverStrategy(DriverStrategy.SORTED_GROUP_REDUCE);
 			
-			GatheringCollector<Tuple2<String, Integer>> result = new GatheringCollector<Tuple2<String,Integer>>(typeInfo.createSerializer());
+			GatheringCollector<Tuple2<String, Integer>> result = new GatheringCollector<Tuple2<String,Integer>>(typeInfo.createSerializer(new ExecutionConfig()));
 			
-			context.setInput1(input, typeInfo.createSerializer());
+			context.setInput1(input, typeInfo.createSerializer(new ExecutionConfig()));
 			context.setComparator1(comparator);
 			context.setCollector(result);
 			
@@ -81,13 +82,13 @@ public class GroupReduceDriverTest {
 			
 			List<Tuple2<String, Integer>> data = DriverTestData.createReduceImmutableData();
 			TupleTypeInfo<Tuple2<String, Integer>> typeInfo = (TupleTypeInfo<Tuple2<String, Integer>>) TypeExtractor.getForObject(data.get(0));
-			MutableObjectIterator<Tuple2<String, Integer>> input = new RegularToMutableObjectIterator<Tuple2<String, Integer>>(data.iterator(), typeInfo.createSerializer());
-			TypeComparator<Tuple2<String, Integer>> comparator = typeInfo.createComparator(new int[]{0}, new boolean[] {true}, 0);
+			MutableObjectIterator<Tuple2<String, Integer>> input = new RegularToMutableObjectIterator<Tuple2<String, Integer>>(data.iterator(), typeInfo.createSerializer(new ExecutionConfig()));
+			TypeComparator<Tuple2<String, Integer>> comparator = typeInfo.createComparator(new int[]{0}, new boolean[] {true}, 0, new ExecutionConfig());
 			
-			GatheringCollector<Tuple2<String, Integer>> result = new GatheringCollector<Tuple2<String,Integer>>(typeInfo.createSerializer());
+			GatheringCollector<Tuple2<String, Integer>> result = new GatheringCollector<Tuple2<String,Integer>>(typeInfo.createSerializer(new ExecutionConfig()));
 			
 			context.setDriverStrategy(DriverStrategy.SORTED_GROUP_REDUCE);
-			context.setInput1(input, typeInfo.createSerializer());
+			context.setInput1(input, typeInfo.createSerializer(new ExecutionConfig()));
 			context.setCollector(result);
 			context.setComparator1(comparator);
 			context.setUdf(new ConcatSumReducer());
@@ -117,13 +118,13 @@ public class GroupReduceDriverTest {
 			
 			List<Tuple2<StringValue, IntValue>> data = DriverTestData.createReduceMutableData();
 			TupleTypeInfo<Tuple2<StringValue, IntValue>> typeInfo = (TupleTypeInfo<Tuple2<StringValue, IntValue>>) TypeExtractor.getForObject(data.get(0));
-			MutableObjectIterator<Tuple2<StringValue, IntValue>> input = new RegularToMutableObjectIterator<Tuple2<StringValue, IntValue>>(data.iterator(), typeInfo.createSerializer());
-			TypeComparator<Tuple2<StringValue, IntValue>> comparator = typeInfo.createComparator(new int[]{0}, new boolean[] {true}, 0);
+			MutableObjectIterator<Tuple2<StringValue, IntValue>> input = new RegularToMutableObjectIterator<Tuple2<StringValue, IntValue>>(data.iterator(), typeInfo.createSerializer(new ExecutionConfig()));
+			TypeComparator<Tuple2<StringValue, IntValue>> comparator = typeInfo.createComparator(new int[]{0}, new boolean[] {true}, 0, new ExecutionConfig());
 			
-			GatheringCollector<Tuple2<StringValue, IntValue>> result = new GatheringCollector<Tuple2<StringValue, IntValue>>(typeInfo.createSerializer());
+			GatheringCollector<Tuple2<StringValue, IntValue>> result = new GatheringCollector<Tuple2<StringValue, IntValue>>(typeInfo.createSerializer(new ExecutionConfig()));
 			
 			context.setDriverStrategy(DriverStrategy.SORTED_GROUP_REDUCE);
-			context.setInput1(input, typeInfo.createSerializer());
+			context.setInput1(input, typeInfo.createSerializer(new ExecutionConfig()));
 			context.setComparator1(comparator);
 			context.setCollector(result);
 			context.setUdf(new ConcatSumMutableReducer());
@@ -153,13 +154,13 @@ public class GroupReduceDriverTest {
 			
 			List<Tuple2<StringValue, IntValue>> data = DriverTestData.createReduceMutableData();
 			TupleTypeInfo<Tuple2<StringValue, IntValue>> typeInfo = (TupleTypeInfo<Tuple2<StringValue, IntValue>>) TypeExtractor.getForObject(data.get(0));
-			MutableObjectIterator<Tuple2<StringValue, IntValue>> input = new RegularToMutableObjectIterator<Tuple2<StringValue, IntValue>>(data.iterator(), typeInfo.createSerializer());
-			TypeComparator<Tuple2<StringValue, IntValue>> comparator = typeInfo.createComparator(new int[]{0}, new boolean[] {true}, 0);
+			MutableObjectIterator<Tuple2<StringValue, IntValue>> input = new RegularToMutableObjectIterator<Tuple2<StringValue, IntValue>>(data.iterator(), typeInfo.createSerializer(new ExecutionConfig()));
+			TypeComparator<Tuple2<StringValue, IntValue>> comparator = typeInfo.createComparator(new int[]{0}, new boolean[] {true}, 0, new ExecutionConfig());
 			
-			GatheringCollector<Tuple2<StringValue, IntValue>> result = new GatheringCollector<Tuple2<StringValue, IntValue>>(typeInfo.createSerializer());
+			GatheringCollector<Tuple2<StringValue, IntValue>> result = new GatheringCollector<Tuple2<StringValue, IntValue>>(typeInfo.createSerializer(new ExecutionConfig()));
 			
 			context.setDriverStrategy(DriverStrategy.SORTED_GROUP_REDUCE);
-			context.setInput1(input, typeInfo.createSerializer());
+			context.setInput1(input, typeInfo.createSerializer(new ExecutionConfig()));
 			context.setComparator1(comparator);
 			context.setCollector(result);
 			context.setUdf(new ConcatSumMutableAccumulatingReducer());
@@ -195,13 +196,13 @@ public class GroupReduceDriverTest {
 			
 			List<Tuple2<StringValue, IntValue>> data = DriverTestData.createReduceMutableData();
 			TupleTypeInfo<Tuple2<StringValue, IntValue>> typeInfo = (TupleTypeInfo<Tuple2<StringValue, IntValue>>) TypeExtractor.getForObject(data.get(0));
-			MutableObjectIterator<Tuple2<StringValue, IntValue>> input = new RegularToMutableObjectIterator<Tuple2<StringValue, IntValue>>(data.iterator(), typeInfo.createSerializer());
-			TypeComparator<Tuple2<StringValue, IntValue>> comparator = typeInfo.createComparator(new int[]{0}, new boolean[] {true}, 0);
+			MutableObjectIterator<Tuple2<StringValue, IntValue>> input = new RegularToMutableObjectIterator<Tuple2<StringValue, IntValue>>(data.iterator(), typeInfo.createSerializer(new ExecutionConfig()));
+			TypeComparator<Tuple2<StringValue, IntValue>> comparator = typeInfo.createComparator(new int[]{0}, new boolean[] {true}, 0, new ExecutionConfig());
 			
-			GatheringCollector<Tuple2<StringValue, IntValue>> result = new GatheringCollector<Tuple2<StringValue, IntValue>>(typeInfo.createSerializer());
+			GatheringCollector<Tuple2<StringValue, IntValue>> result = new GatheringCollector<Tuple2<StringValue, IntValue>>(typeInfo.createSerializer(new ExecutionConfig()));
 			
 			context.setDriverStrategy(DriverStrategy.SORTED_GROUP_REDUCE);
-			context.setInput1(input, typeInfo.createSerializer());
+			context.setInput1(input, typeInfo.createSerializer(new ExecutionConfig()));
 			context.setComparator1(comparator);
 			context.setCollector(result);
 			context.setUdf(new ConcatSumMutableAccumulatingReducer());

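Starting with GroupReduceDriverTest, the diffs also update TupleTypeInfo#createComparator, which gains the same trailing ExecutionConfig argument after the key positions and sort orders. A minimal sketch under the same assumptions as above (illustrative sample value, default config):

    import org.apache.flink.api.common.ExecutionConfig;
    import org.apache.flink.api.common.typeutils.TypeComparator;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.api.java.typeutils.TupleTypeInfo;
    import org.apache.flink.api.java.typeutils.TypeExtractor;

    public class ComparatorMigrationSketch {
        @SuppressWarnings("unchecked")
        public static void main(String[] args) {
            // The cast mirrors the tests: getForObject on a tuple yields a TupleTypeInfo.
            TupleTypeInfo<Tuple2<String, Integer>> typeInfo =
                    (TupleTypeInfo<Tuple2<String, Integer>>)
                            TypeExtractor.getForObject(new Tuple2<String, Integer>("a", 1));

            // Before this commit: createComparator(new int[]{0}, new boolean[]{true}, 0);
            // After: the ExecutionConfig is threaded through as the last argument.
            TypeComparator<Tuple2<String, Integer>> comparator =
                    typeInfo.createComparator(new int[]{0}, new boolean[]{true}, 0, new ExecutionConfig());
            System.out.println(comparator.getClass().getName());
        }
    }
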
http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/ReduceCombineDriverTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/ReduceCombineDriverTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/ReduceCombineDriverTest.java
index 44cbe16..7494847 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/ReduceCombineDriverTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/ReduceCombineDriverTest.java
@@ -22,6 +22,7 @@ package org.apache.flink.runtime.operators.drivers;
 import java.util.Collections;
 import java.util.List;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.api.common.functions.RichReduceFunction;
@@ -55,11 +56,11 @@ public class ReduceCombineDriverTest {
 			MutableObjectIterator<Tuple2<String, Integer>> input = EmptyMutableObjectIterator.get();
 			
 			context.setDriverStrategy(DriverStrategy.SORTED_PARTIAL_REDUCE);
-			TypeComparator<Tuple2<String, Integer>> comparator = typeInfo.createComparator(new int[]{0}, new boolean[] {true}, 0);
+			TypeComparator<Tuple2<String, Integer>> comparator = typeInfo.createComparator(new int[]{0}, new boolean[] {true}, 0, new ExecutionConfig());
 			
-			GatheringCollector<Tuple2<String, Integer>> result = new GatheringCollector<Tuple2<String,Integer>>(typeInfo.createSerializer());
+			GatheringCollector<Tuple2<String, Integer>> result = new GatheringCollector<Tuple2<String,Integer>>(typeInfo.createSerializer(new ExecutionConfig()));
 			
-			context.setInput1(input, typeInfo.createSerializer());
+			context.setInput1(input, typeInfo.createSerializer(new ExecutionConfig()));
 			context.setComparator1(comparator);
 			context.setCollector(result);
 			
@@ -89,13 +90,13 @@ public class ReduceCombineDriverTest {
 				Collections.shuffle(data);
 				
 				TupleTypeInfo<Tuple2<String, Integer>> typeInfo = (TupleTypeInfo<Tuple2<String, Integer>>) TypeExtractor.getForObject(data.get(0));
-				MutableObjectIterator<Tuple2<String, Integer>> input = new RegularToMutableObjectIterator<Tuple2<String, Integer>>(data.iterator(), typeInfo.createSerializer());
-				TypeComparator<Tuple2<String, Integer>> comparator = typeInfo.createComparator(new int[]{0}, new boolean[] {true}, 0);
+				MutableObjectIterator<Tuple2<String, Integer>> input = new RegularToMutableObjectIterator<Tuple2<String, Integer>>(data.iterator(), typeInfo.createSerializer(new ExecutionConfig()));
+				TypeComparator<Tuple2<String, Integer>> comparator = typeInfo.createComparator(new int[]{0}, new boolean[] {true}, 0, new ExecutionConfig());
 				
-				GatheringCollector<Tuple2<String, Integer>> result = new GatheringCollector<Tuple2<String,Integer>>(typeInfo.createSerializer());
+				GatheringCollector<Tuple2<String, Integer>> result = new GatheringCollector<Tuple2<String,Integer>>(typeInfo.createSerializer(new ExecutionConfig()));
 				
 				context.setDriverStrategy(DriverStrategy.SORTED_PARTIAL_REDUCE);
-				context.setInput1(input, typeInfo.createSerializer());
+				context.setInput1(input, typeInfo.createSerializer(new ExecutionConfig()));
 				context.setComparator1(comparator);
 				context.setCollector(result);
 				context.setUdf(new ConcatSumFirstReducer());
@@ -120,13 +121,13 @@ public class ReduceCombineDriverTest {
 				Collections.shuffle(data);
 				
 				TupleTypeInfo<Tuple2<String, Integer>> typeInfo = (TupleTypeInfo<Tuple2<String, Integer>>) TypeExtractor.getForObject(data.get(0));
-				MutableObjectIterator<Tuple2<String, Integer>> input = new RegularToMutableObjectIterator<Tuple2<String, Integer>>(data.iterator(), typeInfo.createSerializer());
-				TypeComparator<Tuple2<String, Integer>> comparator = typeInfo.createComparator(new int[]{0}, new boolean[] {true}, 0);
+				MutableObjectIterator<Tuple2<String, Integer>> input = new RegularToMutableObjectIterator<Tuple2<String, Integer>>(data.iterator(), typeInfo.createSerializer(new ExecutionConfig()));
+				TypeComparator<Tuple2<String, Integer>> comparator = typeInfo.createComparator(new int[]{0}, new boolean[] {true}, 0, new ExecutionConfig());
 				
-				GatheringCollector<Tuple2<String, Integer>> result = new GatheringCollector<Tuple2<String,Integer>>(typeInfo.createSerializer());
+				GatheringCollector<Tuple2<String, Integer>> result = new GatheringCollector<Tuple2<String,Integer>>(typeInfo.createSerializer(new ExecutionConfig()));
 				
 				context.setDriverStrategy(DriverStrategy.SORTED_PARTIAL_REDUCE);
-				context.setInput1(input, typeInfo.createSerializer());
+				context.setInput1(input, typeInfo.createSerializer(new ExecutionConfig()));
 				context.setComparator1(comparator);
 				context.setCollector(result);
 				context.setUdf(new ConcatSumSecondReducer());
@@ -159,13 +160,13 @@ public class ReduceCombineDriverTest {
 				
 				List<Tuple2<StringValue, IntValue>> data = DriverTestData.createReduceMutableData();
 				TupleTypeInfo<Tuple2<StringValue, IntValue>> typeInfo = (TupleTypeInfo<Tuple2<StringValue, IntValue>>) TypeExtractor.getForObject(data.get(0));
-				MutableObjectIterator<Tuple2<StringValue, IntValue>> input = new RegularToMutableObjectIterator<Tuple2<StringValue, IntValue>>(data.iterator(), typeInfo.createSerializer());
-				TypeComparator<Tuple2<StringValue, IntValue>> comparator = typeInfo.createComparator(new int[]{0}, new boolean[] {true}, 0);
+				MutableObjectIterator<Tuple2<StringValue, IntValue>> input = new RegularToMutableObjectIterator<Tuple2<StringValue, IntValue>>(data.iterator(), typeInfo.createSerializer(new ExecutionConfig()));
+				TypeComparator<Tuple2<StringValue, IntValue>> comparator = typeInfo.createComparator(new int[]{0}, new boolean[] {true}, 0, new ExecutionConfig());
 				
-				GatheringCollector<Tuple2<StringValue, IntValue>> result = new GatheringCollector<Tuple2<StringValue, IntValue>>(typeInfo.createSerializer());
+				GatheringCollector<Tuple2<StringValue, IntValue>> result = new GatheringCollector<Tuple2<StringValue, IntValue>>(typeInfo.createSerializer(new ExecutionConfig()));
 				
 				context.setDriverStrategy(DriverStrategy.SORTED_PARTIAL_REDUCE);
-				context.setInput1(input, typeInfo.createSerializer());
+				context.setInput1(input, typeInfo.createSerializer(new ExecutionConfig()));
 				context.setComparator1(comparator);
 				context.setCollector(result);
 				context.setUdf(new ConcatSumFirstMutableReducer());
@@ -187,13 +188,13 @@ public class ReduceCombineDriverTest {
 				
 				List<Tuple2<StringValue, IntValue>> data = DriverTestData.createReduceMutableData();
 				TupleTypeInfo<Tuple2<StringValue, IntValue>> typeInfo = (TupleTypeInfo<Tuple2<StringValue, IntValue>>) TypeExtractor.getForObject(data.get(0));
-				MutableObjectIterator<Tuple2<StringValue, IntValue>> input = new RegularToMutableObjectIterator<Tuple2<StringValue, IntValue>>(data.iterator(), typeInfo.createSerializer());
-				TypeComparator<Tuple2<StringValue, IntValue>> comparator = typeInfo.createComparator(new int[]{0}, new boolean[] {true}, 0);
+				MutableObjectIterator<Tuple2<StringValue, IntValue>> input = new RegularToMutableObjectIterator<Tuple2<StringValue, IntValue>>(data.iterator(), typeInfo.createSerializer(new ExecutionConfig()));
+				TypeComparator<Tuple2<StringValue, IntValue>> comparator = typeInfo.createComparator(new int[]{0}, new boolean[] {true}, 0, new ExecutionConfig());
 				
-				GatheringCollector<Tuple2<StringValue, IntValue>> result = new GatheringCollector<Tuple2<StringValue, IntValue>>(typeInfo.createSerializer());
+				GatheringCollector<Tuple2<StringValue, IntValue>> result = new GatheringCollector<Tuple2<StringValue, IntValue>>(typeInfo.createSerializer(new ExecutionConfig()));
 				
 				context.setDriverStrategy(DriverStrategy.SORTED_PARTIAL_REDUCE);
-				context.setInput1(input, typeInfo.createSerializer());
+				context.setInput1(input, typeInfo.createSerializer(new ExecutionConfig()));
 				context.setComparator1(comparator);
 				context.setCollector(result);
 				context.setUdf(new ConcatSumSecondMutableReducer());

http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/ReduceDriverTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/ReduceDriverTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/ReduceDriverTest.java
index ae4e54c..5d78835 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/ReduceDriverTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/ReduceDriverTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.operators.drivers;
 
 import java.util.List;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.api.common.functions.RichReduceFunction;
@@ -50,11 +51,11 @@ public class ReduceDriverTest {
 			TupleTypeInfo<Tuple2<String, Integer>> typeInfo = (TupleTypeInfo<Tuple2<String, Integer>>) TypeExtractor.getForObject(data.get(0));
 			MutableObjectIterator<Tuple2<String, Integer>> input = EmptyMutableObjectIterator.get();
 			context.setDriverStrategy(DriverStrategy.SORTED_REDUCE);
-			TypeComparator<Tuple2<String, Integer>> comparator = typeInfo.createComparator(new int[]{0}, new boolean[] {true}, 0);
+			TypeComparator<Tuple2<String, Integer>> comparator = typeInfo.createComparator(new int[]{0}, new boolean[] {true}, 0, new ExecutionConfig());
 			
-			GatheringCollector<Tuple2<String, Integer>> result = new GatheringCollector<Tuple2<String,Integer>>(typeInfo.createSerializer());
+			GatheringCollector<Tuple2<String, Integer>> result = new GatheringCollector<Tuple2<String,Integer>>(typeInfo.createSerializer(new ExecutionConfig()));
 			
-			context.setInput1(input, typeInfo.createSerializer());
+			context.setInput1(input, typeInfo.createSerializer(new ExecutionConfig()));
 			context.setComparator1(comparator);
 			context.setCollector(result);
 			
@@ -81,13 +82,13 @@ public class ReduceDriverTest {
 				
 				List<Tuple2<String, Integer>> data = DriverTestData.createReduceImmutableData();
 				TupleTypeInfo<Tuple2<String, Integer>> typeInfo = (TupleTypeInfo<Tuple2<String, Integer>>) TypeExtractor.getForObject(data.get(0));
-				MutableObjectIterator<Tuple2<String, Integer>> input = new RegularToMutableObjectIterator<Tuple2<String, Integer>>(data.iterator(), typeInfo.createSerializer());
-				TypeComparator<Tuple2<String, Integer>> comparator = typeInfo.createComparator(new int[]{0}, new boolean[] {true}, 0);
+				MutableObjectIterator<Tuple2<String, Integer>> input = new RegularToMutableObjectIterator<Tuple2<String, Integer>>(data.iterator(), typeInfo.createSerializer(new ExecutionConfig()));
+				TypeComparator<Tuple2<String, Integer>> comparator = typeInfo.createComparator(new int[]{0}, new boolean[] {true}, 0, new ExecutionConfig());
 				
-				GatheringCollector<Tuple2<String, Integer>> result = new GatheringCollector<Tuple2<String,Integer>>(typeInfo.createSerializer());
+				GatheringCollector<Tuple2<String, Integer>> result = new GatheringCollector<Tuple2<String,Integer>>(typeInfo.createSerializer(new ExecutionConfig()));
 				
 				context.setDriverStrategy(DriverStrategy.SORTED_REDUCE);
-				context.setInput1(input, typeInfo.createSerializer());
+				context.setInput1(input, typeInfo.createSerializer(new ExecutionConfig()));
 				context.setComparator1(comparator);
 				context.setCollector(result);
 				context.setUdf(new ConcatSumFirstReducer());
@@ -109,13 +110,13 @@ public class ReduceDriverTest {
 				
 				List<Tuple2<String, Integer>> data = DriverTestData.createReduceImmutableData();
 				TupleTypeInfo<Tuple2<String, Integer>> typeInfo = (TupleTypeInfo<Tuple2<String, Integer>>) TypeExtractor.getForObject(data.get(0));
-				MutableObjectIterator<Tuple2<String, Integer>> input = new RegularToMutableObjectIterator<Tuple2<String, Integer>>(data.iterator(), typeInfo.createSerializer());
-				TypeComparator<Tuple2<String, Integer>> comparator = typeInfo.createComparator(new int[]{0}, new boolean[] {true}, 0);
+				MutableObjectIterator<Tuple2<String, Integer>> input = new RegularToMutableObjectIterator<Tuple2<String, Integer>>(data.iterator(), typeInfo.createSerializer(new ExecutionConfig()));
+				TypeComparator<Tuple2<String, Integer>> comparator = typeInfo.createComparator(new int[]{0}, new boolean[] {true}, 0, new ExecutionConfig());
 				
-				GatheringCollector<Tuple2<String, Integer>> result = new GatheringCollector<Tuple2<String,Integer>>(typeInfo.createSerializer());
+				GatheringCollector<Tuple2<String, Integer>> result = new GatheringCollector<Tuple2<String,Integer>>(typeInfo.createSerializer(new ExecutionConfig()));
 				
 				context.setDriverStrategy(DriverStrategy.SORTED_REDUCE);
-				context.setInput1(input, typeInfo.createSerializer());
+				context.setInput1(input, typeInfo.createSerializer(new ExecutionConfig()));
 				context.setComparator1(comparator);
 				context.setCollector(result);
 				context.setUdf(new ConcatSumSecondReducer());
@@ -147,13 +148,13 @@ public class ReduceDriverTest {
 				
 				List<Tuple2<StringValue, IntValue>> data = DriverTestData.createReduceMutableData();
 				TupleTypeInfo<Tuple2<StringValue, IntValue>> typeInfo = (TupleTypeInfo<Tuple2<StringValue, IntValue>>) TypeExtractor.getForObject(data.get(0));
-				MutableObjectIterator<Tuple2<StringValue, IntValue>> input = new RegularToMutableObjectIterator<Tuple2<StringValue, IntValue>>(data.iterator(), typeInfo.createSerializer());
-				TypeComparator<Tuple2<StringValue, IntValue>> comparator = typeInfo.createComparator(new int[]{0}, new boolean[] {true}, 0);
+				MutableObjectIterator<Tuple2<StringValue, IntValue>> input = new RegularToMutableObjectIterator<Tuple2<StringValue, IntValue>>(data.iterator(), typeInfo.createSerializer(new ExecutionConfig()));
+				TypeComparator<Tuple2<StringValue, IntValue>> comparator = typeInfo.createComparator(new int[]{0}, new boolean[] {true}, 0, new ExecutionConfig());
 				
-				GatheringCollector<Tuple2<StringValue, IntValue>> result = new GatheringCollector<Tuple2<StringValue, IntValue>>(typeInfo.createSerializer());
+				GatheringCollector<Tuple2<StringValue, IntValue>> result = new GatheringCollector<Tuple2<StringValue, IntValue>>(typeInfo.createSerializer(new ExecutionConfig()));
 				
 				context.setDriverStrategy(DriverStrategy.SORTED_REDUCE);
-				context.setInput1(input, typeInfo.createSerializer());
+				context.setInput1(input, typeInfo.createSerializer(new ExecutionConfig()));
 				context.setComparator1(comparator);
 				context.setCollector(result);
 				context.setUdf(new ConcatSumFirstMutableReducer());
@@ -174,13 +175,13 @@ public class ReduceDriverTest {
 				
 				List<Tuple2<StringValue, IntValue>> data = DriverTestData.createReduceMutableData();
 				TupleTypeInfo<Tuple2<StringValue, IntValue>> typeInfo = (TupleTypeInfo<Tuple2<StringValue, IntValue>>) TypeExtractor.getForObject(data.get(0));
-				MutableObjectIterator<Tuple2<StringValue, IntValue>> input = new RegularToMutableObjectIterator<Tuple2<StringValue, IntValue>>(data.iterator(), typeInfo.createSerializer());
-				TypeComparator<Tuple2<StringValue, IntValue>> comparator = typeInfo.createComparator(new int[]{0}, new boolean[] {true}, 0);
+				MutableObjectIterator<Tuple2<StringValue, IntValue>> input = new RegularToMutableObjectIterator<Tuple2<StringValue, IntValue>>(data.iterator(), typeInfo.createSerializer(new ExecutionConfig()));
+				TypeComparator<Tuple2<StringValue, IntValue>> comparator = typeInfo.createComparator(new int[]{0}, new boolean[] {true}, 0, new ExecutionConfig());
 				
-				GatheringCollector<Tuple2<StringValue, IntValue>> result = new GatheringCollector<Tuple2<StringValue, IntValue>>(typeInfo.createSerializer());
+				GatheringCollector<Tuple2<StringValue, IntValue>> result = new GatheringCollector<Tuple2<StringValue, IntValue>>(typeInfo.createSerializer(new ExecutionConfig()));
 				
 				context.setDriverStrategy(DriverStrategy.SORTED_REDUCE);
-				context.setInput1(input, typeInfo.createSerializer());
+				context.setInput1(input, typeInfo.createSerializer(new ExecutionConfig()));
 				context.setComparator1(comparator);
 				context.setCollector(result);
 				context.setUdf(new ConcatSumSecondMutableReducer());

http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ExternalSortLargeRecordsITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ExternalSortLargeRecordsITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ExternalSortLargeRecordsITCase.java
index af7b008..c0083a3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ExternalSortLargeRecordsITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ExternalSortLargeRecordsITCase.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.operators.sort;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeComparator;
@@ -98,8 +99,8 @@ public class ExternalSortLargeRecordsITCase {
 			
 			final TupleTypeInfo<Tuple2<Long, SomeMaybeLongValue>> typeInfo = 
 								new TupleTypeInfo<Tuple2<Long,SomeMaybeLongValue>>(types);
-			final TypeSerializer<Tuple2<Long, SomeMaybeLongValue>> serializer = typeInfo.createSerializer();
-			final TypeComparator<Tuple2<Long, SomeMaybeLongValue>> comparator = typeInfo.createComparator(new int[] {0}, new boolean[]{false}, 0);
+			final TypeSerializer<Tuple2<Long, SomeMaybeLongValue>> serializer = typeInfo.createSerializer(new ExecutionConfig());
+			final TypeComparator<Tuple2<Long, SomeMaybeLongValue>> comparator = typeInfo.createComparator(new int[] {0}, new boolean[]{false}, 0, new ExecutionConfig());
 
 			MutableObjectIterator<Tuple2<Long, SomeMaybeLongValue>> source =
 					new MutableObjectIterator<Tuple2<Long, SomeMaybeLongValue>>() {
@@ -168,8 +169,8 @@ public class ExternalSortLargeRecordsITCase {
 			
 			final TupleTypeInfo<Tuple2<Long, SomeMaybeLongValue>> typeInfo = 
 								new TupleTypeInfo<Tuple2<Long,SomeMaybeLongValue>>(types);
-			final TypeSerializer<Tuple2<Long, SomeMaybeLongValue>> serializer = typeInfo.createSerializer();
-			final TypeComparator<Tuple2<Long, SomeMaybeLongValue>> comparator = typeInfo.createComparator(new int[] {0}, new boolean[]{false}, 0);
+			final TypeSerializer<Tuple2<Long, SomeMaybeLongValue>> serializer = typeInfo.createSerializer(new ExecutionConfig());
+			final TypeComparator<Tuple2<Long, SomeMaybeLongValue>> comparator = typeInfo.createComparator(new int[] {0}, new boolean[]{false}, 0, new ExecutionConfig());
 
 			MutableObjectIterator<Tuple2<Long, SomeMaybeLongValue>> source =
 					new MutableObjectIterator<Tuple2<Long, SomeMaybeLongValue>>() {
@@ -240,8 +241,8 @@ public class ExternalSortLargeRecordsITCase {
 			final TupleTypeInfo<Tuple2<Long, SmallOrMediumOrLargeValue>> typeInfo = 
 								new TupleTypeInfo<Tuple2<Long,SmallOrMediumOrLargeValue>>(types);
 			
-			final TypeSerializer<Tuple2<Long, SmallOrMediumOrLargeValue>> serializer = typeInfo.createSerializer();
-			final TypeComparator<Tuple2<Long, SmallOrMediumOrLargeValue>> comparator = typeInfo.createComparator(new int[] {0}, new boolean[]{false}, 0);
+			final TypeSerializer<Tuple2<Long, SmallOrMediumOrLargeValue>> serializer = typeInfo.createSerializer(new ExecutionConfig());
+			final TypeComparator<Tuple2<Long, SmallOrMediumOrLargeValue>> comparator = typeInfo.createComparator(new int[] {0}, new boolean[]{false}, 0, new ExecutionConfig());
 
 			MutableObjectIterator<Tuple2<Long, SmallOrMediumOrLargeValue>> source =
 					new MutableObjectIterator<Tuple2<Long, SmallOrMediumOrLargeValue>>() {
@@ -323,8 +324,8 @@ public class ExternalSortLargeRecordsITCase {
 			final TupleTypeInfo<Tuple2<Long, SmallOrMediumOrLargeValue>> typeInfo = 
 								new TupleTypeInfo<Tuple2<Long,SmallOrMediumOrLargeValue>>(types);
 			
-			final TypeSerializer<Tuple2<Long, SmallOrMediumOrLargeValue>> serializer = typeInfo.createSerializer();
-			final TypeComparator<Tuple2<Long, SmallOrMediumOrLargeValue>> comparator = typeInfo.createComparator(new int[] {0}, new boolean[]{false}, 0);
+			final TypeSerializer<Tuple2<Long, SmallOrMediumOrLargeValue>> serializer = typeInfo.createSerializer(new ExecutionConfig());
+			final TypeComparator<Tuple2<Long, SmallOrMediumOrLargeValue>> comparator = typeInfo.createComparator(new int[] {0}, new boolean[]{false}, 0, new ExecutionConfig());
 
 			MutableObjectIterator<Tuple2<Long, SmallOrMediumOrLargeValue>> source =
 					new MutableObjectIterator<Tuple2<Long, SmallOrMediumOrLargeValue>>() {

http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/LargeRecordHandlerITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/LargeRecordHandlerITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/LargeRecordHandlerITCase.java
index 3d237f7..498cb61 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/LargeRecordHandlerITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/LargeRecordHandlerITCase.java
@@ -28,6 +28,7 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Random;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeComparator;
@@ -76,9 +77,9 @@ public class LargeRecordHandlerITCase {
 			final TupleTypeInfo<Tuple3<Long, SomeVeryLongValue, Byte>> typeInfo = 
 								new TupleTypeInfo<Tuple3<Long,SomeVeryLongValue,Byte>>(types);
 
-			final TypeSerializer<Tuple3<Long, SomeVeryLongValue, Byte>> serializer = typeInfo.createSerializer();
+			final TypeSerializer<Tuple3<Long, SomeVeryLongValue, Byte>> serializer = typeInfo.createSerializer(new ExecutionConfig());
 			final TypeComparator<Tuple3<Long, SomeVeryLongValue, Byte>> comparator = typeInfo.createComparator(
-					new int[] {2, 0}, new boolean[] {true, true}, 0);
+					new int[] {2, 0}, new boolean[] {true, true}, 0, new ExecutionConfig());
 			
 			LargeRecordHandler<Tuple3<Long, SomeVeryLongValue, Byte>> handler = new LargeRecordHandler<Tuple3<Long, SomeVeryLongValue, Byte>>(
 					serializer, comparator, ioMan, memMan, initialMemory, owner, 128);
@@ -216,7 +217,7 @@ public class LargeRecordHandlerITCase {
 			final TupleTypeInfo<Tuple3<Long, SomeVeryLongValue, Byte>> typeInfo = 
 								new TupleTypeInfo<Tuple3<Long,SomeVeryLongValue,Byte>>(types);
 
-			final TypeSerializer<Tuple3<Long, SomeVeryLongValue, Byte>> serializer = typeInfo.createSerializer();
+			final TypeSerializer<Tuple3<Long, SomeVeryLongValue, Byte>> serializer = typeInfo.createSerializer(new ExecutionConfig());
 
 			
 			channel = ioMan.createChannel();

http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/LargeRecordHandlerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/LargeRecordHandlerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/LargeRecordHandlerTest.java
index d2abd62..6eb736f 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/LargeRecordHandlerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/LargeRecordHandlerTest.java
@@ -23,6 +23,7 @@ import static org.junit.Assert.*;
 import java.util.List;
 import java.util.Random;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
@@ -55,9 +56,9 @@ public class LargeRecordHandlerTest {
                        final TupleTypeInfo<Tuple2<Long, String>> typeInfo = 
(TupleTypeInfo<Tuple2<Long, String>>) 
                                        TypeInfoParser.<Tuple2<Long, 
String>>parse("Tuple2<Long, String>");
 
-                       final TypeSerializer<Tuple2<Long, String>> serializer = 
typeInfo.createSerializer();
+                       final TypeSerializer<Tuple2<Long, String>> serializer = 
typeInfo.createSerializer(new ExecutionConfig());
                        final TypeComparator<Tuple2<Long, String>> comparator = 
typeInfo.createComparator(
-                                       new int[] {0}, new boolean[] {true}, 0);
+                                       new int[] {0}, new boolean[] {true}, 0, 
new ExecutionConfig());
                        
                        LargeRecordHandler<Tuple2<Long, String>> handler = new 
LargeRecordHandler<Tuple2<Long, String>>(
                                        serializer, comparator, ioMan, memMan, 
memory, owner, 128);
@@ -109,9 +110,9 @@ public class LargeRecordHandlerTest {
                        final TupleTypeInfo<Tuple2<Long, String>> typeInfo = 
(TupleTypeInfo<Tuple2<Long, String>>) 
                                        TypeInfoParser.<Tuple2<Long, 
String>>parse("Tuple2<Long, String>");
 
-                       final TypeSerializer<Tuple2<Long, String>> serializer = 
typeInfo.createSerializer();
+                       final TypeSerializer<Tuple2<Long, String>> serializer = 
typeInfo.createSerializer(new ExecutionConfig());
                        final TypeComparator<Tuple2<Long, String>> comparator = 
typeInfo.createComparator(
-                                       new int[] {0}, new boolean[] {true}, 0);
+                                       new int[] {0}, new boolean[] {true}, 0, 
new ExecutionConfig());
                        
                        LargeRecordHandler<Tuple2<Long, String>> handler = new 
LargeRecordHandler<Tuple2<Long, String>>(
                                        serializer, comparator, ioMan, memMan, 
initialMemory, owner, 128);
@@ -197,9 +198,9 @@ public class LargeRecordHandlerTest {
                        final TupleTypeInfo<Tuple3<Long, String, Byte>> 
typeInfo = (TupleTypeInfo<Tuple3<Long, String, Byte>>) 
                                        TypeInfoParser.<Tuple3<Long, String, 
Byte>>parse("Tuple3<Long, String, Byte>");
 
-                       final TypeSerializer<Tuple3<Long, String, Byte>> 
serializer = typeInfo.createSerializer();
+                       final TypeSerializer<Tuple3<Long, String, Byte>> 
serializer = typeInfo.createSerializer(new ExecutionConfig());
                        final TypeComparator<Tuple3<Long, String, Byte>> 
comparator = typeInfo.createComparator(
-                                       new int[] {2, 0}, new boolean[] {true, 
true}, 0);
+                                       new int[] {2, 0}, new boolean[] {true, 
true}, 0, new ExecutionConfig());
                        
                        LargeRecordHandler<Tuple3<Long, String, Byte>> handler 
= new LargeRecordHandler<Tuple3<Long, String, Byte>>(
                                        serializer, comparator, ioMan, memMan, 
initialMemory, owner, 128);
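
The same call-site migration repeats in the two sorting ITCases below: both
factory methods on the type information now take an ExecutionConfig. A
minimal, self-contained sketch of the new call sites (class name is
illustrative; a fresh config is fine in tests, while runtime code should pass
the job's config):

    import org.apache.flink.api.common.ExecutionConfig;
    import org.apache.flink.api.common.typeutils.TypeComparator;
    import org.apache.flink.api.common.typeutils.TypeSerializer;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.api.java.typeutils.TupleTypeInfo;
    import org.apache.flink.api.java.typeutils.TypeInfoParser;

    public class SerializerMigrationSketch {
        public static void main(String[] args) {
            @SuppressWarnings("unchecked")
            TupleTypeInfo<Tuple2<Long, String>> typeInfo =
                    (TupleTypeInfo<Tuple2<Long, String>>)
                            TypeInfoParser.<Tuple2<Long, String>>parse("Tuple2<Long, String>");

            ExecutionConfig config = new ExecutionConfig();

            // Serializer and comparator creation both receive the config now.
            TypeSerializer<Tuple2<Long, String>> serializer =
                    typeInfo.createSerializer(config);
            TypeComparator<Tuple2<Long, String>> comparator =
                    typeInfo.createComparator(new int[] {0}, new boolean[] {true}, 0, config);
        }
    }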

http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/MassiveStringSortingITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/MassiveStringSortingITCase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/MassiveStringSortingITCase.java
index 084da41..b96fdf2 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/MassiveStringSortingITCase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/MassiveStringSortingITCase.java
@@ -26,6 +26,7 @@ import java.io.FileWriter;
 import java.io.IOException;
 import java.util.Random;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.base.StringComparator;
@@ -175,8 +176,8 @@ public class MassiveStringSortingITCase {
                                        
                                TupleTypeInfo<Tuple2<String, String[]>> 
typeInfo = (TupleTypeInfo<Tuple2<String, String[]>>) (TupleTypeInfo<?>) 
TypeInfoParser.parse("Tuple2<String, String[]>");
 
-                               TypeSerializer<Tuple2<String, String[]>> 
serializer = typeInfo.createSerializer();
-                               TypeComparator<Tuple2<String, String[]>> 
comparator = typeInfo.createComparator(new int[] { 0 }, new boolean[] { true }, 
0);
+                               TypeSerializer<Tuple2<String, String[]>> 
serializer = typeInfo.createSerializer(new ExecutionConfig());
+                               TypeComparator<Tuple2<String, String[]>> 
comparator = typeInfo.createComparator(new int[] { 0 }, new boolean[] { true }, 
0, new ExecutionConfig());
                                
                                reader = new BufferedReader(new 
FileReader(input));
                                MutableObjectIterator<Tuple2<String, String[]>> 
inputIterator = new StringTupleReaderMutableObjectIterator(reader);

http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/MassiveStringValueSortingITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/MassiveStringValueSortingITCase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/MassiveStringValueSortingITCase.java
index 9e925e8..5265cdb 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/MassiveStringValueSortingITCase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/MassiveStringValueSortingITCase.java
@@ -26,6 +26,7 @@ import java.io.FileWriter;
 import java.io.IOException;
 import java.util.Random;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
@@ -177,8 +178,8 @@ public class MassiveStringValueSortingITCase {
                                TupleTypeInfo<Tuple2<StringValue, 
StringValue[]>> typeInfo = (TupleTypeInfo<Tuple2<StringValue, StringValue[]>>) 
(TupleTypeInfo<?>)
                                                
TypeInfoParser.parse("Tuple2<org.apache.flink.types.StringValue, 
org.apache.flink.types.StringValue[]>");
 
-                               TypeSerializer<Tuple2<StringValue, 
StringValue[]>> serializer = typeInfo.createSerializer();
-                               TypeComparator<Tuple2<StringValue, 
StringValue[]>> comparator = typeInfo.createComparator(new int[] { 0 }, new 
boolean[] { true }, 0);
+                               TypeSerializer<Tuple2<StringValue, 
StringValue[]>> serializer = typeInfo.createSerializer(new ExecutionConfig());
+                               TypeComparator<Tuple2<StringValue, 
StringValue[]>> comparator = typeInfo.createComparator(new int[] { 0 }, new 
boolean[] { true }, 0, new ExecutionConfig());
                                
                                reader = new BufferedReader(new 
FileReader(input));
                                MutableObjectIterator<Tuple2<StringValue, 
StringValue[]>> inputIterator = new 
StringValueTupleReaderMutableObjectIterator(reader);

http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-scala/src/main/java/org/apache/flink/api/scala/operators/ScalaAggregateOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-scala/src/main/java/org/apache/flink/api/scala/operators/ScalaAggregateOperator.java
 
b/flink-scala/src/main/java/org/apache/flink/api/scala/operators/ScalaAggregateOperator.java
index d352817..7f468fa 100644
--- 
a/flink-scala/src/main/java/org/apache/flink/api/scala/operators/ScalaAggregateOperator.java
+++ 
b/flink-scala/src/main/java/org/apache/flink/api/scala/operators/ScalaAggregateOperator.java
@@ -28,7 +28,7 @@ import org.apache.flink.api.common.operators.Operator;
 import org.apache.flink.api.common.operators.SingleInputSemanticProperties;
 import org.apache.flink.api.common.operators.UnaryOperatorInformation;
 import org.apache.flink.api.common.operators.base.GroupReduceOperatorBase;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.aggregation.AggregationFunction;
 import org.apache.flink.api.java.aggregation.AggregationFunctionFactory;
 import org.apache.flink.api.java.aggregation.Aggregations;
@@ -162,7 +162,7 @@ public class ScalaAggregateOperator<IN> extends 
SingleInputOperator<IN, IN, Scal
                genName.setLength(genName.length()-1);
 
                @SuppressWarnings("rawtypes")
-               RichGroupReduceFunction<IN, IN> function = new 
AggregatingUdf(getInputType().createSerializer(), aggFunctions, fields);
+               RichGroupReduceFunction<IN, IN> function = new 
AggregatingUdf(getInputType(), aggFunctions, fields);
 
 
                String name = getName() != null ? getName() : 
genName.toString();
@@ -240,12 +240,14 @@ public class ScalaAggregateOperator<IN> extends 
SingleInputOperator<IN, IN, Scal
 
                private TupleSerializerBase<T> serializer;
 
-               public AggregatingUdf(TypeSerializer<T> serializer, 
AggregationFunction<Object>[] aggFunctions, int[] fieldPositions) {
-                       Validate.notNull(serializer);
+               private TypeInformation<T> typeInfo;
+
+               public AggregatingUdf(TypeInformation<T> typeInfo, 
AggregationFunction<Object>[] aggFunctions, int[] fieldPositions) {
+                       Validate.notNull(typeInfo);
                        Validate.notNull(aggFunctions);
                        Validate.isTrue(aggFunctions.length == 
fieldPositions.length);
-                       Validate.isInstanceOf(TupleSerializerBase.class, 
serializer, "Serializer for Scala Aggregate Operator must be a tuple 
serializer.");
-                       this.serializer = (TupleSerializerBase<T>) serializer;
+                       Validate.isTrue(typeInfo.isTupleType(), "TypeInfo for 
Scala Aggregate Operator must be a tuple TypeInfo.");
+                       this.typeInfo = typeInfo;
                        this.aggFunctions = aggFunctions;
                        this.fieldPositions = fieldPositions;
                }
@@ -256,6 +258,7 @@ public class ScalaAggregateOperator<IN> extends 
SingleInputOperator<IN, IN, Scal
                        for (AggregationFunction<Object> aggFunction : 
aggFunctions) {
                                aggFunction.initializeAggregate();
                        }
+                       this.serializer = (TupleSerializerBase<T>) 
typeInfo.createSerializer(getRuntimeContext().getExecutionConfig());
                }
 
                @Override
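
The AggregatingUdf rewrite above is the general pattern for rich functions:
the serializer can no longer be built at construction time, because the
ExecutionConfig only becomes available through the runtime context. A hedged
sketch of that pattern (class name illustrative, not from this commit):

    import org.apache.flink.api.common.functions.RichGroupReduceFunction;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.common.typeutils.TypeSerializer;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.util.Collector;

    public class LazySerializerUdf<T> extends RichGroupReduceFunction<T, T> {

        // Ship the type information; create the serializer lazily in open().
        private final TypeInformation<T> typeInfo;
        private transient TypeSerializer<T> serializer;

        public LazySerializerUdf(TypeInformation<T> typeInfo) {
            this.typeInfo = typeInfo;
        }

        @Override
        public void open(Configuration parameters) {
            // The runtime context carries the job's ExecutionConfig.
            this.serializer =
                    typeInfo.createSerializer(getRuntimeContext().getExecutionConfig());
        }

        @Override
        public void reduce(Iterable<T> values, Collector<T> out) {
            for (T value : values) {
                // Defensive copy using the config-aware serializer.
                out.collect(serializer.copy(value, serializer.createInstance()));
            }
        }
    }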

http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-scala/src/main/java/org/apache/flink/api/scala/operators/ScalaCsvInputFormat.java
----------------------------------------------------------------------
diff --git 
a/flink-scala/src/main/java/org/apache/flink/api/scala/operators/ScalaCsvInputFormat.java
 
b/flink-scala/src/main/java/org/apache/flink/api/scala/operators/ScalaCsvInputFormat.java
index 2ee1009..79c6659 100644
--- 
a/flink-scala/src/main/java/org/apache/flink/api/scala/operators/ScalaCsvInputFormat.java
+++ 
b/flink-scala/src/main/java/org/apache/flink/api/scala/operators/ScalaCsvInputFormat.java
@@ -22,6 +22,7 @@ package org.apache.flink.api.scala.operators;
 import com.google.common.base.Charsets;
 import com.google.common.base.Preconditions;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.io.GenericCsvInputFormat;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.typeutils.TupleTypeInfoBase;
@@ -69,7 +70,9 @@ public class ScalaCsvInputFormat<OUT extends Product> extends 
GenericCsvInputFor
                        throw new UnsupportedOperationException("This only 
works on tuple types.");
                }
                TupleTypeInfoBase<OUT> tupleType = (TupleTypeInfoBase<OUT>) 
typeInfo;
-               serializer = 
(TupleSerializerBase<OUT>)tupleType.createSerializer();
+               // We can use an empty config here, since we only use the 
serializer to create
+               // the top-level case class
+               serializer = (TupleSerializerBase<OUT>) 
tupleType.createSerializer(new ExecutionConfig());
 
                Class<?>[] classes = new Class[tupleType.getArity()];
                for (int i = 0; i < tupleType.getArity(); i++) {
@@ -214,7 +217,7 @@ public class ScalaCsvInputFormat<OUT extends Product> 
extends GenericCsvInputFor
                                return null;
                        }
                }
-               
+
                if (parseRecord(parsedValues, bytes, offset, numBytes)) {
                        OUT result = serializer.createInstance(parsedValues);
                        return result;

http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-scala/src/main/java/org/apache/flink/api/scala/operators/ScalaCsvOutputFormat.java
----------------------------------------------------------------------
diff --git 
a/flink-scala/src/main/java/org/apache/flink/api/scala/operators/ScalaCsvOutputFormat.java
 
b/flink-scala/src/main/java/org/apache/flink/api/scala/operators/ScalaCsvOutputFormat.java
index 9eb4923..afcdc4a 100644
--- 
a/flink-scala/src/main/java/org/apache/flink/api/scala/operators/ScalaCsvOutputFormat.java
+++ 
b/flink-scala/src/main/java/org/apache/flink/api/scala/operators/ScalaCsvOutputFormat.java
@@ -19,6 +19,7 @@
 
 package org.apache.flink.api.scala.operators;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.api.common.io.FileOutputFormat;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -219,7 +220,7 @@ public class ScalaCsvOutputFormat<T extends Product> 
extends FileOutputFormat<T>
         * is in fact a tuple type.
         */
        @Override
-       public void setInputType(TypeInformation<?> type) {
+       public void setInputType(TypeInformation<?> type, ExecutionConfig 
executionConfig) {
                if (!type.isTupleType()) {
                        throw new InvalidProgramException("The " + 
ScalaCsvOutputFormat.class.getSimpleName() +
                                " can only be used to write tuple data sets.");

http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-scala/src/main/scala/org/apache/flink/api/scala/CrossDataSet.scala
----------------------------------------------------------------------
diff --git 
a/flink-scala/src/main/scala/org/apache/flink/api/scala/CrossDataSet.scala 
b/flink-scala/src/main/scala/org/apache/flink/api/scala/CrossDataSet.scala
index 2e69efa..e57fd5b 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/CrossDataSet.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/CrossDataSet.scala
@@ -18,6 +18,7 @@
 package org.apache.flink.api.scala
 
 import org.apache.commons.lang3.Validate
+import org.apache.flink.api.common.ExecutionConfig
 import org.apache.flink.api.common.functions.{RichCrossFunction, CrossFunction}
 import org.apache.flink.api.common.typeutils.{TypeComparator, TypeSerializer}
 import org.apache.flink.api.java.operators._
@@ -114,10 +115,10 @@ private[flink] object CrossDataSet {
     val returnType = new CaseClassTypeInfo[(L, R)](
       classOf[(L, R)], Seq(leftInput.getType, rightInput.getType), Array("_1", 
"_2")) {
 
-      override def createSerializer: TypeSerializer[(L, R)] = {
+      override def createSerializer(executionConfig: ExecutionConfig): 
TypeSerializer[(L, R)] = {
         val fieldSerializers: Array[TypeSerializer[_]] = new 
Array[TypeSerializer[_]](getArity)
         for (i <- 0 until getArity) {
-          fieldSerializers(i) = types(i).createSerializer
+          fieldSerializers(i) = types(i).createSerializer(executionConfig)
         }
 
         new CaseClassSerializer[(L, R)](classOf[(L, R)], fieldSerializers) {

http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
----------------------------------------------------------------------
diff --git 
a/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
 
b/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
index e193770..7ec65c6 100644
--- 
a/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
+++ 
b/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
@@ -69,12 +69,6 @@ import scala.reflect.ClassTag
  *  be created.
  */
 class ExecutionEnvironment(javaEnv: JavaEnv) {
-  /**
-   * Sets the config object.
-   */
-  def setConfig(config: ExecutionConfig): Unit = {
-    javaEnv.setConfig(config)
-  }
 
   /**
    * Gets the config object.
@@ -411,7 +405,7 @@ class ExecutionEnvironment(javaEnv: JavaEnv) {
     CollectionInputFormat.checkCollection(data.asJavaCollection, 
typeInfo.getTypeClass)
     val dataSource = new DataSource[T](
       javaEnv,
-      new CollectionInputFormat[T](data.asJavaCollection, 
typeInfo.createSerializer),
+      new CollectionInputFormat[T](data.asJavaCollection, 
typeInfo.createSerializer(getConfig)),
       typeInfo,
       getCallLocationName())
     wrap(dataSource)
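
With setConfig gone, the Scala environment behaves like the Java one: callers
fetch the environment's config with getConfig and mutate it in place, and
sources such as fromCollection pick it up automatically. A brief sketch (the
setter shown is only illustrative):

    import org.apache.flink.api.common.ExecutionConfig;
    import org.apache.flink.api.java.ExecutionEnvironment;

    public class ConfigUsageSketch {
        public static void main(String[] args) {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            // There is no config setter; mutate the environment's own config.
            ExecutionConfig config = env.getConfig();
            config.setDegreeOfParallelism(4);
        }
    }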

http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-scala/src/main/scala/org/apache/flink/api/scala/UnfinishedCoGroupOperation.scala
----------------------------------------------------------------------
diff --git 
a/flink-scala/src/main/scala/org/apache/flink/api/scala/UnfinishedCoGroupOperation.scala
 
b/flink-scala/src/main/scala/org/apache/flink/api/scala/UnfinishedCoGroupOperation.scala
index 9f895fb..2b2e372 100644
--- 
a/flink-scala/src/main/scala/org/apache/flink/api/scala/UnfinishedCoGroupOperation.scala
+++ 
b/flink-scala/src/main/scala/org/apache/flink/api/scala/UnfinishedCoGroupOperation.scala
@@ -18,6 +18,7 @@
 
 package org.apache.flink.api.scala
 
+import org.apache.flink.api.common.ExecutionConfig
 import org.apache.flink.api.common.functions.CoGroupFunction
 import org.apache.flink.api.common.typeutils.TypeSerializer
 import org.apache.flink.api.java.operators._
@@ -68,10 +69,11 @@ class UnfinishedCoGroupOperation[L: ClassTag, R: ClassTag](
     val returnType = new CaseClassTypeInfo[(Array[L], Array[R])](
       classOf[(Array[L], Array[R])], Seq(leftArrayType, rightArrayType), 
Array("_1", "_2")) {
 
-      override def createSerializer: TypeSerializer[(Array[L], Array[R])] = {
+      override def createSerializer(
+          executionConfig: ExecutionConfig): TypeSerializer[(Array[L], 
Array[R])] = {
         val fieldSerializers: Array[TypeSerializer[_]] = new 
Array[TypeSerializer[_]](getArity)
         for (i <- 0 until getArity) {
-          fieldSerializers(i) = types(i).createSerializer
+          fieldSerializers(i) = types(i).createSerializer(executionConfig)
         }
 
         new CaseClassSerializer[(Array[L], Array[R])](

http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-scala/src/main/scala/org/apache/flink/api/scala/codegen/TypeAnalyzer.scala
----------------------------------------------------------------------
diff --git 
a/flink-scala/src/main/scala/org/apache/flink/api/scala/codegen/TypeAnalyzer.scala
 
b/flink-scala/src/main/scala/org/apache/flink/api/scala/codegen/TypeAnalyzer.scala
index c2e432f..e025192 100644
--- 
a/flink-scala/src/main/scala/org/apache/flink/api/scala/codegen/TypeAnalyzer.scala
+++ 
b/flink-scala/src/main/scala/org/apache/flink/api/scala/codegen/TypeAnalyzer.scala
@@ -272,8 +272,12 @@ private[flink] trait TypeAnalyzer[C <: Context] { this: 
MacroContextHolder[C]
       def unapply(tpe: Type): Option[Type] = tpe match {
         case _ if tpe <:< typeOf[BitSet] => Some(typeOf[Int])
 
-        case _ if tpe <:< typeOf[SortedMap[_, _]] => None
-        case _ if tpe <:< typeOf[SortedSet[_]] => None
+        case _ if tpe <:< typeOf[SortedMap[_, _]] =>
+          // handled by generic serializer
+          None
+        case _ if tpe <:< typeOf[SortedSet[_]] =>
+          // handled by generic serializer
+          None
 
         case _ if tpe <:< typeOf[TraversableOnce[_]] =>
 //          val traversable = tpe.baseClasses

http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-scala/src/main/scala/org/apache/flink/api/scala/codegen/TypeInformationGen.scala
----------------------------------------------------------------------
diff --git 
a/flink-scala/src/main/scala/org/apache/flink/api/scala/codegen/TypeInformationGen.scala
 
b/flink-scala/src/main/scala/org/apache/flink/api/scala/codegen/TypeInformationGen.scala
index f6630a0..bafb7bf 100644
--- 
a/flink-scala/src/main/scala/org/apache/flink/api/scala/codegen/TypeInformationGen.scala
+++ 
b/flink-scala/src/main/scala/org/apache/flink/api/scala/codegen/TypeInformationGen.scala
@@ -19,9 +19,10 @@ package org.apache.flink.api.scala.codegen
 
 import java.lang.reflect.{Field, Modifier}
 
+import org.apache.flink.api.common.ExecutionConfig
 import org.apache.flink.api.common.typeinfo._
 
-import org.apache.flink.api.common.typeutils.TypeSerializer
+import org.apache.flink.api.common.typeutils._
 import org.apache.flink.api.java.typeutils._
 import org.apache.flink.api.scala.typeutils.{CaseClassSerializer, 
CaseClassTypeInfo}
 import org.apache.flink.types.Value
@@ -98,10 +99,10 @@ private[flink] trait TypeInformationGen[C <: Context] {
     val fieldNamesExpr = c.Expr[Seq[String]](mkSeq(fieldNames))
     reify {
       new CaseClassTypeInfo[T](tpeClazz.splice, fieldsExpr.splice, 
fieldNamesExpr.splice) {
-        override def createSerializer: TypeSerializer[T] = {
+        override def createSerializer(executionConfig: ExecutionConfig): 
TypeSerializer[T] = {
           val fieldSerializers: Array[TypeSerializer[_]] = new 
Array[TypeSerializer[_]](getArity)
           for (i <- 0 until getArity) {
-            fieldSerializers(i) = types(i).createSerializer
+            fieldSerializers(i) = types(i).createSerializer(executionConfig)
           }
 
           new CaseClassSerializer[T](tupleType, fieldSerializers) {
@@ -170,12 +171,13 @@ private[flink] trait TypeInformationGen[C <: Context] {
       import scala.collection.generic.CanBuildFrom
       import org.apache.flink.api.scala.typeutils.TraversableTypeInfo
       import org.apache.flink.api.scala.typeutils.TraversableSerializer
+      import org.apache.flink.api.common.ExecutionConfig
 
       val elementTpe = $elementTypeInfo
       new TraversableTypeInfo($collectionClass, elementTpe) {
-        def createSerializer() = {
+        def createSerializer(executionConfig: ExecutionConfig) = {
           new TraversableSerializer[${desc.tpe}, ${desc.elem.tpe}](
-              elementTpe.createSerializer) {
+              elementTpe.createSerializer(executionConfig)) {
             def getCbf = implicitly[CanBuildFrom[${desc.tpe}, 
${desc.elem.tpe}, ${desc.tpe}]]
           }
         }

http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-scala/src/main/scala/org/apache/flink/api/scala/joinDataSet.scala
----------------------------------------------------------------------
diff --git 
a/flink-scala/src/main/scala/org/apache/flink/api/scala/joinDataSet.scala 
b/flink-scala/src/main/scala/org/apache/flink/api/scala/joinDataSet.scala
index bff26cb..420496f 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/joinDataSet.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/joinDataSet.scala
@@ -18,7 +18,7 @@
 package org.apache.flink.api.scala
 
 import org.apache.commons.lang3.Validate
-import org.apache.flink.api.common.InvalidProgramException
+import org.apache.flink.api.common.{ExecutionConfig, InvalidProgramException}
 import org.apache.flink.api.common.functions.{JoinFunction, 
RichFlatJoinFunction, FlatJoinFunction}
 import org.apache.flink.api.common.typeutils.TypeSerializer
 import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint
@@ -236,10 +236,10 @@ class UnfinishedJoinOperation[L, R](
     val returnType = new CaseClassTypeInfo[(L, R)](
       classOf[(L, R)], Seq(leftSet.getType, rightSet.getType), Array("_1", 
"_2")) {
 
-      override def createSerializer: TypeSerializer[(L, R)] = {
+      override def createSerializer(executionConfig: ExecutionConfig): 
TypeSerializer[(L, R)] = {
         val fieldSerializers: Array[TypeSerializer[_]] = new 
Array[TypeSerializer[_]](getArity)
         for (i <- 0 until getArity) {
-          fieldSerializers(i) = types(i).createSerializer
+          fieldSerializers(i) = types(i).createSerializer(executionConfig)
         }
 
         new CaseClassSerializer[(L, R)](classOf[(L, R)], fieldSerializers) {

http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassTypeInfo.scala
----------------------------------------------------------------------
diff --git 
a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassTypeInfo.scala
 
b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassTypeInfo.scala
index b407332..1d627ab 100644
--- 
a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassTypeInfo.scala
+++ 
b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassTypeInfo.scala
@@ -20,14 +20,15 @@ package org.apache.flink.api.scala.typeutils
 
 import java.util.regex.{Pattern, Matcher}
 
+import org.apache.flink.api.common.ExecutionConfig
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.common.typeinfo.AtomicType
 import 
org.apache.flink.api.common.typeutils.CompositeType.InvalidFieldReferenceException
 import org.apache.flink.api.common.typeutils.CompositeType.FlatFieldDescriptor
+import org.apache.flink.api.common.typeutils._
 import org.apache.flink.api.java.operators.Keys.ExpressionKeys
-import 
org.apache.flink.api.java.typeutils.PojoTypeInfo.NamedFlatFieldDescriptor
-import org.apache.flink.api.java.typeutils.TupleTypeInfoBase
-import org.apache.flink.api.common.typeutils.{CompositeType, TypeComparator}
+import org.apache.flink.api.java.typeutils.{TupleTypeInfoBase, PojoTypeInfo}
+import PojoTypeInfo.NamedFlatFieldDescriptor
 
 /**
  * TypeInformation for Case Classes. Creation and access is different from
@@ -74,13 +75,13 @@ abstract class CaseClassTypeInfo[T <: Product](
     comparatorHelperIndex += 1
   }
 
-  override protected def getNewComparator: TypeComparator[T] = {
+  override protected def getNewComparator(executionConfig: ExecutionConfig): 
TypeComparator[T] = {
     val finalLogicalKeyFields = logicalKeyFields.take(comparatorHelperIndex)
     val finalComparators = fieldComparators.take(comparatorHelperIndex)
     val maxKey = finalLogicalKeyFields.max
 
     // create serializers only up to the last key, fields after that are not 
needed
-    val fieldSerializers = types.take(maxKey + 1).map(_.createSerializer)
+    val fieldSerializers = types.take(maxKey + 
1).map(_.createSerializer(executionConfig))
     new CaseClassComparator[T](finalLogicalKeyFields, finalComparators, 
fieldSerializers.toArray)
   }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EitherTypeInfo.scala
----------------------------------------------------------------------
diff --git 
a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EitherTypeInfo.scala
 
b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EitherTypeInfo.scala
index ce19a65..e2d3388 100644
--- 
a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EitherTypeInfo.scala
+++ 
b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EitherTypeInfo.scala
@@ -17,6 +17,7 @@
  */
 package org.apache.flink.api.scala.typeutils
 
+import org.apache.flink.api.common.ExecutionConfig
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.common.typeutils.TypeSerializer
 
@@ -36,12 +37,18 @@ class EitherTypeInfo[A, B, T <: Either[A, B]](
   override def getArity: Int = 1
   override def getTypeClass = clazz
 
-  def createSerializer(): TypeSerializer[T] = {
-    val leftSerializer =
-      if (leftTypeInfo != null) leftTypeInfo.createSerializer() else new 
NothingSerializer
+  def createSerializer(executionConfig: ExecutionConfig): TypeSerializer[T] = {
+    val leftSerializer = if (leftTypeInfo != null) {
+      leftTypeInfo.createSerializer(executionConfig)
+    } else {
+      new NothingSerializer
+    }
 
-    val rightSerializer =
-      if (rightTypeInfo != null) rightTypeInfo.createSerializer() else new 
NothingSerializer
+    val rightSerializer = if (rightTypeInfo != null) {
+      rightTypeInfo.createSerializer(executionConfig)
+    } else {
+      new NothingSerializer
+    }
     new EitherSerializer(leftSerializer, rightSerializer)
   }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionTypeInfo.scala
----------------------------------------------------------------------
diff --git 
a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionTypeInfo.scala
 
b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionTypeInfo.scala
index 97fe7a7..4d39f7f 100644
--- 
a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionTypeInfo.scala
+++ 
b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionTypeInfo.scala
@@ -17,6 +17,7 @@
  */
 package org.apache.flink.api.scala.typeutils
 
+import org.apache.flink.api.common.ExecutionConfig
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.common.typeutils.TypeSerializer
 
@@ -33,12 +34,13 @@ class OptionTypeInfo[A, T <: Option[A]](elemTypeInfo: 
TypeInformation[A])
   override def getArity: Int = 1
   override def getTypeClass = classOf[Option[_]].asInstanceOf[Class[T]]
 
-  def createSerializer(): TypeSerializer[T] = {
+  def createSerializer(executionConfig: ExecutionConfig): TypeSerializer[T] = {
     if (elemTypeInfo == null) {
       // this happens when the type of a DataSet is None, i.e. DataSet[None]
       new OptionSerializer(new 
NothingSerializer).asInstanceOf[TypeSerializer[T]]
     } else {
-      new 
OptionSerializer(elemTypeInfo.createSerializer()).asInstanceOf[TypeSerializer[T]]
+      new OptionSerializer(elemTypeInfo.createSerializer(executionConfig))
+        .asInstanceOf[TypeSerializer[T]]
     }
   }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TraversableTypeInfo.scala
----------------------------------------------------------------------
diff --git 
a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TraversableTypeInfo.scala
 
b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TraversableTypeInfo.scala
index 06e40d8..96dc96d 100644
--- 
a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TraversableTypeInfo.scala
+++ 
b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TraversableTypeInfo.scala
@@ -17,6 +17,7 @@
  */
 package org.apache.flink.api.scala.typeutils
 
+import org.apache.flink.api.common.ExecutionConfig
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.common.typeutils.TypeSerializer
 
@@ -37,7 +38,7 @@ abstract class TraversableTypeInfo[T <: TraversableOnce[E], 
E](
   override def getArity: Int = 1
   override def getTypeClass: Class[T] = clazz
 
-  def createSerializer(): TypeSerializer[T]
+  def createSerializer(executionConfig: ExecutionConfig): TypeSerializer[T]
 
   override def equals(other: Any): Boolean = {
     if (other.isInstanceOf[TraversableTypeInfo[_, _]]) {

http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TrySerializer.scala
----------------------------------------------------------------------
diff --git 
a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TrySerializer.scala
 
b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TrySerializer.scala
index 1f565f2..5db1290 100644
--- 
a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TrySerializer.scala
+++ 
b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TrySerializer.scala
@@ -17,6 +17,7 @@
  */
 package org.apache.flink.api.scala.typeutils
 
+import org.apache.flink.api.common.ExecutionConfig
 import org.apache.flink.api.common.typeutils.TypeSerializer
 import org.apache.flink.api.java.typeutils.runtime.KryoSerializer
 import org.apache.flink.core.memory.{DataInputView, DataOutputView}
@@ -26,12 +27,12 @@ import scala.util.{Success, Try, Failure}
 /**
  * Serializer for [[scala.util.Try]].
  */
-class TrySerializer[A](val elemSerializer: TypeSerializer[A])
+class TrySerializer[A](val elemSerializer: TypeSerializer[A], executionConfig: 
ExecutionConfig)
   extends TypeSerializer[Try[A]] {
 
   override def duplicate: TrySerializer[A] = this
 
-  val throwableSerializer = new KryoSerializer[Throwable](classOf[Throwable])
+  val throwableSerializer = new KryoSerializer[Throwable](classOf[Throwable], 
executionConfig)
 
   override def createInstance: Try[A] = {
     Failure(new RuntimeException("Empty Failure"))
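
The extra constructor argument exists because the KryoSerializer inside
TrySerializer now needs the config itself. Direct construction follows the
same two-argument form (a sketch; real code should pass the environment's
config rather than a fresh one):

    import org.apache.flink.api.common.ExecutionConfig;
    import org.apache.flink.api.java.typeutils.runtime.KryoSerializer;

    public class KryoSketch {
        public static void main(String[] args) {
            ExecutionConfig config = new ExecutionConfig();
            // The config is how user type registrations reach Kryo.
            KryoSerializer<Throwable> throwableSerializer =
                    new KryoSerializer<Throwable>(Throwable.class, config);
        }
    }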

http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TryTypeInfo.scala
----------------------------------------------------------------------
diff --git 
a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TryTypeInfo.scala
 
b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TryTypeInfo.scala
index b630cd4..d111260 100644
--- 
a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TryTypeInfo.scala
+++ 
b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TryTypeInfo.scala
@@ -17,6 +17,7 @@
  */
 package org.apache.flink.api.scala.typeutils
 
+import org.apache.flink.api.common.ExecutionConfig
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.common.typeutils.TypeSerializer
 import org.apache.flink.api.java.typeutils.runtime.KryoSerializer
@@ -36,12 +37,13 @@ class TryTypeInfo[A, T <: Try[A]](elemTypeInfo: 
TypeInformation[A])
   override def getArity: Int = 1
   override def getTypeClass = classOf[Try[_]].asInstanceOf[Class[T]]
 
-  def createSerializer(): TypeSerializer[T] = {
+  def createSerializer(executionConfig: ExecutionConfig): TypeSerializer[T] = {
     if (elemTypeInfo == null) {
       // this happens when the type of a DataSet is None, i.e. DataSet[Failure]
-      new TrySerializer(new NothingSerializer).asInstanceOf[TypeSerializer[T]]
+      new TrySerializer(new NothingSerializer, 
executionConfig).asInstanceOf[TypeSerializer[T]]
     } else {
-      new 
TrySerializer(elemTypeInfo.createSerializer()).asInstanceOf[TypeSerializer[T]]
+      new TrySerializer(elemTypeInfo.createSerializer(executionConfig), 
executionConfig)
+        .asInstanceOf[TypeSerializer[T]]
     }
   }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopReduceCombineFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopReduceCombineFunction.java
 
b/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopReduceCombineFunction.java
index 376fd70..f00859f 100644
--- 
a/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopReduceCombineFunction.java
+++ 
b/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopReduceCombineFunction.java
@@ -25,6 +25,7 @@ import java.io.Serializable;
 
 import org.apache.flink.api.common.functions.RichGroupReduceFunction;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
@@ -103,7 +104,8 @@ public final class HadoopReduceCombineFunction<KEYIN, 
VALUEIN, KEYOUT, VALUEOUT>
                
                this.reporter = new HadoopDummyReporter();
                Class<KEYIN> inKeyClass = (Class<KEYIN>) 
TypeExtractor.getParameterType(Reducer.class, reducer.getClass(), 0);
-               this.valueIterator = new HadoopTupleUnwrappingIterator<KEYIN, 
VALUEIN>(inKeyClass);
+               TypeSerializer<KEYIN> keySerializer = 
TypeExtractor.getForClass((Class<KEYIN>) 
inKeyClass).createSerializer(getRuntimeContext().getExecutionConfig());
+               this.valueIterator = new HadoopTupleUnwrappingIterator<KEYIN, 
VALUEIN>(keySerializer);
                this.combineCollector = new HadoopOutputCollector<KEYIN, 
VALUEIN>();
                this.reduceCollector = new HadoopOutputCollector<KEYOUT, 
VALUEOUT>();
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopReduceFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopReduceFunction.java
 
b/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopReduceFunction.java
index fd2c493..6943421 100644
--- 
a/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopReduceFunction.java
+++ 
b/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopReduceFunction.java
@@ -25,6 +25,7 @@ import java.io.Serializable;
 
 import org.apache.flink.api.common.functions.RichGroupReduceFunction;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
@@ -92,7 +93,8 @@ public final class HadoopReduceFunction<KEYIN, VALUEIN, 
KEYOUT, VALUEOUT>
                this.reporter = new HadoopDummyReporter();
                this.reduceCollector = new HadoopOutputCollector<KEYOUT, 
VALUEOUT>();
                Class<KEYIN> inKeyClass = (Class<KEYIN>) 
TypeExtractor.getParameterType(Reducer.class, reducer.getClass(), 0);
-               this.valueIterator = new HadoopTupleUnwrappingIterator<KEYIN, 
VALUEIN>(inKeyClass);
+               TypeSerializer<KEYIN> keySerializer = 
TypeExtractor.getForClass(inKeyClass).createSerializer(getRuntimeContext().getExecutionConfig());
+               this.valueIterator = new HadoopTupleUnwrappingIterator<KEYIN, 
VALUEIN>(keySerializer);
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopTupleUnwrappingIterator.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopTupleUnwrappingIterator.java
 
b/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopTupleUnwrappingIterator.java
index 5ecac2e..a063183 100644
--- 
a/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopTupleUnwrappingIterator.java
+++ 
b/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopTupleUnwrappingIterator.java
@@ -23,7 +23,6 @@ import java.util.Iterator;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.operators.translation.TupleUnwrappingIterator;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
 
 /**
  * Wraps a Flink Tuple2 (key-value-pair) iterator into an iterator over the 
second (value) field.
@@ -42,8 +41,8 @@ public class HadoopTupleUnwrappingIterator<KEY,VALUE>
        private KEY curKey = null;
        private VALUE firstValue = null;
        
-       public HadoopTupleUnwrappingIterator(Class<KEY> keyClass) {
-               this.keySerializer = TypeExtractor.getForClass((Class<KEY>) 
keyClass).createSerializer();
+       public HadoopTupleUnwrappingIterator(TypeSerializer<KEY> keySerializer) 
{
+               this.keySerializer = keySerializer;
        }
        
        /**
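
Callers that used to hand the iterator a key class now build the key
serializer themselves, as the two Hadoop reduce wrappers above do inside
open(). A standalone sketch of the new construction (test-style, with a
fresh config; class name illustrative):

    import org.apache.flink.api.common.ExecutionConfig;
    import org.apache.flink.api.common.typeutils.TypeSerializer;
    import org.apache.flink.api.java.typeutils.TypeExtractor;
    import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopTupleUnwrappingIterator;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;

    public class IteratorSketch {
        public static void main(String[] args) {
            TypeSerializer<IntWritable> keySerializer = TypeExtractor
                    .getForClass(IntWritable.class)
                    .createSerializer(new ExecutionConfig());
            HadoopTupleUnwrappingIterator<IntWritable, Text> valueIterator =
                    new HadoopTupleUnwrappingIterator<IntWritable, Text>(keySerializer);
        }
    }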

http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-staging/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/wrapper/HadoopTupleUnwrappingIteratorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/wrapper/HadoopTupleUnwrappingIteratorTest.java
 
b/flink-staging/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/wrapper/HadoopTupleUnwrappingIteratorTest.java
index 2592b88..524318c 100644
--- 
a/flink-staging/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/wrapper/HadoopTupleUnwrappingIteratorTest.java
+++ 
b/flink-staging/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/wrapper/HadoopTupleUnwrappingIteratorTest.java
@@ -22,6 +22,7 @@ import java.util.ArrayList;
 import java.util.NoSuchElementException;
 
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.runtime.WritableSerializer;
 import 
org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopTupleUnwrappingIterator;
 import org.apache.hadoop.io.IntWritable;
 import org.junit.Assert;
@@ -33,7 +34,8 @@ public class HadoopTupleUnwrappingIteratorTest {
        public void testValueIterator() {
                
                HadoopTupleUnwrappingIterator<IntWritable, IntWritable> valIt = 
-                               new HadoopTupleUnwrappingIterator<IntWritable, 
IntWritable>(IntWritable.class);
+                               new HadoopTupleUnwrappingIterator<IntWritable, 
IntWritable>(new WritableSerializer
+                                               
<IntWritable>(IntWritable.class));
                
                // many values
                

http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java
index b5e43af..4800c96 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java
@@ -29,6 +29,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.io.InputFormat;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.compiler.plan.StreamingPlan;
@@ -87,7 +88,11 @@ public class StreamGraph extends StreamingPlan {
 
        private Set<String> sources;
 
-       public StreamGraph() {
+       private ExecutionConfig executionConfig;
+
+       public StreamGraph(ExecutionConfig executionConfig) {
+
+               this.executionConfig = executionConfig;
 
                initGraph();
 
@@ -145,9 +150,9 @@ public class StreamGraph extends StreamingPlan {
                addVertex(vertexName, StreamVertex.class, invokableObject, 
operatorName, parallelism);
 
                StreamRecordSerializer<IN> inSerializer = inTypeInfo != null ? 
new StreamRecordSerializer<IN>(
-                               inTypeInfo) : null;
+                               inTypeInfo, executionConfig) : null;
                StreamRecordSerializer<OUT> outSerializer = outTypeInfo != null 
? new StreamRecordSerializer<OUT>(
-                               outTypeInfo) : null;
+                               outTypeInfo, executionConfig) : null;
 
                addTypeSerializers(vertexName, inSerializer, null, 
outSerializer, null);
 
@@ -251,9 +256,9 @@ public class StreamGraph extends StreamingPlan {
 
                addVertex(vertexName, CoStreamVertex.class, 
taskInvokableObject, operatorName, parallelism);
 
-               addTypeSerializers(vertexName, new 
StreamRecordSerializer<IN1>(in1TypeInfo),
-                               new StreamRecordSerializer<IN2>(in2TypeInfo), 
new StreamRecordSerializer<OUT>(
-                                               outTypeInfo), null);
+               addTypeSerializers(vertexName, new 
StreamRecordSerializer<IN1>(in1TypeInfo, executionConfig),
+                               new StreamRecordSerializer<IN2>(in2TypeInfo, 
executionConfig), new StreamRecordSerializer<OUT>(
+                                               outTypeInfo, executionConfig), 
null);
 
                if (LOG.isDebugEnabled()) {
                        LOG.debug("CO-TASK: {}", vertexName);
@@ -399,7 +404,7 @@ public class StreamGraph extends StreamingPlan {
        }
 
        public <OUT> void setOutType(String id, TypeInformation<OUT> outType) {
-               StreamRecordSerializer<OUT> serializer = new 
StreamRecordSerializer<OUT>(outType);
+               StreamRecordSerializer<OUT> serializer = new 
StreamRecordSerializer<OUT>(outType, executionConfig);
                typeSerializersOut1.put(id, serializer);
        }
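
On the streaming side the config is threaded through the StreamGraph at
construction time, so every StreamRecordSerializer is created with it. A
sketch of the new serializer construction (the import path is assumed from
this commit's source tree):

    import org.apache.flink.api.common.ExecutionConfig;
    import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
    import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer;

    public class StreamSerializerSketch {
        public static void main(String[] args) {
            ExecutionConfig config = new ExecutionConfig();
            StreamRecordSerializer<String> serializer =
                    new StreamRecordSerializer<String>(BasicTypeInfo.STRING_TYPE_INFO, config);
        }
    }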
 

http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
index aed4496..eff6026 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
@@ -319,7 +319,7 @@ public class DataStream<OUT> {
 
        private GroupedDataStream<OUT> groupBy(Keys<OUT> keys) {
                return new GroupedDataStream<OUT>(this, 
clean(KeySelectorUtil.getSelectorForKeys(keys,
-                               getType())));
+                               getType(), environment.getConfig())));
        }
 
        /**
