http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/AllGroupReduceDriverTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/AllGroupReduceDriverTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/AllGroupReduceDriverTest.java index ee23736..31707b1 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/AllGroupReduceDriverTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/AllGroupReduceDriverTest.java @@ -21,6 +21,7 @@ package org.apache.flink.runtime.operators.drivers; import java.util.Arrays; import java.util.List; +import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.functions.GroupReduceFunction; import org.apache.flink.api.common.functions.RichGroupReduceFunction; import org.apache.flink.api.common.typeinfo.TypeInformation; @@ -52,7 +53,7 @@ public class AllGroupReduceDriverTest { MutableObjectIterator<Tuple2<String, Integer>> input = EmptyMutableObjectIterator.get(); context.setDriverStrategy(DriverStrategy.ALL_GROUP_REDUCE); - context.setInput1(input, typeInfo.createSerializer()); + context.setInput1(input, typeInfo.createSerializer(new ExecutionConfig())); context.setCollector(new DiscardingOutputCollector<Tuple2<String, Integer>>()); AllGroupReduceDriver<Tuple2<String, Integer>, Tuple2<String, Integer>> driver = new AllGroupReduceDriver<Tuple2<String, Integer>, Tuple2<String, Integer>>(); @@ -75,12 +76,12 @@ public class AllGroupReduceDriverTest { List<Tuple2<String, Integer>> data = DriverTestData.createReduceImmutableData(); TypeInformation<Tuple2<String, Integer>> typeInfo = TypeExtractor.getForObject(data.get(0)); - MutableObjectIterator<Tuple2<String, Integer>> input = new RegularToMutableObjectIterator<Tuple2<String, Integer>>(data.iterator(), typeInfo.createSerializer()); + MutableObjectIterator<Tuple2<String, Integer>> input = new RegularToMutableObjectIterator<Tuple2<String, Integer>>(data.iterator(), typeInfo.createSerializer(new ExecutionConfig())); - GatheringCollector<Tuple2<String, Integer>> result = new GatheringCollector<Tuple2<String,Integer>>(typeInfo.createSerializer()); + GatheringCollector<Tuple2<String, Integer>> result = new GatheringCollector<Tuple2<String,Integer>>(typeInfo.createSerializer(new ExecutionConfig())); context.setDriverStrategy(DriverStrategy.ALL_GROUP_REDUCE); - context.setInput1(input, typeInfo.createSerializer()); + context.setInput1(input, typeInfo.createSerializer(new ExecutionConfig())); context.setCollector(result); context.setUdf(new ConcatSumReducer()); @@ -115,12 +116,12 @@ public class AllGroupReduceDriverTest { List<Tuple2<StringValue, IntValue>> data = DriverTestData.createReduceMutableData(); TypeInformation<Tuple2<StringValue, IntValue>> typeInfo = TypeExtractor.getForObject(data.get(0)); - MutableObjectIterator<Tuple2<StringValue, IntValue>> input = new RegularToMutableObjectIterator<Tuple2<StringValue, IntValue>>(data.iterator(), typeInfo.createSerializer()); + MutableObjectIterator<Tuple2<StringValue, IntValue>> input = new RegularToMutableObjectIterator<Tuple2<StringValue, IntValue>>(data.iterator(), typeInfo.createSerializer(new ExecutionConfig())); - GatheringCollector<Tuple2<StringValue, IntValue>> result = new GatheringCollector<Tuple2<StringValue, IntValue>>(typeInfo.createSerializer()); + GatheringCollector<Tuple2<StringValue, IntValue>> result = new GatheringCollector<Tuple2<StringValue, IntValue>>(typeInfo.createSerializer(new ExecutionConfig())); context.setDriverStrategy(DriverStrategy.ALL_GROUP_REDUCE); - context.setInput1(input, typeInfo.createSerializer()); + context.setInput1(input, typeInfo.createSerializer(new ExecutionConfig())); context.setCollector(result); context.setUdf(new ConcatSumMutableReducer());
http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/AllReduceDriverTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/AllReduceDriverTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/AllReduceDriverTest.java index bfc9168..119ac5b 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/AllReduceDriverTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/AllReduceDriverTest.java @@ -22,6 +22,7 @@ package org.apache.flink.runtime.operators.drivers; import java.util.Arrays; import java.util.List; +import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.functions.ReduceFunction; import org.apache.flink.api.common.functions.RichReduceFunction; import org.apache.flink.api.common.typeinfo.TypeInformation; @@ -52,7 +53,7 @@ public class AllReduceDriverTest { MutableObjectIterator<Tuple2<String, Integer>> input = EmptyMutableObjectIterator.get(); context.setDriverStrategy(DriverStrategy.ALL_REDUCE); - context.setInput1(input, typeInfo.createSerializer()); + context.setInput1(input, typeInfo.createSerializer(new ExecutionConfig())); context.setCollector(new DiscardingOutputCollector<Tuple2<String, Integer>>()); AllReduceDriver<Tuple2<String, Integer>> driver = new AllReduceDriver<Tuple2<String,Integer>>(); @@ -76,12 +77,12 @@ public class AllReduceDriverTest { List<Tuple2<String, Integer>> data = DriverTestData.createReduceImmutableData(); TypeInformation<Tuple2<String, Integer>> typeInfo = TypeExtractor.getForObject(data.get(0)); - MutableObjectIterator<Tuple2<String, Integer>> input = new RegularToMutableObjectIterator<Tuple2<String, Integer>>(data.iterator(), typeInfo.createSerializer()); + MutableObjectIterator<Tuple2<String, Integer>> input = new RegularToMutableObjectIterator<Tuple2<String, Integer>>(data.iterator(), typeInfo.createSerializer(new ExecutionConfig())); - GatheringCollector<Tuple2<String, Integer>> result = new GatheringCollector<Tuple2<String,Integer>>(typeInfo.createSerializer()); + GatheringCollector<Tuple2<String, Integer>> result = new GatheringCollector<Tuple2<String,Integer>>(typeInfo.createSerializer(new ExecutionConfig())); context.setDriverStrategy(DriverStrategy.ALL_REDUCE); - context.setInput1(input, typeInfo.createSerializer()); + context.setInput1(input, typeInfo.createSerializer(new ExecutionConfig())); context.setCollector(result); context.setUdf(new ConcatSumFirstReducer()); @@ -108,12 +109,12 @@ public class AllReduceDriverTest { List<Tuple2<String, Integer>> data = DriverTestData.createReduceImmutableData(); TypeInformation<Tuple2<String, Integer>> typeInfo = TypeExtractor.getForObject(data.get(0)); - MutableObjectIterator<Tuple2<String, Integer>> input = new RegularToMutableObjectIterator<Tuple2<String, Integer>>(data.iterator(), typeInfo.createSerializer()); + MutableObjectIterator<Tuple2<String, Integer>> input = new RegularToMutableObjectIterator<Tuple2<String, Integer>>(data.iterator(), typeInfo.createSerializer(new ExecutionConfig())); - GatheringCollector<Tuple2<String, Integer>> result = new GatheringCollector<Tuple2<String,Integer>>(typeInfo.createSerializer()); + GatheringCollector<Tuple2<String, Integer>> result = new GatheringCollector<Tuple2<String,Integer>>(typeInfo.createSerializer(new ExecutionConfig())); context.setDriverStrategy(DriverStrategy.ALL_REDUCE); - context.setInput1(input, typeInfo.createSerializer()); + context.setInput1(input, typeInfo.createSerializer(new ExecutionConfig())); context.setCollector(result); context.setUdf(new ConcatSumSecondReducer()); @@ -150,12 +151,12 @@ public class AllReduceDriverTest { List<Tuple2<StringValue, IntValue>> data = DriverTestData.createReduceMutableData(); TypeInformation<Tuple2<StringValue, IntValue>> typeInfo = TypeExtractor.getForObject(data.get(0)); - MutableObjectIterator<Tuple2<StringValue, IntValue>> input = new RegularToMutableObjectIterator<Tuple2<StringValue, IntValue>>(data.iterator(), typeInfo.createSerializer()); + MutableObjectIterator<Tuple2<StringValue, IntValue>> input = new RegularToMutableObjectIterator<Tuple2<StringValue, IntValue>>(data.iterator(), typeInfo.createSerializer(new ExecutionConfig())); - GatheringCollector<Tuple2<StringValue, IntValue>> result = new GatheringCollector<Tuple2<StringValue, IntValue>>(typeInfo.createSerializer()); + GatheringCollector<Tuple2<StringValue, IntValue>> result = new GatheringCollector<Tuple2<StringValue, IntValue>>(typeInfo.createSerializer(new ExecutionConfig())); context.setDriverStrategy(DriverStrategy.ALL_REDUCE); - context.setInput1(input, typeInfo.createSerializer()); + context.setInput1(input, typeInfo.createSerializer(new ExecutionConfig())); context.setCollector(result); context.setUdf(new ConcatSumFirstMutableReducer()); @@ -181,12 +182,12 @@ public class AllReduceDriverTest { List<Tuple2<StringValue, IntValue>> data = DriverTestData.createReduceMutableData(); TypeInformation<Tuple2<StringValue, IntValue>> typeInfo = TypeExtractor.getForObject(data.get(0)); - MutableObjectIterator<Tuple2<StringValue, IntValue>> input = new RegularToMutableObjectIterator<Tuple2<StringValue, IntValue>>(data.iterator(), typeInfo.createSerializer()); + MutableObjectIterator<Tuple2<StringValue, IntValue>> input = new RegularToMutableObjectIterator<Tuple2<StringValue, IntValue>>(data.iterator(), typeInfo.createSerializer(new ExecutionConfig())); - GatheringCollector<Tuple2<StringValue, IntValue>> result = new GatheringCollector<Tuple2<StringValue, IntValue>>(typeInfo.createSerializer()); + GatheringCollector<Tuple2<StringValue, IntValue>> result = new GatheringCollector<Tuple2<StringValue, IntValue>>(typeInfo.createSerializer(new ExecutionConfig())); context.setDriverStrategy(DriverStrategy.ALL_REDUCE); - context.setInput1(input, typeInfo.createSerializer()); + context.setInput1(input, typeInfo.createSerializer(new ExecutionConfig())); context.setCollector(result); context.setUdf(new ConcatSumSecondMutableReducer()); http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/GroupReduceDriverTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/GroupReduceDriverTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/GroupReduceDriverTest.java index d249e9a..53ca6ac 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/GroupReduceDriverTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/GroupReduceDriverTest.java @@ -21,6 +21,7 @@ package org.apache.flink.runtime.operators.drivers; import java.util.ArrayList; import java.util.List; +import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.functions.GroupReduceFunction; import org.apache.flink.api.common.typeutils.TypeComparator; import org.apache.flink.api.common.functions.RichGroupReduceFunction; @@ -50,12 +51,12 @@ public class GroupReduceDriverTest { List<Tuple2<String, Integer>> data = DriverTestData.createReduceImmutableData(); TupleTypeInfo<Tuple2<String, Integer>> typeInfo = (TupleTypeInfo<Tuple2<String, Integer>>) TypeExtractor.getForObject(data.get(0)); MutableObjectIterator<Tuple2<String, Integer>> input = EmptyMutableObjectIterator.get(); - TypeComparator<Tuple2<String, Integer>> comparator = typeInfo.createComparator(new int[]{0}, new boolean[] {true}, 0); + TypeComparator<Tuple2<String, Integer>> comparator = typeInfo.createComparator(new int[]{0}, new boolean[] {true}, 0, new ExecutionConfig()); context.setDriverStrategy(DriverStrategy.SORTED_GROUP_REDUCE); - GatheringCollector<Tuple2<String, Integer>> result = new GatheringCollector<Tuple2<String,Integer>>(typeInfo.createSerializer()); + GatheringCollector<Tuple2<String, Integer>> result = new GatheringCollector<Tuple2<String,Integer>>(typeInfo.createSerializer(new ExecutionConfig())); - context.setInput1(input, typeInfo.createSerializer()); + context.setInput1(input, typeInfo.createSerializer(new ExecutionConfig())); context.setComparator1(comparator); context.setCollector(result); @@ -81,13 +82,13 @@ public class GroupReduceDriverTest { List<Tuple2<String, Integer>> data = DriverTestData.createReduceImmutableData(); TupleTypeInfo<Tuple2<String, Integer>> typeInfo = (TupleTypeInfo<Tuple2<String, Integer>>) TypeExtractor.getForObject(data.get(0)); - MutableObjectIterator<Tuple2<String, Integer>> input = new RegularToMutableObjectIterator<Tuple2<String, Integer>>(data.iterator(), typeInfo.createSerializer()); - TypeComparator<Tuple2<String, Integer>> comparator = typeInfo.createComparator(new int[]{0}, new boolean[] {true}, 0); + MutableObjectIterator<Tuple2<String, Integer>> input = new RegularToMutableObjectIterator<Tuple2<String, Integer>>(data.iterator(), typeInfo.createSerializer(new ExecutionConfig())); + TypeComparator<Tuple2<String, Integer>> comparator = typeInfo.createComparator(new int[]{0}, new boolean[] {true}, 0, new ExecutionConfig()); - GatheringCollector<Tuple2<String, Integer>> result = new GatheringCollector<Tuple2<String,Integer>>(typeInfo.createSerializer()); + GatheringCollector<Tuple2<String, Integer>> result = new GatheringCollector<Tuple2<String,Integer>>(typeInfo.createSerializer(new ExecutionConfig())); context.setDriverStrategy(DriverStrategy.SORTED_GROUP_REDUCE); - context.setInput1(input, typeInfo.createSerializer()); + context.setInput1(input, typeInfo.createSerializer(new ExecutionConfig())); context.setCollector(result); context.setComparator1(comparator); context.setUdf(new ConcatSumReducer()); @@ -117,13 +118,13 @@ public class GroupReduceDriverTest { List<Tuple2<StringValue, IntValue>> data = DriverTestData.createReduceMutableData(); TupleTypeInfo<Tuple2<StringValue, IntValue>> typeInfo = (TupleTypeInfo<Tuple2<StringValue, IntValue>>) TypeExtractor.getForObject(data.get(0)); - MutableObjectIterator<Tuple2<StringValue, IntValue>> input = new RegularToMutableObjectIterator<Tuple2<StringValue, IntValue>>(data.iterator(), typeInfo.createSerializer()); - TypeComparator<Tuple2<StringValue, IntValue>> comparator = typeInfo.createComparator(new int[]{0}, new boolean[] {true}, 0); + MutableObjectIterator<Tuple2<StringValue, IntValue>> input = new RegularToMutableObjectIterator<Tuple2<StringValue, IntValue>>(data.iterator(), typeInfo.createSerializer(new ExecutionConfig())); + TypeComparator<Tuple2<StringValue, IntValue>> comparator = typeInfo.createComparator(new int[]{0}, new boolean[] {true}, 0, new ExecutionConfig()); - GatheringCollector<Tuple2<StringValue, IntValue>> result = new GatheringCollector<Tuple2<StringValue, IntValue>>(typeInfo.createSerializer()); + GatheringCollector<Tuple2<StringValue, IntValue>> result = new GatheringCollector<Tuple2<StringValue, IntValue>>(typeInfo.createSerializer(new ExecutionConfig())); context.setDriverStrategy(DriverStrategy.SORTED_GROUP_REDUCE); - context.setInput1(input, typeInfo.createSerializer()); + context.setInput1(input, typeInfo.createSerializer(new ExecutionConfig())); context.setComparator1(comparator); context.setCollector(result); context.setUdf(new ConcatSumMutableReducer()); @@ -153,13 +154,13 @@ public class GroupReduceDriverTest { List<Tuple2<StringValue, IntValue>> data = DriverTestData.createReduceMutableData(); TupleTypeInfo<Tuple2<StringValue, IntValue>> typeInfo = (TupleTypeInfo<Tuple2<StringValue, IntValue>>) TypeExtractor.getForObject(data.get(0)); - MutableObjectIterator<Tuple2<StringValue, IntValue>> input = new RegularToMutableObjectIterator<Tuple2<StringValue, IntValue>>(data.iterator(), typeInfo.createSerializer()); - TypeComparator<Tuple2<StringValue, IntValue>> comparator = typeInfo.createComparator(new int[]{0}, new boolean[] {true}, 0); + MutableObjectIterator<Tuple2<StringValue, IntValue>> input = new RegularToMutableObjectIterator<Tuple2<StringValue, IntValue>>(data.iterator(), typeInfo.createSerializer(new ExecutionConfig())); + TypeComparator<Tuple2<StringValue, IntValue>> comparator = typeInfo.createComparator(new int[]{0}, new boolean[] {true}, 0, new ExecutionConfig()); - GatheringCollector<Tuple2<StringValue, IntValue>> result = new GatheringCollector<Tuple2<StringValue, IntValue>>(typeInfo.createSerializer()); + GatheringCollector<Tuple2<StringValue, IntValue>> result = new GatheringCollector<Tuple2<StringValue, IntValue>>(typeInfo.createSerializer(new ExecutionConfig())); context.setDriverStrategy(DriverStrategy.SORTED_GROUP_REDUCE); - context.setInput1(input, typeInfo.createSerializer()); + context.setInput1(input, typeInfo.createSerializer(new ExecutionConfig())); context.setComparator1(comparator); context.setCollector(result); context.setUdf(new ConcatSumMutableAccumulatingReducer()); @@ -195,13 +196,13 @@ public class GroupReduceDriverTest { List<Tuple2<StringValue, IntValue>> data = DriverTestData.createReduceMutableData(); TupleTypeInfo<Tuple2<StringValue, IntValue>> typeInfo = (TupleTypeInfo<Tuple2<StringValue, IntValue>>) TypeExtractor.getForObject(data.get(0)); - MutableObjectIterator<Tuple2<StringValue, IntValue>> input = new RegularToMutableObjectIterator<Tuple2<StringValue, IntValue>>(data.iterator(), typeInfo.createSerializer()); - TypeComparator<Tuple2<StringValue, IntValue>> comparator = typeInfo.createComparator(new int[]{0}, new boolean[] {true}, 0); + MutableObjectIterator<Tuple2<StringValue, IntValue>> input = new RegularToMutableObjectIterator<Tuple2<StringValue, IntValue>>(data.iterator(), typeInfo.createSerializer(new ExecutionConfig())); + TypeComparator<Tuple2<StringValue, IntValue>> comparator = typeInfo.createComparator(new int[]{0}, new boolean[] {true}, 0, new ExecutionConfig()); - GatheringCollector<Tuple2<StringValue, IntValue>> result = new GatheringCollector<Tuple2<StringValue, IntValue>>(typeInfo.createSerializer()); + GatheringCollector<Tuple2<StringValue, IntValue>> result = new GatheringCollector<Tuple2<StringValue, IntValue>>(typeInfo.createSerializer(new ExecutionConfig())); context.setDriverStrategy(DriverStrategy.SORTED_GROUP_REDUCE); - context.setInput1(input, typeInfo.createSerializer()); + context.setInput1(input, typeInfo.createSerializer(new ExecutionConfig())); context.setComparator1(comparator); context.setCollector(result); context.setUdf(new ConcatSumMutableAccumulatingReducer()); http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/ReduceCombineDriverTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/ReduceCombineDriverTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/ReduceCombineDriverTest.java index 44cbe16..7494847 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/ReduceCombineDriverTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/ReduceCombineDriverTest.java @@ -22,6 +22,7 @@ package org.apache.flink.runtime.operators.drivers; import java.util.Collections; import java.util.List; +import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.functions.ReduceFunction; import org.apache.flink.api.common.typeutils.TypeComparator; import org.apache.flink.api.common.functions.RichReduceFunction; @@ -55,11 +56,11 @@ public class ReduceCombineDriverTest { MutableObjectIterator<Tuple2<String, Integer>> input = EmptyMutableObjectIterator.get(); context.setDriverStrategy(DriverStrategy.SORTED_PARTIAL_REDUCE); - TypeComparator<Tuple2<String, Integer>> comparator = typeInfo.createComparator(new int[]{0}, new boolean[] {true}, 0); + TypeComparator<Tuple2<String, Integer>> comparator = typeInfo.createComparator(new int[]{0}, new boolean[] {true}, 0, new ExecutionConfig()); - GatheringCollector<Tuple2<String, Integer>> result = new GatheringCollector<Tuple2<String,Integer>>(typeInfo.createSerializer()); + GatheringCollector<Tuple2<String, Integer>> result = new GatheringCollector<Tuple2<String,Integer>>(typeInfo.createSerializer(new ExecutionConfig())); - context.setInput1(input, typeInfo.createSerializer()); + context.setInput1(input, typeInfo.createSerializer(new ExecutionConfig())); context.setComparator1(comparator); context.setCollector(result); @@ -89,13 +90,13 @@ public class ReduceCombineDriverTest { Collections.shuffle(data); TupleTypeInfo<Tuple2<String, Integer>> typeInfo = (TupleTypeInfo<Tuple2<String, Integer>>) TypeExtractor.getForObject(data.get(0)); - MutableObjectIterator<Tuple2<String, Integer>> input = new RegularToMutableObjectIterator<Tuple2<String, Integer>>(data.iterator(), typeInfo.createSerializer()); - TypeComparator<Tuple2<String, Integer>> comparator = typeInfo.createComparator(new int[]{0}, new boolean[] {true}, 0); + MutableObjectIterator<Tuple2<String, Integer>> input = new RegularToMutableObjectIterator<Tuple2<String, Integer>>(data.iterator(), typeInfo.createSerializer(new ExecutionConfig())); + TypeComparator<Tuple2<String, Integer>> comparator = typeInfo.createComparator(new int[]{0}, new boolean[] {true}, 0, new ExecutionConfig()); - GatheringCollector<Tuple2<String, Integer>> result = new GatheringCollector<Tuple2<String,Integer>>(typeInfo.createSerializer()); + GatheringCollector<Tuple2<String, Integer>> result = new GatheringCollector<Tuple2<String,Integer>>(typeInfo.createSerializer(new ExecutionConfig())); context.setDriverStrategy(DriverStrategy.SORTED_PARTIAL_REDUCE); - context.setInput1(input, typeInfo.createSerializer()); + context.setInput1(input, typeInfo.createSerializer(new ExecutionConfig())); context.setComparator1(comparator); context.setCollector(result); context.setUdf(new ConcatSumFirstReducer()); @@ -120,13 +121,13 @@ public class ReduceCombineDriverTest { Collections.shuffle(data); TupleTypeInfo<Tuple2<String, Integer>> typeInfo = (TupleTypeInfo<Tuple2<String, Integer>>) TypeExtractor.getForObject(data.get(0)); - MutableObjectIterator<Tuple2<String, Integer>> input = new RegularToMutableObjectIterator<Tuple2<String, Integer>>(data.iterator(), typeInfo.createSerializer()); - TypeComparator<Tuple2<String, Integer>> comparator = typeInfo.createComparator(new int[]{0}, new boolean[] {true}, 0); + MutableObjectIterator<Tuple2<String, Integer>> input = new RegularToMutableObjectIterator<Tuple2<String, Integer>>(data.iterator(), typeInfo.createSerializer(new ExecutionConfig())); + TypeComparator<Tuple2<String, Integer>> comparator = typeInfo.createComparator(new int[]{0}, new boolean[] {true}, 0, new ExecutionConfig()); - GatheringCollector<Tuple2<String, Integer>> result = new GatheringCollector<Tuple2<String,Integer>>(typeInfo.createSerializer()); + GatheringCollector<Tuple2<String, Integer>> result = new GatheringCollector<Tuple2<String,Integer>>(typeInfo.createSerializer(new ExecutionConfig())); context.setDriverStrategy(DriverStrategy.SORTED_PARTIAL_REDUCE); - context.setInput1(input, typeInfo.createSerializer()); + context.setInput1(input, typeInfo.createSerializer(new ExecutionConfig())); context.setComparator1(comparator); context.setCollector(result); context.setUdf(new ConcatSumSecondReducer()); @@ -159,13 +160,13 @@ public class ReduceCombineDriverTest { List<Tuple2<StringValue, IntValue>> data = DriverTestData.createReduceMutableData(); TupleTypeInfo<Tuple2<StringValue, IntValue>> typeInfo = (TupleTypeInfo<Tuple2<StringValue, IntValue>>) TypeExtractor.getForObject(data.get(0)); - MutableObjectIterator<Tuple2<StringValue, IntValue>> input = new RegularToMutableObjectIterator<Tuple2<StringValue, IntValue>>(data.iterator(), typeInfo.createSerializer()); - TypeComparator<Tuple2<StringValue, IntValue>> comparator = typeInfo.createComparator(new int[]{0}, new boolean[] {true}, 0); + MutableObjectIterator<Tuple2<StringValue, IntValue>> input = new RegularToMutableObjectIterator<Tuple2<StringValue, IntValue>>(data.iterator(), typeInfo.createSerializer(new ExecutionConfig())); + TypeComparator<Tuple2<StringValue, IntValue>> comparator = typeInfo.createComparator(new int[]{0}, new boolean[] {true}, 0, new ExecutionConfig()); - GatheringCollector<Tuple2<StringValue, IntValue>> result = new GatheringCollector<Tuple2<StringValue, IntValue>>(typeInfo.createSerializer()); + GatheringCollector<Tuple2<StringValue, IntValue>> result = new GatheringCollector<Tuple2<StringValue, IntValue>>(typeInfo.createSerializer(new ExecutionConfig())); context.setDriverStrategy(DriverStrategy.SORTED_PARTIAL_REDUCE); - context.setInput1(input, typeInfo.createSerializer()); + context.setInput1(input, typeInfo.createSerializer(new ExecutionConfig())); context.setComparator1(comparator); context.setCollector(result); context.setUdf(new ConcatSumFirstMutableReducer()); @@ -187,13 +188,13 @@ public class ReduceCombineDriverTest { List<Tuple2<StringValue, IntValue>> data = DriverTestData.createReduceMutableData(); TupleTypeInfo<Tuple2<StringValue, IntValue>> typeInfo = (TupleTypeInfo<Tuple2<StringValue, IntValue>>) TypeExtractor.getForObject(data.get(0)); - MutableObjectIterator<Tuple2<StringValue, IntValue>> input = new RegularToMutableObjectIterator<Tuple2<StringValue, IntValue>>(data.iterator(), typeInfo.createSerializer()); - TypeComparator<Tuple2<StringValue, IntValue>> comparator = typeInfo.createComparator(new int[]{0}, new boolean[] {true}, 0); + MutableObjectIterator<Tuple2<StringValue, IntValue>> input = new RegularToMutableObjectIterator<Tuple2<StringValue, IntValue>>(data.iterator(), typeInfo.createSerializer(new ExecutionConfig())); + TypeComparator<Tuple2<StringValue, IntValue>> comparator = typeInfo.createComparator(new int[]{0}, new boolean[] {true}, 0, new ExecutionConfig()); - GatheringCollector<Tuple2<StringValue, IntValue>> result = new GatheringCollector<Tuple2<StringValue, IntValue>>(typeInfo.createSerializer()); + GatheringCollector<Tuple2<StringValue, IntValue>> result = new GatheringCollector<Tuple2<StringValue, IntValue>>(typeInfo.createSerializer(new ExecutionConfig())); context.setDriverStrategy(DriverStrategy.SORTED_PARTIAL_REDUCE); - context.setInput1(input, typeInfo.createSerializer()); + context.setInput1(input, typeInfo.createSerializer(new ExecutionConfig())); context.setComparator1(comparator); context.setCollector(result); context.setUdf(new ConcatSumSecondMutableReducer()); http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/ReduceDriverTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/ReduceDriverTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/ReduceDriverTest.java index ae4e54c..5d78835 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/ReduceDriverTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/ReduceDriverTest.java @@ -21,6 +21,7 @@ package org.apache.flink.runtime.operators.drivers; import java.util.List; +import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.functions.ReduceFunction; import org.apache.flink.api.common.typeutils.TypeComparator; import org.apache.flink.api.common.functions.RichReduceFunction; @@ -50,11 +51,11 @@ public class ReduceDriverTest { TupleTypeInfo<Tuple2<String, Integer>> typeInfo = (TupleTypeInfo<Tuple2<String, Integer>>) TypeExtractor.getForObject(data.get(0)); MutableObjectIterator<Tuple2<String, Integer>> input = EmptyMutableObjectIterator.get(); context.setDriverStrategy(DriverStrategy.SORTED_REDUCE); - TypeComparator<Tuple2<String, Integer>> comparator = typeInfo.createComparator(new int[]{0}, new boolean[] {true}, 0); + TypeComparator<Tuple2<String, Integer>> comparator = typeInfo.createComparator(new int[]{0}, new boolean[] {true}, 0, new ExecutionConfig()); - GatheringCollector<Tuple2<String, Integer>> result = new GatheringCollector<Tuple2<String,Integer>>(typeInfo.createSerializer()); + GatheringCollector<Tuple2<String, Integer>> result = new GatheringCollector<Tuple2<String,Integer>>(typeInfo.createSerializer(new ExecutionConfig())); - context.setInput1(input, typeInfo.createSerializer()); + context.setInput1(input, typeInfo.createSerializer(new ExecutionConfig())); context.setComparator1(comparator); context.setCollector(result); @@ -81,13 +82,13 @@ public class ReduceDriverTest { List<Tuple2<String, Integer>> data = DriverTestData.createReduceImmutableData(); TupleTypeInfo<Tuple2<String, Integer>> typeInfo = (TupleTypeInfo<Tuple2<String, Integer>>) TypeExtractor.getForObject(data.get(0)); - MutableObjectIterator<Tuple2<String, Integer>> input = new RegularToMutableObjectIterator<Tuple2<String, Integer>>(data.iterator(), typeInfo.createSerializer()); - TypeComparator<Tuple2<String, Integer>> comparator = typeInfo.createComparator(new int[]{0}, new boolean[] {true}, 0); + MutableObjectIterator<Tuple2<String, Integer>> input = new RegularToMutableObjectIterator<Tuple2<String, Integer>>(data.iterator(), typeInfo.createSerializer(new ExecutionConfig())); + TypeComparator<Tuple2<String, Integer>> comparator = typeInfo.createComparator(new int[]{0}, new boolean[] {true}, 0, new ExecutionConfig()); - GatheringCollector<Tuple2<String, Integer>> result = new GatheringCollector<Tuple2<String,Integer>>(typeInfo.createSerializer()); + GatheringCollector<Tuple2<String, Integer>> result = new GatheringCollector<Tuple2<String,Integer>>(typeInfo.createSerializer(new ExecutionConfig())); context.setDriverStrategy(DriverStrategy.SORTED_REDUCE); - context.setInput1(input, typeInfo.createSerializer()); + context.setInput1(input, typeInfo.createSerializer(new ExecutionConfig())); context.setComparator1(comparator); context.setCollector(result); context.setUdf(new ConcatSumFirstReducer()); @@ -109,13 +110,13 @@ public class ReduceDriverTest { List<Tuple2<String, Integer>> data = DriverTestData.createReduceImmutableData(); TupleTypeInfo<Tuple2<String, Integer>> typeInfo = (TupleTypeInfo<Tuple2<String, Integer>>) TypeExtractor.getForObject(data.get(0)); - MutableObjectIterator<Tuple2<String, Integer>> input = new RegularToMutableObjectIterator<Tuple2<String, Integer>>(data.iterator(), typeInfo.createSerializer()); - TypeComparator<Tuple2<String, Integer>> comparator = typeInfo.createComparator(new int[]{0}, new boolean[] {true}, 0); + MutableObjectIterator<Tuple2<String, Integer>> input = new RegularToMutableObjectIterator<Tuple2<String, Integer>>(data.iterator(), typeInfo.createSerializer(new ExecutionConfig())); + TypeComparator<Tuple2<String, Integer>> comparator = typeInfo.createComparator(new int[]{0}, new boolean[] {true}, 0, new ExecutionConfig()); - GatheringCollector<Tuple2<String, Integer>> result = new GatheringCollector<Tuple2<String,Integer>>(typeInfo.createSerializer()); + GatheringCollector<Tuple2<String, Integer>> result = new GatheringCollector<Tuple2<String,Integer>>(typeInfo.createSerializer(new ExecutionConfig())); context.setDriverStrategy(DriverStrategy.SORTED_REDUCE); - context.setInput1(input, typeInfo.createSerializer()); + context.setInput1(input, typeInfo.createSerializer(new ExecutionConfig())); context.setComparator1(comparator); context.setCollector(result); context.setUdf(new ConcatSumSecondReducer()); @@ -147,13 +148,13 @@ public class ReduceDriverTest { List<Tuple2<StringValue, IntValue>> data = DriverTestData.createReduceMutableData(); TupleTypeInfo<Tuple2<StringValue, IntValue>> typeInfo = (TupleTypeInfo<Tuple2<StringValue, IntValue>>) TypeExtractor.getForObject(data.get(0)); - MutableObjectIterator<Tuple2<StringValue, IntValue>> input = new RegularToMutableObjectIterator<Tuple2<StringValue, IntValue>>(data.iterator(), typeInfo.createSerializer()); - TypeComparator<Tuple2<StringValue, IntValue>> comparator = typeInfo.createComparator(new int[]{0}, new boolean[] {true}, 0); + MutableObjectIterator<Tuple2<StringValue, IntValue>> input = new RegularToMutableObjectIterator<Tuple2<StringValue, IntValue>>(data.iterator(), typeInfo.createSerializer(new ExecutionConfig())); + TypeComparator<Tuple2<StringValue, IntValue>> comparator = typeInfo.createComparator(new int[]{0}, new boolean[] {true}, 0, new ExecutionConfig()); - GatheringCollector<Tuple2<StringValue, IntValue>> result = new GatheringCollector<Tuple2<StringValue, IntValue>>(typeInfo.createSerializer()); + GatheringCollector<Tuple2<StringValue, IntValue>> result = new GatheringCollector<Tuple2<StringValue, IntValue>>(typeInfo.createSerializer(new ExecutionConfig())); context.setDriverStrategy(DriverStrategy.SORTED_REDUCE); - context.setInput1(input, typeInfo.createSerializer()); + context.setInput1(input, typeInfo.createSerializer(new ExecutionConfig())); context.setComparator1(comparator); context.setCollector(result); context.setUdf(new ConcatSumFirstMutableReducer()); @@ -174,13 +175,13 @@ public class ReduceDriverTest { List<Tuple2<StringValue, IntValue>> data = DriverTestData.createReduceMutableData(); TupleTypeInfo<Tuple2<StringValue, IntValue>> typeInfo = (TupleTypeInfo<Tuple2<StringValue, IntValue>>) TypeExtractor.getForObject(data.get(0)); - MutableObjectIterator<Tuple2<StringValue, IntValue>> input = new RegularToMutableObjectIterator<Tuple2<StringValue, IntValue>>(data.iterator(), typeInfo.createSerializer()); - TypeComparator<Tuple2<StringValue, IntValue>> comparator = typeInfo.createComparator(new int[]{0}, new boolean[] {true}, 0); + MutableObjectIterator<Tuple2<StringValue, IntValue>> input = new RegularToMutableObjectIterator<Tuple2<StringValue, IntValue>>(data.iterator(), typeInfo.createSerializer(new ExecutionConfig())); + TypeComparator<Tuple2<StringValue, IntValue>> comparator = typeInfo.createComparator(new int[]{0}, new boolean[] {true}, 0, new ExecutionConfig()); - GatheringCollector<Tuple2<StringValue, IntValue>> result = new GatheringCollector<Tuple2<StringValue, IntValue>>(typeInfo.createSerializer()); + GatheringCollector<Tuple2<StringValue, IntValue>> result = new GatheringCollector<Tuple2<StringValue, IntValue>>(typeInfo.createSerializer(new ExecutionConfig())); context.setDriverStrategy(DriverStrategy.SORTED_REDUCE); - context.setInput1(input, typeInfo.createSerializer()); + context.setInput1(input, typeInfo.createSerializer(new ExecutionConfig())); context.setComparator1(comparator); context.setCollector(result); context.setUdf(new ConcatSumSecondMutableReducer()); http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ExternalSortLargeRecordsITCase.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ExternalSortLargeRecordsITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ExternalSortLargeRecordsITCase.java index af7b008..c0083a3 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ExternalSortLargeRecordsITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ExternalSortLargeRecordsITCase.java @@ -18,6 +18,7 @@ package org.apache.flink.runtime.operators.sort; +import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeutils.TypeComparator; @@ -98,8 +99,8 @@ public class ExternalSortLargeRecordsITCase { final TupleTypeInfo<Tuple2<Long, SomeMaybeLongValue>> typeInfo = new TupleTypeInfo<Tuple2<Long,SomeMaybeLongValue>>(types); - final TypeSerializer<Tuple2<Long, SomeMaybeLongValue>> serializer = typeInfo.createSerializer(); - final TypeComparator<Tuple2<Long, SomeMaybeLongValue>> comparator = typeInfo.createComparator(new int[] {0}, new boolean[]{false}, 0); + final TypeSerializer<Tuple2<Long, SomeMaybeLongValue>> serializer = typeInfo.createSerializer(new ExecutionConfig()); + final TypeComparator<Tuple2<Long, SomeMaybeLongValue>> comparator = typeInfo.createComparator(new int[] {0}, new boolean[]{false}, 0, new ExecutionConfig()); MutableObjectIterator<Tuple2<Long, SomeMaybeLongValue>> source = new MutableObjectIterator<Tuple2<Long, SomeMaybeLongValue>>() { @@ -168,8 +169,8 @@ public class ExternalSortLargeRecordsITCase { final TupleTypeInfo<Tuple2<Long, SomeMaybeLongValue>> typeInfo = new TupleTypeInfo<Tuple2<Long,SomeMaybeLongValue>>(types); - final TypeSerializer<Tuple2<Long, SomeMaybeLongValue>> serializer = typeInfo.createSerializer(); - final TypeComparator<Tuple2<Long, SomeMaybeLongValue>> comparator = typeInfo.createComparator(new int[] {0}, new boolean[]{false}, 0); + final TypeSerializer<Tuple2<Long, SomeMaybeLongValue>> serializer = typeInfo.createSerializer(new ExecutionConfig()); + final TypeComparator<Tuple2<Long, SomeMaybeLongValue>> comparator = typeInfo.createComparator(new int[] {0}, new boolean[]{false}, 0, new ExecutionConfig()); MutableObjectIterator<Tuple2<Long, SomeMaybeLongValue>> source = new MutableObjectIterator<Tuple2<Long, SomeMaybeLongValue>>() { @@ -240,8 +241,8 @@ public class ExternalSortLargeRecordsITCase { final TupleTypeInfo<Tuple2<Long, SmallOrMediumOrLargeValue>> typeInfo = new TupleTypeInfo<Tuple2<Long,SmallOrMediumOrLargeValue>>(types); - final TypeSerializer<Tuple2<Long, SmallOrMediumOrLargeValue>> serializer = typeInfo.createSerializer(); - final TypeComparator<Tuple2<Long, SmallOrMediumOrLargeValue>> comparator = typeInfo.createComparator(new int[] {0}, new boolean[]{false}, 0); + final TypeSerializer<Tuple2<Long, SmallOrMediumOrLargeValue>> serializer = typeInfo.createSerializer(new ExecutionConfig()); + final TypeComparator<Tuple2<Long, SmallOrMediumOrLargeValue>> comparator = typeInfo.createComparator(new int[] {0}, new boolean[]{false}, 0, new ExecutionConfig()); MutableObjectIterator<Tuple2<Long, SmallOrMediumOrLargeValue>> source = new MutableObjectIterator<Tuple2<Long, SmallOrMediumOrLargeValue>>() { @@ -323,8 +324,8 @@ public class ExternalSortLargeRecordsITCase { final TupleTypeInfo<Tuple2<Long, SmallOrMediumOrLargeValue>> typeInfo = new TupleTypeInfo<Tuple2<Long,SmallOrMediumOrLargeValue>>(types); - final TypeSerializer<Tuple2<Long, SmallOrMediumOrLargeValue>> serializer = typeInfo.createSerializer(); - final TypeComparator<Tuple2<Long, SmallOrMediumOrLargeValue>> comparator = typeInfo.createComparator(new int[] {0}, new boolean[]{false}, 0); + final TypeSerializer<Tuple2<Long, SmallOrMediumOrLargeValue>> serializer = typeInfo.createSerializer(new ExecutionConfig()); + final TypeComparator<Tuple2<Long, SmallOrMediumOrLargeValue>> comparator = typeInfo.createComparator(new int[] {0}, new boolean[]{false}, 0, new ExecutionConfig()); MutableObjectIterator<Tuple2<Long, SmallOrMediumOrLargeValue>> source = new MutableObjectIterator<Tuple2<Long, SmallOrMediumOrLargeValue>>() { http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/LargeRecordHandlerITCase.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/LargeRecordHandlerITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/LargeRecordHandlerITCase.java index 3d237f7..498cb61 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/LargeRecordHandlerITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/LargeRecordHandlerITCase.java @@ -28,6 +28,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Random; +import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeutils.TypeComparator; @@ -76,9 +77,9 @@ public class LargeRecordHandlerITCase { final TupleTypeInfo<Tuple3<Long, SomeVeryLongValue, Byte>> typeInfo = new TupleTypeInfo<Tuple3<Long,SomeVeryLongValue,Byte>>(types); - final TypeSerializer<Tuple3<Long, SomeVeryLongValue, Byte>> serializer = typeInfo.createSerializer(); + final TypeSerializer<Tuple3<Long, SomeVeryLongValue, Byte>> serializer = typeInfo.createSerializer(new ExecutionConfig()); final TypeComparator<Tuple3<Long, SomeVeryLongValue, Byte>> comparator = typeInfo.createComparator( - new int[] {2, 0}, new boolean[] {true, true}, 0); + new int[] {2, 0}, new boolean[] {true, true}, 0, new ExecutionConfig()); LargeRecordHandler<Tuple3<Long, SomeVeryLongValue, Byte>> handler = new LargeRecordHandler<Tuple3<Long, SomeVeryLongValue, Byte>>( serializer, comparator, ioMan, memMan, initialMemory, owner, 128); @@ -216,7 +217,7 @@ public class LargeRecordHandlerITCase { final TupleTypeInfo<Tuple3<Long, SomeVeryLongValue, Byte>> typeInfo = new TupleTypeInfo<Tuple3<Long,SomeVeryLongValue,Byte>>(types); - final TypeSerializer<Tuple3<Long, SomeVeryLongValue, Byte>> serializer = typeInfo.createSerializer(); + final TypeSerializer<Tuple3<Long, SomeVeryLongValue, Byte>> serializer = typeInfo.createSerializer(new ExecutionConfig()); channel = ioMan.createChannel(); http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/LargeRecordHandlerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/LargeRecordHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/LargeRecordHandlerTest.java index d2abd62..6eb736f 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/LargeRecordHandlerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/LargeRecordHandlerTest.java @@ -23,6 +23,7 @@ import static org.junit.Assert.*; import java.util.List; import java.util.Random; +import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.typeutils.TypeComparator; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.java.tuple.Tuple2; @@ -55,9 +56,9 @@ public class LargeRecordHandlerTest { final TupleTypeInfo<Tuple2<Long, String>> typeInfo = (TupleTypeInfo<Tuple2<Long, String>>) TypeInfoParser.<Tuple2<Long, String>>parse("Tuple2<Long, String>"); - final TypeSerializer<Tuple2<Long, String>> serializer = typeInfo.createSerializer(); + final TypeSerializer<Tuple2<Long, String>> serializer = typeInfo.createSerializer(new ExecutionConfig()); final TypeComparator<Tuple2<Long, String>> comparator = typeInfo.createComparator( - new int[] {0}, new boolean[] {true}, 0); + new int[] {0}, new boolean[] {true}, 0, new ExecutionConfig()); LargeRecordHandler<Tuple2<Long, String>> handler = new LargeRecordHandler<Tuple2<Long, String>>( serializer, comparator, ioMan, memMan, memory, owner, 128); @@ -109,9 +110,9 @@ public class LargeRecordHandlerTest { final TupleTypeInfo<Tuple2<Long, String>> typeInfo = (TupleTypeInfo<Tuple2<Long, String>>) TypeInfoParser.<Tuple2<Long, String>>parse("Tuple2<Long, String>"); - final TypeSerializer<Tuple2<Long, String>> serializer = typeInfo.createSerializer(); + final TypeSerializer<Tuple2<Long, String>> serializer = typeInfo.createSerializer(new ExecutionConfig()); final TypeComparator<Tuple2<Long, String>> comparator = typeInfo.createComparator( - new int[] {0}, new boolean[] {true}, 0); + new int[] {0}, new boolean[] {true}, 0, new ExecutionConfig()); LargeRecordHandler<Tuple2<Long, String>> handler = new LargeRecordHandler<Tuple2<Long, String>>( serializer, comparator, ioMan, memMan, initialMemory, owner, 128); @@ -197,9 +198,9 @@ public class LargeRecordHandlerTest { final TupleTypeInfo<Tuple3<Long, String, Byte>> typeInfo = (TupleTypeInfo<Tuple3<Long, String, Byte>>) TypeInfoParser.<Tuple3<Long, String, Byte>>parse("Tuple3<Long, String, Byte>"); - final TypeSerializer<Tuple3<Long, String, Byte>> serializer = typeInfo.createSerializer(); + final TypeSerializer<Tuple3<Long, String, Byte>> serializer = typeInfo.createSerializer(new ExecutionConfig()); final TypeComparator<Tuple3<Long, String, Byte>> comparator = typeInfo.createComparator( - new int[] {2, 0}, new boolean[] {true, true}, 0); + new int[] {2, 0}, new boolean[] {true, true}, 0, new ExecutionConfig()); LargeRecordHandler<Tuple3<Long, String, Byte>> handler = new LargeRecordHandler<Tuple3<Long, String, Byte>>( serializer, comparator, ioMan, memMan, initialMemory, owner, 128); http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/MassiveStringSortingITCase.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/MassiveStringSortingITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/MassiveStringSortingITCase.java index 084da41..b96fdf2 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/MassiveStringSortingITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/MassiveStringSortingITCase.java @@ -26,6 +26,7 @@ import java.io.FileWriter; import java.io.IOException; import java.util.Random; +import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.typeutils.TypeComparator; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.common.typeutils.base.StringComparator; @@ -175,8 +176,8 @@ public class MassiveStringSortingITCase { TupleTypeInfo<Tuple2<String, String[]>> typeInfo = (TupleTypeInfo<Tuple2<String, String[]>>) (TupleTypeInfo<?>) TypeInfoParser.parse("Tuple2<String, String[]>"); - TypeSerializer<Tuple2<String, String[]>> serializer = typeInfo.createSerializer(); - TypeComparator<Tuple2<String, String[]>> comparator = typeInfo.createComparator(new int[] { 0 }, new boolean[] { true }, 0); + TypeSerializer<Tuple2<String, String[]>> serializer = typeInfo.createSerializer(new ExecutionConfig()); + TypeComparator<Tuple2<String, String[]>> comparator = typeInfo.createComparator(new int[] { 0 }, new boolean[] { true }, 0, new ExecutionConfig()); reader = new BufferedReader(new FileReader(input)); MutableObjectIterator<Tuple2<String, String[]>> inputIterator = new StringTupleReaderMutableObjectIterator(reader); http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/MassiveStringValueSortingITCase.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/MassiveStringValueSortingITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/MassiveStringValueSortingITCase.java index 9e925e8..5265cdb 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/MassiveStringValueSortingITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/MassiveStringValueSortingITCase.java @@ -26,6 +26,7 @@ import java.io.FileWriter; import java.io.IOException; import java.util.Random; +import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.typeutils.TypeComparator; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.java.tuple.Tuple2; @@ -177,8 +178,8 @@ public class MassiveStringValueSortingITCase { TupleTypeInfo<Tuple2<StringValue, StringValue[]>> typeInfo = (TupleTypeInfo<Tuple2<StringValue, StringValue[]>>) (TupleTypeInfo<?>) TypeInfoParser.parse("Tuple2<org.apache.flink.types.StringValue, org.apache.flink.types.StringValue[]>"); - TypeSerializer<Tuple2<StringValue, StringValue[]>> serializer = typeInfo.createSerializer(); - TypeComparator<Tuple2<StringValue, StringValue[]>> comparator = typeInfo.createComparator(new int[] { 0 }, new boolean[] { true }, 0); + TypeSerializer<Tuple2<StringValue, StringValue[]>> serializer = typeInfo.createSerializer(new ExecutionConfig()); + TypeComparator<Tuple2<StringValue, StringValue[]>> comparator = typeInfo.createComparator(new int[] { 0 }, new boolean[] { true }, 0, new ExecutionConfig()); reader = new BufferedReader(new FileReader(input)); MutableObjectIterator<Tuple2<StringValue, StringValue[]>> inputIterator = new StringValueTupleReaderMutableObjectIterator(reader); http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-scala/src/main/java/org/apache/flink/api/scala/operators/ScalaAggregateOperator.java ---------------------------------------------------------------------- diff --git a/flink-scala/src/main/java/org/apache/flink/api/scala/operators/ScalaAggregateOperator.java b/flink-scala/src/main/java/org/apache/flink/api/scala/operators/ScalaAggregateOperator.java index d352817..7f468fa 100644 --- a/flink-scala/src/main/java/org/apache/flink/api/scala/operators/ScalaAggregateOperator.java +++ b/flink-scala/src/main/java/org/apache/flink/api/scala/operators/ScalaAggregateOperator.java @@ -28,7 +28,7 @@ import org.apache.flink.api.common.operators.Operator; import org.apache.flink.api.common.operators.SingleInputSemanticProperties; import org.apache.flink.api.common.operators.UnaryOperatorInformation; import org.apache.flink.api.common.operators.base.GroupReduceOperatorBase; -import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.aggregation.AggregationFunction; import org.apache.flink.api.java.aggregation.AggregationFunctionFactory; import org.apache.flink.api.java.aggregation.Aggregations; @@ -162,7 +162,7 @@ public class ScalaAggregateOperator<IN> extends SingleInputOperator<IN, IN, Scal genName.setLength(genName.length()-1); @SuppressWarnings("rawtypes") - RichGroupReduceFunction<IN, IN> function = new AggregatingUdf(getInputType().createSerializer(), aggFunctions, fields); + RichGroupReduceFunction<IN, IN> function = new AggregatingUdf(getInputType(), aggFunctions, fields); String name = getName() != null ? getName() : genName.toString(); @@ -240,12 +240,14 @@ public class ScalaAggregateOperator<IN> extends SingleInputOperator<IN, IN, Scal private TupleSerializerBase<T> serializer; - public AggregatingUdf(TypeSerializer<T> serializer, AggregationFunction<Object>[] aggFunctions, int[] fieldPositions) { - Validate.notNull(serializer); + private TypeInformation<T> typeInfo; + + public AggregatingUdf(TypeInformation<T> typeInfo, AggregationFunction<Object>[] aggFunctions, int[] fieldPositions) { + Validate.notNull(typeInfo); Validate.notNull(aggFunctions); Validate.isTrue(aggFunctions.length == fieldPositions.length); - Validate.isInstanceOf(TupleSerializerBase.class, serializer, "Serializer for Scala Aggregate Operator must be a tuple serializer."); - this.serializer = (TupleSerializerBase<T>) serializer; + Validate.isTrue(typeInfo.isTupleType(), "TypeInfo for Scala Aggregate Operator must be a tuple TypeInfo."); + this.typeInfo = typeInfo; this.aggFunctions = aggFunctions; this.fieldPositions = fieldPositions; } @@ -256,6 +258,7 @@ public class ScalaAggregateOperator<IN> extends SingleInputOperator<IN, IN, Scal for (AggregationFunction<Object> aggFunction : aggFunctions) { aggFunction.initializeAggregate(); } + this.serializer = (TupleSerializerBase<T>) typeInfo.createSerializer(getRuntimeContext().getExecutionConfig()); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-scala/src/main/java/org/apache/flink/api/scala/operators/ScalaCsvInputFormat.java ---------------------------------------------------------------------- diff --git a/flink-scala/src/main/java/org/apache/flink/api/scala/operators/ScalaCsvInputFormat.java b/flink-scala/src/main/java/org/apache/flink/api/scala/operators/ScalaCsvInputFormat.java index 2ee1009..79c6659 100644 --- a/flink-scala/src/main/java/org/apache/flink/api/scala/operators/ScalaCsvInputFormat.java +++ b/flink-scala/src/main/java/org/apache/flink/api/scala/operators/ScalaCsvInputFormat.java @@ -22,6 +22,7 @@ package org.apache.flink.api.scala.operators; import com.google.common.base.Charsets; import com.google.common.base.Preconditions; +import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.io.GenericCsvInputFormat; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.typeutils.TupleTypeInfoBase; @@ -69,7 +70,9 @@ public class ScalaCsvInputFormat<OUT extends Product> extends GenericCsvInputFor throw new UnsupportedOperationException("This only works on tuple types."); } TupleTypeInfoBase<OUT> tupleType = (TupleTypeInfoBase<OUT>) typeInfo; - serializer = (TupleSerializerBase<OUT>)tupleType.createSerializer(); + // We can use an empty config here, since we only use the serializer to create + // the top-level case class + serializer = (TupleSerializerBase<OUT>) tupleType.createSerializer(new ExecutionConfig()); Class<?>[] classes = new Class[tupleType.getArity()]; for (int i = 0; i < tupleType.getArity(); i++) { @@ -214,7 +217,7 @@ public class ScalaCsvInputFormat<OUT extends Product> extends GenericCsvInputFor return null; } } - + if (parseRecord(parsedValues, bytes, offset, numBytes)) { OUT result = serializer.createInstance(parsedValues); return result; http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-scala/src/main/java/org/apache/flink/api/scala/operators/ScalaCsvOutputFormat.java ---------------------------------------------------------------------- diff --git a/flink-scala/src/main/java/org/apache/flink/api/scala/operators/ScalaCsvOutputFormat.java b/flink-scala/src/main/java/org/apache/flink/api/scala/operators/ScalaCsvOutputFormat.java index 9eb4923..afcdc4a 100644 --- a/flink-scala/src/main/java/org/apache/flink/api/scala/operators/ScalaCsvOutputFormat.java +++ b/flink-scala/src/main/java/org/apache/flink/api/scala/operators/ScalaCsvOutputFormat.java @@ -19,6 +19,7 @@ package org.apache.flink.api.scala.operators; +import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.InvalidProgramException; import org.apache.flink.api.common.io.FileOutputFormat; import org.apache.flink.api.common.typeinfo.TypeInformation; @@ -219,7 +220,7 @@ public class ScalaCsvOutputFormat<T extends Product> extends FileOutputFormat<T> * is in fact a tuple type. */ @Override - public void setInputType(TypeInformation<?> type) { + public void setInputType(TypeInformation<?> type, ExecutionConfig executionConfig) { if (!type.isTupleType()) { throw new InvalidProgramException("The " + ScalaCsvOutputFormat.class.getSimpleName() + " can only be used to write tuple data sets."); http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-scala/src/main/scala/org/apache/flink/api/scala/CrossDataSet.scala ---------------------------------------------------------------------- diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/CrossDataSet.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/CrossDataSet.scala index 2e69efa..e57fd5b 100644 --- a/flink-scala/src/main/scala/org/apache/flink/api/scala/CrossDataSet.scala +++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/CrossDataSet.scala @@ -18,6 +18,7 @@ package org.apache.flink.api.scala import org.apache.commons.lang3.Validate +import org.apache.flink.api.common.ExecutionConfig import org.apache.flink.api.common.functions.{RichCrossFunction, CrossFunction} import org.apache.flink.api.common.typeutils.{TypeComparator, TypeSerializer} import org.apache.flink.api.java.operators._ @@ -114,10 +115,10 @@ private[flink] object CrossDataSet { val returnType = new CaseClassTypeInfo[(L, R)]( classOf[(L, R)], Seq(leftInput.getType, rightInput.getType), Array("_1", "_2")) { - override def createSerializer: TypeSerializer[(L, R)] = { + override def createSerializer(executionConfig: ExecutionConfig): TypeSerializer[(L, R)] = { val fieldSerializers: Array[TypeSerializer[_]] = new Array[TypeSerializer[_]](getArity) for (i <- 0 until getArity) { - fieldSerializers(i) = types(i).createSerializer + fieldSerializers(i) = types(i).createSerializer(executionConfig) } new CaseClassSerializer[(L, R)](classOf[(L, R)], fieldSerializers) { http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala ---------------------------------------------------------------------- diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala index e193770..7ec65c6 100644 --- a/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala +++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala @@ -69,12 +69,6 @@ import scala.reflect.ClassTag * be created. */ class ExecutionEnvironment(javaEnv: JavaEnv) { - /** - * Sets the config object. - */ - def setConfig(config: ExecutionConfig): Unit = { - javaEnv.setConfig(config) - } /** * Gets the config object. @@ -411,7 +405,7 @@ class ExecutionEnvironment(javaEnv: JavaEnv) { CollectionInputFormat.checkCollection(data.asJavaCollection, typeInfo.getTypeClass) val dataSource = new DataSource[T]( javaEnv, - new CollectionInputFormat[T](data.asJavaCollection, typeInfo.createSerializer), + new CollectionInputFormat[T](data.asJavaCollection, typeInfo.createSerializer(getConfig)), typeInfo, getCallLocationName()) wrap(dataSource) http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-scala/src/main/scala/org/apache/flink/api/scala/UnfinishedCoGroupOperation.scala ---------------------------------------------------------------------- diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/UnfinishedCoGroupOperation.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/UnfinishedCoGroupOperation.scala index 9f895fb..2b2e372 100644 --- a/flink-scala/src/main/scala/org/apache/flink/api/scala/UnfinishedCoGroupOperation.scala +++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/UnfinishedCoGroupOperation.scala @@ -18,6 +18,7 @@ package org.apache.flink.api.scala +import org.apache.flink.api.common.ExecutionConfig import org.apache.flink.api.common.functions.CoGroupFunction import org.apache.flink.api.common.typeutils.TypeSerializer import org.apache.flink.api.java.operators._ @@ -68,10 +69,11 @@ class UnfinishedCoGroupOperation[L: ClassTag, R: ClassTag]( val returnType = new CaseClassTypeInfo[(Array[L], Array[R])]( classOf[(Array[L], Array[R])], Seq(leftArrayType, rightArrayType), Array("_1", "_2")) { - override def createSerializer: TypeSerializer[(Array[L], Array[R])] = { + override def createSerializer( + executionConfig: ExecutionConfig): TypeSerializer[(Array[L], Array[R])] = { val fieldSerializers: Array[TypeSerializer[_]] = new Array[TypeSerializer[_]](getArity) for (i <- 0 until getArity) { - fieldSerializers(i) = types(i).createSerializer + fieldSerializers(i) = types(i).createSerializer(executionConfig) } new CaseClassSerializer[(Array[L], Array[R])]( http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-scala/src/main/scala/org/apache/flink/api/scala/codegen/TypeAnalyzer.scala ---------------------------------------------------------------------- diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/codegen/TypeAnalyzer.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/codegen/TypeAnalyzer.scala index c2e432f..e025192 100644 --- a/flink-scala/src/main/scala/org/apache/flink/api/scala/codegen/TypeAnalyzer.scala +++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/codegen/TypeAnalyzer.scala @@ -272,8 +272,12 @@ private[flink] trait TypeAnalyzer[C <: Context] { this: MacroContextHolder[C] def unapply(tpe: Type): Option[Type] = tpe match { case _ if tpe <:< typeOf[BitSet] => Some(typeOf[Int]) - case _ if tpe <:< typeOf[SortedMap[_, _]] => None - case _ if tpe <:< typeOf[SortedSet[_]] => None + case _ if tpe <:< typeOf[SortedMap[_, _]] => + // handled by generic serializer + None + case _ if tpe <:< typeOf[SortedSet[_]] => + // handled by generic serializer + None case _ if tpe <:< typeOf[TraversableOnce[_]] => // val traversable = tpe.baseClasses http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-scala/src/main/scala/org/apache/flink/api/scala/codegen/TypeInformationGen.scala ---------------------------------------------------------------------- diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/codegen/TypeInformationGen.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/codegen/TypeInformationGen.scala index f6630a0..bafb7bf 100644 --- a/flink-scala/src/main/scala/org/apache/flink/api/scala/codegen/TypeInformationGen.scala +++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/codegen/TypeInformationGen.scala @@ -19,9 +19,10 @@ package org.apache.flink.api.scala.codegen import java.lang.reflect.{Field, Modifier} +import org.apache.flink.api.common.ExecutionConfig import org.apache.flink.api.common.typeinfo._ -import org.apache.flink.api.common.typeutils.TypeSerializer +import org.apache.flink.api.common.typeutils._ import org.apache.flink.api.java.typeutils._ import org.apache.flink.api.scala.typeutils.{CaseClassSerializer, CaseClassTypeInfo} import org.apache.flink.types.Value @@ -98,10 +99,10 @@ private[flink] trait TypeInformationGen[C <: Context] { val fieldNamesExpr = c.Expr[Seq[String]](mkSeq(fieldNames)) reify { new CaseClassTypeInfo[T](tpeClazz.splice, fieldsExpr.splice, fieldNamesExpr.splice) { - override def createSerializer: TypeSerializer[T] = { + override def createSerializer(executionConfig: ExecutionConfig): TypeSerializer[T] = { val fieldSerializers: Array[TypeSerializer[_]] = new Array[TypeSerializer[_]](getArity) for (i <- 0 until getArity) { - fieldSerializers(i) = types(i).createSerializer + fieldSerializers(i) = types(i).createSerializer(executionConfig) } new CaseClassSerializer[T](tupleType, fieldSerializers) { @@ -170,12 +171,13 @@ private[flink] trait TypeInformationGen[C <: Context] { import scala.collection.generic.CanBuildFrom import org.apache.flink.api.scala.typeutils.TraversableTypeInfo import org.apache.flink.api.scala.typeutils.TraversableSerializer + import org.apache.flink.api.common.ExecutionConfig val elementTpe = $elementTypeInfo new TraversableTypeInfo($collectionClass, elementTpe) { - def createSerializer() = { + def createSerializer(executionConfig: ExecutionConfig) = { new TraversableSerializer[${desc.tpe}, ${desc.elem.tpe}]( - elementTpe.createSerializer) { + elementTpe.createSerializer(executionConfig)) { def getCbf = implicitly[CanBuildFrom[${desc.tpe}, ${desc.elem.tpe}, ${desc.tpe}]] } } http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-scala/src/main/scala/org/apache/flink/api/scala/joinDataSet.scala ---------------------------------------------------------------------- diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/joinDataSet.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/joinDataSet.scala index bff26cb..420496f 100644 --- a/flink-scala/src/main/scala/org/apache/flink/api/scala/joinDataSet.scala +++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/joinDataSet.scala @@ -18,7 +18,7 @@ package org.apache.flink.api.scala import org.apache.commons.lang3.Validate -import org.apache.flink.api.common.InvalidProgramException +import org.apache.flink.api.common.{ExecutionConfig, InvalidProgramException} import org.apache.flink.api.common.functions.{JoinFunction, RichFlatJoinFunction, FlatJoinFunction} import org.apache.flink.api.common.typeutils.TypeSerializer import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint @@ -236,10 +236,10 @@ class UnfinishedJoinOperation[L, R]( val returnType = new CaseClassTypeInfo[(L, R)]( classOf[(L, R)], Seq(leftSet.getType, rightSet.getType), Array("_1", "_2")) { - override def createSerializer: TypeSerializer[(L, R)] = { + override def createSerializer(executionConfig: ExecutionConfig): TypeSerializer[(L, R)] = { val fieldSerializers: Array[TypeSerializer[_]] = new Array[TypeSerializer[_]](getArity) for (i <- 0 until getArity) { - fieldSerializers(i) = types(i).createSerializer + fieldSerializers(i) = types(i).createSerializer(executionConfig) } new CaseClassSerializer[(L, R)](classOf[(L, R)], fieldSerializers) { http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassTypeInfo.scala ---------------------------------------------------------------------- diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassTypeInfo.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassTypeInfo.scala index b407332..1d627ab 100644 --- a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassTypeInfo.scala +++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassTypeInfo.scala @@ -20,14 +20,15 @@ package org.apache.flink.api.scala.typeutils import java.util.regex.{Pattern, Matcher} +import org.apache.flink.api.common.ExecutionConfig import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.common.typeinfo.AtomicType import org.apache.flink.api.common.typeutils.CompositeType.InvalidFieldReferenceException import org.apache.flink.api.common.typeutils.CompositeType.FlatFieldDescriptor +import org.apache.flink.api.common.typeutils._ import org.apache.flink.api.java.operators.Keys.ExpressionKeys -import org.apache.flink.api.java.typeutils.PojoTypeInfo.NamedFlatFieldDescriptor -import org.apache.flink.api.java.typeutils.TupleTypeInfoBase -import org.apache.flink.api.common.typeutils.{CompositeType, TypeComparator} +import org.apache.flink.api.java.typeutils.{TupleTypeInfoBase, PojoTypeInfo} +import PojoTypeInfo.NamedFlatFieldDescriptor /** * TypeInformation for Case Classes. Creation and access is different from @@ -74,13 +75,13 @@ abstract class CaseClassTypeInfo[T <: Product]( comparatorHelperIndex += 1 } - override protected def getNewComparator: TypeComparator[T] = { + override protected def getNewComparator(executionConfig: ExecutionConfig): TypeComparator[T] = { val finalLogicalKeyFields = logicalKeyFields.take(comparatorHelperIndex) val finalComparators = fieldComparators.take(comparatorHelperIndex) val maxKey = finalLogicalKeyFields.max // create serializers only up to the last key, fields after that are not needed - val fieldSerializers = types.take(maxKey + 1).map(_.createSerializer) + val fieldSerializers = types.take(maxKey + 1).map(_.createSerializer(executionConfig)) new CaseClassComparator[T](finalLogicalKeyFields, finalComparators, fieldSerializers.toArray) } http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EitherTypeInfo.scala ---------------------------------------------------------------------- diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EitherTypeInfo.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EitherTypeInfo.scala index ce19a65..e2d3388 100644 --- a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EitherTypeInfo.scala +++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EitherTypeInfo.scala @@ -17,6 +17,7 @@ */ package org.apache.flink.api.scala.typeutils +import org.apache.flink.api.common.ExecutionConfig import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.common.typeutils.TypeSerializer @@ -36,12 +37,18 @@ class EitherTypeInfo[A, B, T <: Either[A, B]]( override def getArity: Int = 1 override def getTypeClass = clazz - def createSerializer(): TypeSerializer[T] = { - val leftSerializer = - if (leftTypeInfo != null) leftTypeInfo.createSerializer() else new NothingSerializer + def createSerializer(executionConfig: ExecutionConfig): TypeSerializer[T] = { + val leftSerializer = if (leftTypeInfo != null) { + leftTypeInfo.createSerializer(executionConfig) + } else { + new NothingSerializer + } - val rightSerializer = - if (rightTypeInfo != null) rightTypeInfo.createSerializer() else new NothingSerializer + val rightSerializer = if (rightTypeInfo != null) { + rightTypeInfo.createSerializer(executionConfig) + } else { + new NothingSerializer + } new EitherSerializer(leftSerializer, rightSerializer) } http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionTypeInfo.scala ---------------------------------------------------------------------- diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionTypeInfo.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionTypeInfo.scala index 97fe7a7..4d39f7f 100644 --- a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionTypeInfo.scala +++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionTypeInfo.scala @@ -17,6 +17,7 @@ */ package org.apache.flink.api.scala.typeutils +import org.apache.flink.api.common.ExecutionConfig import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.common.typeutils.TypeSerializer @@ -33,12 +34,13 @@ class OptionTypeInfo[A, T <: Option[A]](elemTypeInfo: TypeInformation[A]) override def getArity: Int = 1 override def getTypeClass = classOf[Option[_]].asInstanceOf[Class[T]] - def createSerializer(): TypeSerializer[T] = { + def createSerializer(executionConfig: ExecutionConfig): TypeSerializer[T] = { if (elemTypeInfo == null) { // this happens when the type of a DataSet is None, i.e. DataSet[None] new OptionSerializer(new NothingSerializer).asInstanceOf[TypeSerializer[T]] } else { - new OptionSerializer(elemTypeInfo.createSerializer()).asInstanceOf[TypeSerializer[T]] + new OptionSerializer(elemTypeInfo.createSerializer(executionConfig)) + .asInstanceOf[TypeSerializer[T]] } } http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TraversableTypeInfo.scala ---------------------------------------------------------------------- diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TraversableTypeInfo.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TraversableTypeInfo.scala index 06e40d8..96dc96d 100644 --- a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TraversableTypeInfo.scala +++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TraversableTypeInfo.scala @@ -17,6 +17,7 @@ */ package org.apache.flink.api.scala.typeutils +import org.apache.flink.api.common.ExecutionConfig import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.common.typeutils.TypeSerializer @@ -37,7 +38,7 @@ abstract class TraversableTypeInfo[T <: TraversableOnce[E], E]( override def getArity: Int = 1 override def getTypeClass: Class[T] = clazz - def createSerializer(): TypeSerializer[T] + def createSerializer(executionConfig: ExecutionConfig): TypeSerializer[T] override def equals(other: Any): Boolean = { if (other.isInstanceOf[TraversableTypeInfo[_, _]]) { http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TrySerializer.scala ---------------------------------------------------------------------- diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TrySerializer.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TrySerializer.scala index 1f565f2..5db1290 100644 --- a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TrySerializer.scala +++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TrySerializer.scala @@ -17,6 +17,7 @@ */ package org.apache.flink.api.scala.typeutils +import org.apache.flink.api.common.ExecutionConfig import org.apache.flink.api.common.typeutils.TypeSerializer import org.apache.flink.api.java.typeutils.runtime.KryoSerializer import org.apache.flink.core.memory.{DataInputView, DataOutputView} @@ -26,12 +27,12 @@ import scala.util.{Success, Try, Failure} /** * Serializer for [[scala.util.Try]]. */ -class TrySerializer[A](val elemSerializer: TypeSerializer[A]) +class TrySerializer[A](val elemSerializer: TypeSerializer[A], executionConfig: ExecutionConfig) extends TypeSerializer[Try[A]] { override def duplicate: TrySerializer[A] = this - val throwableSerializer = new KryoSerializer[Throwable](classOf[Throwable]) + val throwableSerializer = new KryoSerializer[Throwable](classOf[Throwable], executionConfig) override def createInstance: Try[A] = { Failure(new RuntimeException("Empty Failure")) http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TryTypeInfo.scala ---------------------------------------------------------------------- diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TryTypeInfo.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TryTypeInfo.scala index b630cd4..d111260 100644 --- a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TryTypeInfo.scala +++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TryTypeInfo.scala @@ -17,6 +17,7 @@ */ package org.apache.flink.api.scala.typeutils +import org.apache.flink.api.common.ExecutionConfig import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.common.typeutils.TypeSerializer import org.apache.flink.api.java.typeutils.runtime.KryoSerializer @@ -36,12 +37,13 @@ class TryTypeInfo[A, T <: Try[A]](elemTypeInfo: TypeInformation[A]) override def getArity: Int = 1 override def getTypeClass = classOf[Try[_]].asInstanceOf[Class[T]] - def createSerializer(): TypeSerializer[T] = { + def createSerializer(executionConfig: ExecutionConfig): TypeSerializer[T] = { if (elemTypeInfo == null) { // this happens when the type of a DataSet is None, i.e. DataSet[Failure] - new TrySerializer(new NothingSerializer).asInstanceOf[TypeSerializer[T]] + new TrySerializer(new NothingSerializer, executionConfig).asInstanceOf[TypeSerializer[T]] } else { - new TrySerializer(elemTypeInfo.createSerializer()).asInstanceOf[TypeSerializer[T]] + new TrySerializer(elemTypeInfo.createSerializer(executionConfig), executionConfig) + .asInstanceOf[TypeSerializer[T]] } } http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopReduceCombineFunction.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopReduceCombineFunction.java b/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopReduceCombineFunction.java index 376fd70..f00859f 100644 --- a/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopReduceCombineFunction.java +++ b/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopReduceCombineFunction.java @@ -25,6 +25,7 @@ import java.io.Serializable; import org.apache.flink.api.common.functions.RichGroupReduceFunction; import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.typeutils.ResultTypeQueryable; import org.apache.flink.api.java.typeutils.TupleTypeInfo; @@ -103,7 +104,8 @@ public final class HadoopReduceCombineFunction<KEYIN, VALUEIN, KEYOUT, VALUEOUT> this.reporter = new HadoopDummyReporter(); Class<KEYIN> inKeyClass = (Class<KEYIN>) TypeExtractor.getParameterType(Reducer.class, reducer.getClass(), 0); - this.valueIterator = new HadoopTupleUnwrappingIterator<KEYIN, VALUEIN>(inKeyClass); + TypeSerializer<KEYIN> keySerializer = TypeExtractor.getForClass((Class<KEYIN>) inKeyClass).createSerializer(getRuntimeContext().getExecutionConfig()); + this.valueIterator = new HadoopTupleUnwrappingIterator<KEYIN, VALUEIN>(keySerializer); this.combineCollector = new HadoopOutputCollector<KEYIN, VALUEIN>(); this.reduceCollector = new HadoopOutputCollector<KEYOUT, VALUEOUT>(); } http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopReduceFunction.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopReduceFunction.java b/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopReduceFunction.java index fd2c493..6943421 100644 --- a/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopReduceFunction.java +++ b/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopReduceFunction.java @@ -25,6 +25,7 @@ import java.io.Serializable; import org.apache.flink.api.common.functions.RichGroupReduceFunction; import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.typeutils.ResultTypeQueryable; import org.apache.flink.api.java.typeutils.TupleTypeInfo; @@ -92,7 +93,8 @@ public final class HadoopReduceFunction<KEYIN, VALUEIN, KEYOUT, VALUEOUT> this.reporter = new HadoopDummyReporter(); this.reduceCollector = new HadoopOutputCollector<KEYOUT, VALUEOUT>(); Class<KEYIN> inKeyClass = (Class<KEYIN>) TypeExtractor.getParameterType(Reducer.class, reducer.getClass(), 0); - this.valueIterator = new HadoopTupleUnwrappingIterator<KEYIN, VALUEIN>(inKeyClass); + TypeSerializer<KEYIN> keySerializer = TypeExtractor.getForClass(inKeyClass).createSerializer(getRuntimeContext().getExecutionConfig()); + this.valueIterator = new HadoopTupleUnwrappingIterator<KEYIN, VALUEIN>(keySerializer); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopTupleUnwrappingIterator.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopTupleUnwrappingIterator.java b/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopTupleUnwrappingIterator.java index 5ecac2e..a063183 100644 --- a/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopTupleUnwrappingIterator.java +++ b/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopTupleUnwrappingIterator.java @@ -23,7 +23,6 @@ import java.util.Iterator; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.java.operators.translation.TupleUnwrappingIterator; import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.api.java.typeutils.TypeExtractor; /** * Wraps a Flink Tuple2 (key-value-pair) iterator into an iterator over the second (value) field. @@ -42,8 +41,8 @@ public class HadoopTupleUnwrappingIterator<KEY,VALUE> private KEY curKey = null; private VALUE firstValue = null; - public HadoopTupleUnwrappingIterator(Class<KEY> keyClass) { - this.keySerializer = TypeExtractor.getForClass((Class<KEY>) keyClass).createSerializer(); + public HadoopTupleUnwrappingIterator(TypeSerializer<KEY> keySerializer) { + this.keySerializer = keySerializer; } /** http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-staging/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/wrapper/HadoopTupleUnwrappingIteratorTest.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/wrapper/HadoopTupleUnwrappingIteratorTest.java b/flink-staging/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/wrapper/HadoopTupleUnwrappingIteratorTest.java index 2592b88..524318c 100644 --- a/flink-staging/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/wrapper/HadoopTupleUnwrappingIteratorTest.java +++ b/flink-staging/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/wrapper/HadoopTupleUnwrappingIteratorTest.java @@ -22,6 +22,7 @@ import java.util.ArrayList; import java.util.NoSuchElementException; import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.typeutils.runtime.WritableSerializer; import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopTupleUnwrappingIterator; import org.apache.hadoop.io.IntWritable; import org.junit.Assert; @@ -33,7 +34,8 @@ public class HadoopTupleUnwrappingIteratorTest { public void testValueIterator() { HadoopTupleUnwrappingIterator<IntWritable, IntWritable> valIt = - new HadoopTupleUnwrappingIterator<IntWritable, IntWritable>(IntWritable.class); + new HadoopTupleUnwrappingIterator<IntWritable, IntWritable>(new WritableSerializer + <IntWritable>(IntWritable.class)); // many values http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java index b5e43af..4800c96 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java @@ -29,6 +29,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.io.InputFormat; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.compiler.plan.StreamingPlan; @@ -87,7 +88,11 @@ public class StreamGraph extends StreamingPlan { private Set<String> sources; - public StreamGraph() { + private ExecutionConfig executionConfig; + + public StreamGraph(ExecutionConfig executionConfig) { + + this.executionConfig = executionConfig; initGraph(); @@ -145,9 +150,9 @@ public class StreamGraph extends StreamingPlan { addVertex(vertexName, StreamVertex.class, invokableObject, operatorName, parallelism); StreamRecordSerializer<IN> inSerializer = inTypeInfo != null ? new StreamRecordSerializer<IN>( - inTypeInfo) : null; + inTypeInfo, executionConfig) : null; StreamRecordSerializer<OUT> outSerializer = outTypeInfo != null ? new StreamRecordSerializer<OUT>( - outTypeInfo) : null; + outTypeInfo, executionConfig) : null; addTypeSerializers(vertexName, inSerializer, null, outSerializer, null); @@ -251,9 +256,9 @@ public class StreamGraph extends StreamingPlan { addVertex(vertexName, CoStreamVertex.class, taskInvokableObject, operatorName, parallelism); - addTypeSerializers(vertexName, new StreamRecordSerializer<IN1>(in1TypeInfo), - new StreamRecordSerializer<IN2>(in2TypeInfo), new StreamRecordSerializer<OUT>( - outTypeInfo), null); + addTypeSerializers(vertexName, new StreamRecordSerializer<IN1>(in1TypeInfo, executionConfig), + new StreamRecordSerializer<IN2>(in2TypeInfo, executionConfig), new StreamRecordSerializer<OUT>( + outTypeInfo, executionConfig), null); if (LOG.isDebugEnabled()) { LOG.debug("CO-TASK: {}", vertexName); @@ -399,7 +404,7 @@ public class StreamGraph extends StreamingPlan { } public <OUT> void setOutType(String id, TypeInformation<OUT> outType) { - StreamRecordSerializer<OUT> serializer = new StreamRecordSerializer<OUT>(outType); + StreamRecordSerializer<OUT> serializer = new StreamRecordSerializer<OUT>(outType, executionConfig); typeSerializersOut1.put(id, serializer); } http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java index aed4496..eff6026 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java @@ -319,7 +319,7 @@ public class DataStream<OUT> { private GroupedDataStream<OUT> groupBy(Keys<OUT> keys) { return new GroupedDataStream<OUT>(this, clean(KeySelectorUtil.getSelectorForKeys(keys, - getType()))); + getType(), environment.getConfig()))); } /**