Repository: flink Updated Branches: refs/heads/master ba7a19c10 -> 2b16c6042
[FLINK-1112] Additional checks for KeySelector group sorting and minor fixes Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2b16c604 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2b16c604 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2b16c604 Branch: refs/heads/master Commit: 2b16c6042227145241046d497f9be0e43242c9fb Parents: f83db14 Author: Fabian Hueske <fhue...@apache.org> Authored: Tue Dec 16 15:38:08 2014 +0100 Committer: Fabian Hueske <fhue...@apache.org> Committed: Thu Jan 15 18:13:18 2015 +0100 ---------------------------------------------------------------------- .../GroupingKeySelectorTranslationTest.java | 35 +--- .../api/java/operators/SortedGrouping.java | 2 +- .../api/java/operators/UnsortedGrouping.java | 17 +- .../apache/flink/api/scala/GroupedDataSet.scala | 19 ++- .../javaApiOperators/GroupReduceITCase.java | 2 - ...tomPartitioningGroupingKeySelectorTest.scala | 162 +++++++++++-------- 6 files changed, 134 insertions(+), 103 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/2b16c604/flink-compiler/src/test/java/org/apache/flink/compiler/custompartition/GroupingKeySelectorTranslationTest.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/custompartition/GroupingKeySelectorTranslationTest.java b/flink-compiler/src/test/java/org/apache/flink/compiler/custompartition/GroupingKeySelectorTranslationTest.java index 8f446a7..a7b1167 100644 --- a/flink-compiler/src/test/java/org/apache/flink/compiler/custompartition/GroupingKeySelectorTranslationTest.java +++ b/flink-compiler/src/test/java/org/apache/flink/compiler/custompartition/GroupingKeySelectorTranslationTest.java @@ -115,7 +115,7 @@ public class GroupingKeySelectorTranslationTest extends CompilerTestBase { data.groupBy(new TestKeySelector<Tuple3<Integer,Integer,Integer>>()) .withPartitioner(new TestPartitionerInt()) - .sortGroup(1, Order.ASCENDING) + .sortGroup(new TestKeySelector<Tuple3<Integer, Integer, Integer>>(), Order.ASCENDING) .reduceGroup(new IdentityGroupReducer<Tuple3<Integer,Integer,Integer>>()) .print(); @@ -137,39 +137,6 @@ public class GroupingKeySelectorTranslationTest extends CompilerTestBase { } @Test - public void testCustomPartitioningKeySelectorGroupReduceSorted2() { - try { - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - DataSet<Tuple4<Integer,Integer,Integer, Integer>> data = env.fromElements(new Tuple4<Integer,Integer,Integer,Integer>(0, 0, 0, 0)) - .rebalance().setParallelism(4); - - data - .groupBy(new TestKeySelector<Tuple4<Integer,Integer,Integer,Integer>>()) - .withPartitioner(new TestPartitionerInt()) - .sortGroup(1, Order.ASCENDING) - .sortGroup(2, Order.DESCENDING) - .reduceGroup(new IdentityGroupReducer<Tuple4<Integer,Integer,Integer,Integer>>()) - .print(); - - Plan p = env.createProgramPlan(); - OptimizedPlan op = compileNoStats(p); - - SinkPlanNode sink = op.getDataSinks().iterator().next(); - SingleInputPlanNode reducer = (SingleInputPlanNode) sink.getInput().getSource(); - SingleInputPlanNode combiner = (SingleInputPlanNode) reducer.getInput().getSource(); - - assertEquals(ShipStrategyType.FORWARD, sink.getInput().getShipStrategy()); - assertEquals(ShipStrategyType.PARTITION_CUSTOM, reducer.getInput().getShipStrategy()); - assertEquals(ShipStrategyType.FORWARD, combiner.getInput().getShipStrategy()); - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - } - - @Test public void testCustomPartitioningKeySelectorInvalidType() { try { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); http://git-wip-us.apache.org/repos/asf/flink/blob/2b16c604/flink-java/src/main/java/org/apache/flink/api/java/operators/SortedGrouping.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/SortedGrouping.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/SortedGrouping.java index 52a77a2..38c6c68 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/SortedGrouping.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/SortedGrouping.java @@ -92,7 +92,7 @@ public class SortedGrouping<T> extends Grouping<T> { super(set, keys); if (!(this.keys instanceof Keys.SelectorFunctionKeys)) { - throw new InvalidProgramException("Sorting on KeySelector only works for KeySelector grouping."); + throw new InvalidProgramException("Sorting on KeySelector keys only works with KeySelector grouping."); } this.groupSortKeyPositions = keySelector.computeLogicalKeyPositions(); http://git-wip-us.apache.org/repos/asf/flink/blob/2b16c604/flink-java/src/main/java/org/apache/flink/api/java/operators/UnsortedGrouping.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/UnsortedGrouping.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/UnsortedGrouping.java index 3f4839f..732c59b 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/UnsortedGrouping.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/UnsortedGrouping.java @@ -231,6 +231,10 @@ public class UnsortedGrouping<T> extends Grouping<T> { * @see Order */ public SortedGrouping<T> sortGroup(int field, Order order) { + if (this.getKeys() instanceof Keys.SelectorFunctionKeys) { + throw new InvalidProgramException("KeySelector grouping keys and field index group-sorting keys cannot be used together."); + } + SortedGrouping<T> sg = new SortedGrouping<T>(this.dataSet, this.keys, field, order); sg.customPartitioner = getCustomPartitioner(); return sg; @@ -248,6 +252,10 @@ public class UnsortedGrouping<T> extends Grouping<T> { * @see Order */ public SortedGrouping<T> sortGroup(String field, Order order) { + if (this.getKeys() instanceof Keys.SelectorFunctionKeys) { + throw new InvalidProgramException("KeySelector grouping keys and field expression group-sorting keys cannot be used together."); + } + SortedGrouping<T> sg = new SortedGrouping<T>(this.dataSet, this.keys, field, order); sg.customPartitioner = getCustomPartitioner(); return sg; @@ -256,7 +264,6 @@ public class UnsortedGrouping<T> extends Grouping<T> { /** * Sorts elements within a group on a key extracted by the specified {@link org.apache.flink.api.java.functions.KeySelector} * in the specified {@link Order}.</br> - * <b>Note: Only groups of Tuple elements and Pojos can be sorted.</b><br/> * Chaining {@link #sortGroup(KeySelector, Order)} calls is not supported. * * @param keySelector The KeySelector with which the group is sorted. @@ -266,8 +273,14 @@ public class UnsortedGrouping<T> extends Grouping<T> { * @see Order */ public <K> SortedGrouping<T> sortGroup(KeySelector<T, K> keySelector, Order order) { + if (!(this.getKeys() instanceof Keys.SelectorFunctionKeys)) { + throw new InvalidProgramException("KeySelector group-sorting keys can only be used with KeySelector grouping keys."); + } + TypeInformation<K> keyType = TypeExtractor.getKeySelectorTypes(keySelector, this.dataSet.getType()); - return new SortedGrouping<T>(this.dataSet, this.keys, new Keys.SelectorFunctionKeys<T, K>(keySelector, this.dataSet.getType(), keyType), order); + SortedGrouping<T> sg = new SortedGrouping<T>(this.dataSet, this.keys, new Keys.SelectorFunctionKeys<T, K>(keySelector, this.dataSet.getType(), keyType), order); + sg.customPartitioner = getCustomPartitioner(); + return sg; } } http://git-wip-us.apache.org/repos/asf/flink/blob/2b16c604/flink-scala/src/main/scala/org/apache/flink/api/scala/GroupedDataSet.scala ---------------------------------------------------------------------- diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/GroupedDataSet.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/GroupedDataSet.scala index 9fc055d..7ac8dcd 100644 --- a/flink-scala/src/main/scala/org/apache/flink/api/scala/GroupedDataSet.scala +++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/GroupedDataSet.scala @@ -67,6 +67,10 @@ class GroupedDataSet[T: ClassTag]( if (field >= set.getType.getArity) { throw new IllegalArgumentException("Order key out of tuple bounds.") } + if (keys.isInstanceOf[Keys.SelectorFunctionKeys[_, _]]) { + throw new InvalidProgramException("KeySelector grouping keys and field index group-sorting " + + "keys cannot be used together.") + } if (groupSortKeySelector.nonEmpty) { throw new InvalidProgramException("Chaining sortGroup with KeySelector sorting is not " + "supported.") @@ -87,6 +91,10 @@ class GroupedDataSet[T: ClassTag]( throw new InvalidProgramException("Chaining sortGroup with KeySelector sorting is not" + "supported.") } + if (keys.isInstanceOf[Keys.SelectorFunctionKeys[_, _]]) { + throw new InvalidProgramException("KeySelector grouping keys and field expression " + + "group-sorting keys cannot be used together.") + } groupSortKeyPositions += Right(field) groupSortOrders += order this @@ -103,6 +111,10 @@ class GroupedDataSet[T: ClassTag]( throw new InvalidProgramException("Chaining sortGroup with KeySelector sorting is not" + "supported.") } + if (!keys.isInstanceOf[Keys.SelectorFunctionKeys[_, _]]) { + throw new InvalidProgramException("Sorting on KeySelector keys only works with KeySelector " + + "grouping.") + } groupSortOrders += order val keyType = implicitly[TypeInformation[K]] @@ -121,7 +133,12 @@ class GroupedDataSet[T: ClassTag]( private def maybeCreateSortedGrouping(): Grouping[T] = { groupSortKeySelector match { case Some(keySelector) => - new SortedGrouping[T](set.javaSet, keys, keySelector, groupSortOrders(0)) + if (partitioner == null) { + new SortedGrouping[T](set.javaSet, keys, keySelector, groupSortOrders(0)) + } else { + new SortedGrouping[T](set.javaSet, keys, keySelector, groupSortOrders(0)) + .withPartitioner(partitioner) + } case None => if (groupSortKeyPositions.length > 0) { val grouping = groupSortKeyPositions(0) match { http://git-wip-us.apache.org/repos/asf/flink/blob/2b16c604/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupReduceITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupReduceITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupReduceITCase.java index 29a2c46..6bef0bc 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupReduceITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupReduceITCase.java @@ -835,7 +835,6 @@ public class GroupReduceITCase extends MultipleProgramsTestBase { .reduceGroup(new Tuple3SortedGroupReduceWithCombine()); reduceDs.writeAsCsv(resultPath); - reduceDs.print(); env.execute(); // return expected result @@ -1302,7 +1301,6 @@ public class GroupReduceITCase extends MultipleProgramsTestBase { public void combine(Iterable<Tuple3<Integer, Long, String>> values, Collector<Tuple3<Integer, Long, String>> out) { int sum = 0; long key = 0; - System.out.println("im in"); StringBuilder concat = new StringBuilder(); for (Tuple3<Integer, Long, String> next : values) { http://git-wip-us.apache.org/repos/asf/flink/blob/2b16c604/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CustomPartitioningGroupingKeySelectorTest.scala ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CustomPartitioningGroupingKeySelectorTest.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CustomPartitioningGroupingKeySelectorTest.scala index 93e3593..6b23649 100644 --- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CustomPartitioningGroupingKeySelectorTest.scala +++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CustomPartitioningGroupingKeySelectorTest.scala @@ -25,32 +25,31 @@ import org.apache.flink.api.common.functions.Partitioner import org.apache.flink.runtime.operators.shipping.ShipStrategyType import org.apache.flink.compiler.plan.SingleInputPlanNode import org.apache.flink.test.compiler.util.CompilerTestBase -import scala.collection.immutable.Seq import org.apache.flink.api.common.operators.Order import org.apache.flink.api.common.InvalidProgramException class CustomPartitioningGroupingKeySelectorTest extends CompilerTestBase { - + @Test def testCustomPartitioningKeySelectorReduce() { try { val env = ExecutionEnvironment.getExecutionEnvironment - + val data = env.fromElements( (0,0) ).rebalance().setParallelism(4) - + data - .groupBy( _._1 ).withPartitioner(new TestPartitionerInt()) - .reduce( (a,b) => a ) - .print() - + .groupBy( _._1 ).withPartitioner(new TestPartitionerInt()) + .reduce( (a,b) => a ) + .print() + val p = env.createProgramPlan() val op = compileNoStats(p) - + val sink = op.getDataSinks.iterator().next() val keyRemovingMapper = sink.getInput.getSource.asInstanceOf[SingleInputPlanNode] val reducer = keyRemovingMapper.getInput.getSource.asInstanceOf[SingleInputPlanNode] val combiner = reducer.getInput.getSource.asInstanceOf[SingleInputPlanNode] - + assertEquals(ShipStrategyType.FORWARD, sink.getInput.getShipStrategy) assertEquals(ShipStrategyType.FORWARD, keyRemovingMapper.getInput.getShipStrategy) assertEquals(ShipStrategyType.PARTITION_CUSTOM, reducer.getInput.getShipStrategy) @@ -63,26 +62,27 @@ class CustomPartitioningGroupingKeySelectorTest extends CompilerTestBase { } } } - + @Test def testCustomPartitioningKeySelectorGroupReduce() { try { val env = ExecutionEnvironment.getExecutionEnvironment - + val data = env.fromElements( (0,0) ).rebalance().setParallelism(4) - + data - .groupBy( _._1 ).withPartitioner(new TestPartitionerInt()) - .reduceGroup( iter => Seq(iter.next()) ) - .print() - + .groupBy( _._1 ).withPartitioner(new TestPartitionerInt()) + .reduce( (a, b) => a) + .print() + val p = env.createProgramPlan() val op = compileNoStats(p) - + val sink = op.getDataSinks.iterator().next() val reducer = sink.getInput.getSource.asInstanceOf[SingleInputPlanNode] + .getInput.getSource.asInstanceOf[SingleInputPlanNode] val combiner = reducer.getInput.getSource.asInstanceOf[SingleInputPlanNode] - + assertEquals(ShipStrategyType.FORWARD, sink.getInput.getShipStrategy) assertEquals(ShipStrategyType.PARTITION_CUSTOM, reducer.getInput.getShipStrategy) assertEquals(ShipStrategyType.FORWARD, combiner.getInput.getShipStrategy) @@ -94,28 +94,63 @@ class CustomPartitioningGroupingKeySelectorTest extends CompilerTestBase { } } } - + + @Test + def testCustomPartitioningIndexGroupReduceSorted() { + try { + val env = ExecutionEnvironment.getExecutionEnvironment + + val data = env.fromElements( (0,0,0) ).rebalance().setParallelism(4) + + data + .groupBy(0) + .withPartitioner(new TestPartitionerInt()) + .sortGroup(1, Order.ASCENDING) + .reduce( (a,b) => a) + .print() + + val p = env.createProgramPlan() + val op = compileNoStats(p) + + val sink = op.getDataSinks.iterator().next() + val reducer = sink.getInput.getSource.asInstanceOf[SingleInputPlanNode] + val combiner = reducer.getInput.getSource.asInstanceOf[SingleInputPlanNode] + + assertEquals(ShipStrategyType.FORWARD, sink.getInput.getShipStrategy) + assertEquals(ShipStrategyType.PARTITION_CUSTOM, reducer.getInput.getShipStrategy) + assertEquals(ShipStrategyType.FORWARD, combiner.getInput.getShipStrategy) + + } + catch { + case e: Exception => { + e.printStackTrace() + fail(e.getMessage) + } + } + } + @Test def testCustomPartitioningKeySelectorGroupReduceSorted() { try { val env = ExecutionEnvironment.getExecutionEnvironment - + val data = env.fromElements( (0,0,0) ).rebalance().setParallelism(4) - + data - .groupBy( _._1 ) - .withPartitioner(new TestPartitionerInt()) - .sortGroup(1, Order.ASCENDING) - .reduceGroup( iter => Seq(iter.next()) ) - .print() - + .groupBy(_._1) + .withPartitioner(new TestPartitionerInt()) + .sortGroup(_._2, Order.ASCENDING) + .reduce( (a,b) => a) + .print() + val p = env.createProgramPlan() val op = compileNoStats(p) - + val sink = op.getDataSinks.iterator().next() val reducer = sink.getInput.getSource.asInstanceOf[SingleInputPlanNode] + .getInput.getSource.asInstanceOf[SingleInputPlanNode] val combiner = reducer.getInput.getSource.asInstanceOf[SingleInputPlanNode] - + assertEquals(ShipStrategyType.FORWARD, sink.getInput.getShipStrategy) assertEquals(ShipStrategyType.PARTITION_CUSTOM, reducer.getInput.getShipStrategy) assertEquals(ShipStrategyType.FORWARD, combiner.getInput.getShipStrategy) @@ -127,31 +162,32 @@ class CustomPartitioningGroupingKeySelectorTest extends CompilerTestBase { } } } - + @Test def testCustomPartitioningKeySelectorGroupReduceSorted2() { try { val env = ExecutionEnvironment.getExecutionEnvironment - + val data = env.fromElements( (0,0,0,0) ).rebalance().setParallelism(4) - + data - .groupBy( _._1 ).withPartitioner(new TestPartitionerInt()) - .sortGroup(1, Order.ASCENDING) - .sortGroup(2, Order.DESCENDING) - .reduceGroup( iter => Seq(iter.next()) ) - .print() - + .groupBy(0).withPartitioner(new TestPartitionerInt()) + .sortGroup(1, Order.ASCENDING) + .sortGroup(2, Order.DESCENDING) + .reduce( (a,b) => a) + .print() + val p = env.createProgramPlan() val op = compileNoStats(p) - + val sink = op.getDataSinks.iterator().next() val reducer = sink.getInput.getSource.asInstanceOf[SingleInputPlanNode] val combiner = reducer.getInput.getSource.asInstanceOf[SingleInputPlanNode] - + assertEquals(ShipStrategyType.FORWARD, sink.getInput.getShipStrategy) assertEquals(ShipStrategyType.PARTITION_CUSTOM, reducer.getInput.getShipStrategy) assertEquals(ShipStrategyType.FORWARD, combiner.getInput.getShipStrategy) + } catch { case e: Exception => { @@ -160,22 +196,22 @@ class CustomPartitioningGroupingKeySelectorTest extends CompilerTestBase { } } } - + @Test def testCustomPartitioningKeySelectorInvalidType() { try { val env = ExecutionEnvironment.getExecutionEnvironment - + val data = env.fromElements( (0, 0) ).rebalance().setParallelism(4) - + try { data - .groupBy( _._1 ) - .withPartitioner(new TestPartitionerLong()) + .groupBy( _._1 ) + .withPartitioner(new TestPartitionerLong()) fail("Should throw an exception") } catch { - case e: InvalidProgramException => + case e: InvalidProgramException => } } catch { @@ -185,23 +221,23 @@ class CustomPartitioningGroupingKeySelectorTest extends CompilerTestBase { } } } - + @Test def testCustomPartitioningKeySelectorInvalidTypeSorted() { try { val env = ExecutionEnvironment.getExecutionEnvironment - + val data = env.fromElements( (0, 0, 0) ).rebalance().setParallelism(4) - + try { data - .groupBy( _._1 ) - .sortGroup(1, Order.ASCENDING) - .withPartitioner(new TestPartitionerLong()) + .groupBy( _._1 ) + .sortGroup(1, Order.ASCENDING) + .withPartitioner(new TestPartitionerLong()) fail("Should throw an exception") } catch { - case e: InvalidProgramException => + case e: InvalidProgramException => } } catch { @@ -211,20 +247,20 @@ class CustomPartitioningGroupingKeySelectorTest extends CompilerTestBase { } } } - + @Test def testCustomPartitioningTupleRejectCompositeKey() { try { val env = ExecutionEnvironment.getExecutionEnvironment - + val data = env.fromElements( (0, 0, 0) ).rebalance().setParallelism(4) - + try { data.groupBy( v => (v._1, v._2) ).withPartitioner(new TestPartitionerInt()) fail("Should throw an exception") } catch { - case e: InvalidProgramException => + case e: InvalidProgramException => } } catch { @@ -234,16 +270,16 @@ class CustomPartitioningGroupingKeySelectorTest extends CompilerTestBase { } } } - + // ---------------------------------------------------------------------------------------------- - + private class TestPartitionerInt extends Partitioner[Int] { - + override def partition(key: Int, numPartitions: Int): Int = 0 } - + private class TestPartitionerLong extends Partitioner[Long] { - + override def partition(key: Long, numPartitions: Int): Int = 0 } }