Repository: flink Updated Branches: refs/heads/master ae04025f1 -> 033c69f94
http://git-wip-us.apache.org/repos/asf/flink/blob/033c69f9/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupCombineITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupCombineITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupCombineITCase.java index 2a97c60..3e9fde7 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupCombineITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupCombineITCase.java @@ -18,7 +18,7 @@ package org.apache.flink.test.javaApiOperators; -import org.apache.flink.api.common.functions.FlatCombineFunction; +import org.apache.flink.api.common.functions.GroupCombineFunction; import org.apache.flink.api.common.functions.GroupReduceFunction; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.operators.Order; @@ -284,7 +284,7 @@ public class GroupCombineITCase extends MultipleProgramsTestBase { // partition and group data UnsortedGrouping<Tuple3<Integer, Long, String>> partitionedDS = ds.partitionByHash(0).groupBy(1); - partitionedDS.combineGroup(new FlatCombineFunction<Tuple3<Integer, Long, String>, Tuple2<Long, Integer>>() { + partitionedDS.combineGroup(new GroupCombineFunction<Tuple3<Integer, Long, String>, Tuple2<Long, Integer>>() { @Override public void combine(Iterable<Tuple3<Integer, Long, String>> values, Collector<Tuple2<Long, Integer>> out) throws Exception { int count = 0; @@ -334,7 +334,7 @@ public class GroupCombineITCase extends MultipleProgramsTestBase { // partition and group data UnsortedGrouping<Tuple3<Integer, Long, String>> partitionedDS = ds.partitionByHash(0).groupBy(1); - partitionedDS.combineGroup(new FlatCombineFunction<Tuple3<Integer, Long, String>, Tuple2<Long, Integer>>() { + partitionedDS.combineGroup(new GroupCombineFunction<Tuple3<Integer, Long, String>, Tuple2<Long, Integer>>() { @Override public void combine(Iterable<Tuple3<Integer, Long, String>> values, Collector<Tuple2<Long, Integer>> out) throws Exception { int count = 0; @@ -372,21 +372,21 @@ public class GroupCombineITCase extends MultipleProgramsTestBase { }); // all methods on DataSet - ds.combineGroup(new FlatCombineFunctionExample()) + ds.combineGroup(new GroupCombineFunctionExample()) .output(new DiscardingOutputFormat<Tuple1<String>>()); // all methods on UnsortedGrouping - ds.groupBy(0).combineGroup(new FlatCombineFunctionExample()) + ds.groupBy(0).combineGroup(new GroupCombineFunctionExample()) .output(new DiscardingOutputFormat<Tuple1<String>>()); // all methods on SortedGrouping - ds.groupBy(0).sortGroup(0, Order.ASCENDING).combineGroup(new FlatCombineFunctionExample()) + ds.groupBy(0).sortGroup(0, Order.ASCENDING).combineGroup(new GroupCombineFunctionExample()) .output(new DiscardingOutputFormat<Tuple1<String>>()); env.execute(); } - public static class FlatCombineFunctionExample implements FlatCombineFunction<Tuple1<String>, Tuple1<String>> { + public static class GroupCombineFunctionExample implements GroupCombineFunction<Tuple1<String>, Tuple1<String>> { @Override public void combine(Iterable<Tuple1<String>> values, Collector<Tuple1<String>> out) throws Exception { @@ -396,7 +396,7 @@ public class GroupCombineITCase extends MultipleProgramsTestBase { } } - public static class ScalaFlatCombineFunctionExample implements FlatCombineFunction<scala.Tuple1<String>, scala.Tuple1<String>> { + public static class ScalaGroupCombineFunctionExample implements GroupCombineFunction<scala.Tuple1<String>, scala.Tuple1<String>> { @Override public void combine(Iterable<scala.Tuple1<String>> values, Collector<scala.Tuple1<String>> out) throws Exception { @@ -406,7 +406,7 @@ public class GroupCombineITCase extends MultipleProgramsTestBase { } } - public static class IdentityFunction implements FlatCombineFunction<Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>>, + public static class IdentityFunction implements GroupCombineFunction<Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>>, GroupReduceFunction<Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>> { @Override @@ -510,7 +510,7 @@ public class GroupCombineITCase extends MultipleProgramsTestBase { } - public interface CombineAndReduceGroup <IN, INT, OUT> extends FlatCombineFunction<IN, INT>, GroupReduceFunction<INT, OUT> { + public interface CombineAndReduceGroup <IN, INT, OUT> extends GroupCombineFunction<IN, INT>, GroupReduceFunction<INT, OUT> { } public interface KvGroupReduce<K, V, INT, OUT> extends CombineAndReduceGroup<Tuple2<K, V>, Tuple2<K, INT>, Tuple2<K, OUT>> { http://git-wip-us.apache.org/repos/asf/flink/blob/033c69f9/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/GroupCombineITCase.scala ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/GroupCombineITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/GroupCombineITCase.scala index ef484df..380b3bc 100644 --- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/GroupCombineITCase.scala +++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/GroupCombineITCase.scala @@ -43,7 +43,7 @@ class GroupCombineITCase(mode: TestExecutionMode) extends MultipleProgramsTestBa .map(str => Tuple1(str)) // all methods on DataSet - ds.combineGroup(new GroupCombineITCase.ScalaFlatCombineFunctionExample()) + ds.combineGroup(new GroupCombineITCase.ScalaGroupCombineFunctionExample()) .output(new DiscardingOutputFormat[Tuple1[String]]) ds.combineGroup((in, out: Collector[Tuple1[String]]) => in.toSet foreach (out.collect)) @@ -51,7 +51,7 @@ class GroupCombineITCase(mode: TestExecutionMode) extends MultipleProgramsTestBa // all methods on UnsortedGrouping ds.groupBy(0) - .combineGroup(new GroupCombineITCase.ScalaFlatCombineFunctionExample()) + .combineGroup(new GroupCombineITCase.ScalaGroupCombineFunctionExample()) .output(new DiscardingOutputFormat[Tuple1[String]]) ds.groupBy(0) @@ -60,7 +60,7 @@ class GroupCombineITCase(mode: TestExecutionMode) extends MultipleProgramsTestBa // all methods on SortedGrouping ds.groupBy(0).sortGroup(0, Order.ASCENDING) - .combineGroup(new GroupCombineITCase.ScalaFlatCombineFunctionExample()) + .combineGroup(new GroupCombineITCase.ScalaGroupCombineFunctionExample()) .output(new DiscardingOutputFormat[Tuple1[String]]) ds.groupBy(0).sortGroup(0, Order.ASCENDING)