[FLINK-3198][dataSet] Renames getDataSet() in Grouping to getInputDataSet() and better documents its use.
This closes #1548 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/902d420e Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/902d420e Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/902d420e Branch: refs/heads/master Commit: 902d420e1a2322fa5ef516716ff10837a6e36ce8 Parents: 5914e9a Author: Kostas Kloudas <kklou...@gmail.com> Authored: Mon Jan 25 16:07:39 2016 +0100 Committer: Fabian Hueske <fhue...@apache.org> Committed: Tue Jan 26 11:17:35 2016 +0100 ---------------------------------------------------------------------- .../api/java/operators/AggregateOperator.java | 6 ++--- .../java/operators/GroupCombineOperator.java | 2 +- .../api/java/operators/GroupReduceOperator.java | 2 +- .../flink/api/java/operators/Grouping.java | 20 +++++++++++---- .../api/java/operators/ReduceOperator.java | 2 +- .../api/java/operators/SortedGrouping.java | 25 +++++++++---------- .../api/java/operators/UnsortedGrouping.java | 26 ++++++++++---------- .../optimizer/UnionPropertyPropagationTest.java | 2 +- .../scala/operators/ScalaAggregateOperator.java | 6 ++--- .../BatchScalaAPICompletenessTest.scala | 2 +- 10 files changed, 51 insertions(+), 42 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/902d420e/flink-java/src/main/java/org/apache/flink/api/java/operators/AggregateOperator.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/AggregateOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/AggregateOperator.java index 17dff69..00e0d3b 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/AggregateOperator.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/AggregateOperator.java @@ -94,16 +94,16 @@ public class AggregateOperator<IN> extends 
SingleInputOperator<IN, IN, Aggregate * @param field */ public AggregateOperator(Grouping<IN> input, Aggregations function, int field, String aggregateLocationName) { - super(Preconditions.checkNotNull(input).getDataSet(), input.getDataSet().getType()); + super(Preconditions.checkNotNull(input).getInputDataSet(), input.getInputDataSet().getType()); Preconditions.checkNotNull(function); this.aggregateLocationName = aggregateLocationName; - if (!input.getDataSet().getType().isTupleType()) { + if (!input.getInputDataSet().getType().isTupleType()) { throw new InvalidProgramException("Aggregating on field positions is only possible on tuple data types."); } - TupleTypeInfoBase<?> inType = (TupleTypeInfoBase<?>) input.getDataSet().getType(); + TupleTypeInfoBase<?> inType = (TupleTypeInfoBase<?>) input.getInputDataSet().getType(); if (field < 0 || field >= inType.getArity()) { throw new IllegalArgumentException("Aggregation field position is out of range."); http://git-wip-us.apache.org/repos/asf/flink/blob/902d420e/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupCombineOperator.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupCombineOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupCombineOperator.java index 6d02eca..ef0c12f 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupCombineOperator.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupCombineOperator.java @@ -72,7 +72,7 @@ public class GroupCombineOperator<IN, OUT> extends SingleInputUdfOperator<IN, OU * @param function The user-defined GroupReduce function. */ public GroupCombineOperator(Grouping<IN> input, TypeInformation<OUT> resultType, GroupCombineFunction<IN, OUT> function, String defaultName) { - super(input != null ? input.getDataSet() : null, resultType); + super(input != null ? 
input.getInputDataSet() : null, resultType); this.function = function; this.grouper = input; http://git-wip-us.apache.org/repos/asf/flink/blob/902d420e/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupReduceOperator.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupReduceOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupReduceOperator.java index 91f2efd..b1bf844 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupReduceOperator.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupReduceOperator.java @@ -82,7 +82,7 @@ public class GroupReduceOperator<IN, OUT> extends SingleInputUdfOperator<IN, OUT * @param function The user-defined GroupReduce function. */ public GroupReduceOperator(Grouping<IN> input, TypeInformation<OUT> resultType, GroupReduceFunction<IN, OUT> function, String defaultName) { - super(input != null ? input.getDataSet() : null, resultType); + super(input != null ? 
input.getInputDataSet() : null, resultType); this.function = function; this.grouper = input; http://git-wip-us.apache.org/repos/asf/flink/blob/902d420e/flink-java/src/main/java/org/apache/flink/api/java/operators/Grouping.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/Grouping.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/Grouping.java index 59811b6..c117458 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/Grouping.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/Grouping.java @@ -37,7 +37,7 @@ import org.apache.flink.api.java.DataSet; */ public abstract class Grouping<T> { - protected final DataSet<T> dataSet; + protected final DataSet<T> inputDataSet; protected final Keys<T> keys; @@ -53,13 +53,23 @@ public abstract class Grouping<T> { throw new InvalidProgramException("The grouping keys must not be empty."); } - this.dataSet = set; + this.inputDataSet = set; this.keys = keys; } - - public DataSet<T> getDataSet() { - return this.dataSet; + /** + * Returns the input DataSet of a grouping operation, that is the one before the grouping. This means that + * if it is applied directly to the result of a grouping operation, it will cancel its effect. As an example, in the + * following snippet: + * <pre><code> + * DataSet<X> notGrouped = input.groupBy().getInputDataSet(); + * DataSet<Y> allReduced = notGrouped.reduce() + * </code></pre> + * the <code>groupBy()</code> is as if it never happened, as the <code>notGrouped</code> DataSet corresponds + * to the input of the <code>groupBy()</code> (because of the <code>getInputDataSet()</code>). 
+ * */ + public DataSet<T> getInputDataSet() { + return this.inputDataSet; } public Keys<T> getKeys() { http://git-wip-us.apache.org/repos/asf/flink/blob/902d420e/flink-java/src/main/java/org/apache/flink/api/java/operators/ReduceOperator.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/ReduceOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/ReduceOperator.java index 6791741..6f8877f 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/ReduceOperator.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/ReduceOperator.java @@ -63,7 +63,7 @@ public class ReduceOperator<IN> extends SingleInputUdfOperator<IN, IN, ReduceOpe public ReduceOperator(Grouping<IN> input, ReduceFunction<IN> function, String defaultName) { - super(input.getDataSet(), input.getDataSet().getType()); + super(input.getInputDataSet(), input.getInputDataSet().getType()); this.function = function; this.grouper = input; http://git-wip-us.apache.org/repos/asf/flink/blob/902d420e/flink-java/src/main/java/org/apache/flink/api/java/operators/SortedGrouping.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/SortedGrouping.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/SortedGrouping.java index f626e65..07d0b9a 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/SortedGrouping.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/SortedGrouping.java @@ -57,12 +57,12 @@ public class SortedGrouping<T> extends Grouping<T> { public SortedGrouping(DataSet<T> set, Keys<T> keys, int field, Order order) { super(set, keys); - if (!Keys.ExpressionKeys.isSortKey(field, dataSet.getType())) { + if (!Keys.ExpressionKeys.isSortKey(field, inputDataSet.getType())) { throw new 
InvalidProgramException("Selected sort key is not a sortable type"); } // use int-based expression key to properly resolve nested tuples for grouping - ExpressionKeys<T> ek = new ExpressionKeys<>(field, dataSet.getType()); + ExpressionKeys<T> ek = new ExpressionKeys<>(field, inputDataSet.getType()); this.groupSortKeyPositions = ek.computeLogicalKeyPositions(); this.groupSortOrders = new Order[groupSortKeyPositions.length]; @@ -75,12 +75,12 @@ public class SortedGrouping<T> extends Grouping<T> { public SortedGrouping(DataSet<T> set, Keys<T> keys, String field, Order order) { super(set, keys); - if (!Keys.ExpressionKeys.isSortKey(field, dataSet.getType())) { + if (!Keys.ExpressionKeys.isSortKey(field, inputDataSet.getType())) { throw new InvalidProgramException("Selected sort key is not a sortable type"); } // resolve String-field to int using the expression keys - ExpressionKeys<T> ek = new ExpressionKeys<>(field, dataSet.getType()); + ExpressionKeys<T> ek = new ExpressionKeys<>(field, inputDataSet.getType()); this.groupSortKeyPositions = ek.computeLogicalKeyPositions(); this.groupSortOrders = new Order[groupSortKeyPositions.length]; @@ -168,8 +168,8 @@ public class SortedGrouping<T> extends Grouping<T> { throw new NullPointerException("GroupReduce function must not be null."); } TypeInformation<R> resultType = TypeExtractor.getGroupReduceReturnTypes(reducer, - this.getDataSet().getType(), Utils.getCallLocationName(), true); - return new GroupReduceOperator<>(this, resultType, dataSet.clean(reducer), Utils.getCallLocationName()); + inputDataSet.getType(), Utils.getCallLocationName(), true); + return new GroupReduceOperator<>(this, resultType, inputDataSet.clean(reducer), Utils.getCallLocationName()); } /** @@ -189,9 +189,9 @@ public class SortedGrouping<T> extends Grouping<T> { throw new NullPointerException("GroupCombine function must not be null."); } TypeInformation<R> resultType = TypeExtractor.getGroupCombineReturnTypes(combiner, - this.getDataSet().getType(), 
Utils.getCallLocationName(), true); + this.getInputDataSet().getType(), Utils.getCallLocationName(), true); - return new GroupCombineOperator<>(this, resultType, dataSet.clean(combiner), Utils.getCallLocationName()); + return new GroupCombineOperator<>(this, resultType, inputDataSet.clean(combiner), Utils.getCallLocationName()); } @@ -228,11 +228,11 @@ public class SortedGrouping<T> extends Grouping<T> { if (groupSortSelectorFunctionKey != null) { throw new InvalidProgramException("Chaining sortGroup with KeySelector sorting is not supported"); } - if (!Keys.ExpressionKeys.isSortKey(field, dataSet.getType())) { + if (!Keys.ExpressionKeys.isSortKey(field, inputDataSet.getType())) { throw new InvalidProgramException("Selected sort key is not a sortable type"); } - ExpressionKeys<T> ek = new ExpressionKeys<>(field, dataSet.getType()); + ExpressionKeys<T> ek = new ExpressionKeys<>(field, inputDataSet.getType()); addSortGroupInternal(ek, order); return this; @@ -254,11 +254,11 @@ public class SortedGrouping<T> extends Grouping<T> { if (groupSortSelectorFunctionKey != null) { throw new InvalidProgramException("Chaining sortGroup with KeySelector sorting is not supported"); } - if (!Keys.ExpressionKeys.isSortKey(field, dataSet.getType())) { + if (!Keys.ExpressionKeys.isSortKey(field, inputDataSet.getType())) { throw new InvalidProgramException("Selected sort key is not a sortable type"); } - ExpressionKeys<T> ek = new ExpressionKeys<>(field, dataSet.getType()); + ExpressionKeys<T> ek = new ExpressionKeys<>(field, inputDataSet.getType()); addSortGroupInternal(ek, order); return this; @@ -278,5 +278,4 @@ public class SortedGrouping<T> extends Grouping<T> { this.groupSortOrders[pos] = order; // use the same order } } - } http://git-wip-us.apache.org/repos/asf/flink/blob/902d420e/flink-java/src/main/java/org/apache/flink/api/java/operators/UnsortedGrouping.java ---------------------------------------------------------------------- diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/UnsortedGrouping.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/UnsortedGrouping.java index 6c2c271..5b0a368 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/UnsortedGrouping.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/UnsortedGrouping.java @@ -136,7 +136,7 @@ public class UnsortedGrouping<T> extends Grouping<T> { if (reducer == null) { throw new NullPointerException("Reduce function must not be null."); } - return new ReduceOperator<T>(this, dataSet.clean(reducer), Utils.getCallLocationName()); + return new ReduceOperator<T>(this, inputDataSet.clean(reducer), Utils.getCallLocationName()); } /** @@ -157,9 +157,9 @@ public class UnsortedGrouping<T> extends Grouping<T> { throw new NullPointerException("GroupReduce function must not be null."); } TypeInformation<R> resultType = TypeExtractor.getGroupReduceReturnTypes(reducer, - this.getDataSet().getType(), Utils.getCallLocationName(), true); + this.getInputDataSet().getType(), Utils.getCallLocationName(), true); - return new GroupReduceOperator<T, R>(this, resultType, dataSet.clean(reducer), Utils.getCallLocationName()); + return new GroupReduceOperator<T, R>(this, resultType, inputDataSet.clean(reducer), Utils.getCallLocationName()); } /** @@ -179,9 +179,9 @@ public class UnsortedGrouping<T> extends Grouping<T> { throw new NullPointerException("GroupCombine function must not be null."); } TypeInformation<R> resultType = TypeExtractor.getGroupCombineReturnTypes(combiner, - this.getDataSet().getType(), Utils.getCallLocationName(), true); + this.getInputDataSet().getType(), Utils.getCallLocationName(), true); - return new GroupCombineOperator<T, R>(this, resultType, dataSet.clean(combiner), Utils.getCallLocationName()); + return new GroupCombineOperator<T, R>(this, resultType, inputDataSet.clean(combiner), Utils.getCallLocationName()); } /** @@ -210,12 +210,12 @@ public class 
UnsortedGrouping<T> extends Grouping<T> { public ReduceOperator<T> minBy(int... fields) { // Check for using a tuple - if(!this.dataSet.getType().isTupleType()) { + if(!this.inputDataSet.getType().isTupleType()) { throw new InvalidProgramException("Method minBy(int) only works on tuples."); } return new ReduceOperator<T>(this, new SelectByMinFunction( - (TupleTypeInfo) this.dataSet.getType(), fields), Utils.getCallLocationName()); + (TupleTypeInfo) this.inputDataSet.getType(), fields), Utils.getCallLocationName()); } /** @@ -231,12 +231,12 @@ public class UnsortedGrouping<T> extends Grouping<T> { public ReduceOperator<T> maxBy(int... fields) { // Check for using a tuple - if(!this.dataSet.getType().isTupleType()) { + if(!this.inputDataSet.getType().isTupleType()) { throw new InvalidProgramException("Method maxBy(int) only works on tuples."); } return new ReduceOperator<T>(this, new SelectByMaxFunction( - (TupleTypeInfo) this.dataSet.getType(), fields), Utils.getCallLocationName()); + (TupleTypeInfo) this.inputDataSet.getType(), fields), Utils.getCallLocationName()); } // -------------------------------------------------------------------------------------------- // Group Operations @@ -259,7 +259,7 @@ public class UnsortedGrouping<T> extends Grouping<T> { throw new InvalidProgramException("KeySelector grouping keys and field index group-sorting keys cannot be used together."); } - SortedGrouping<T> sg = new SortedGrouping<T>(this.dataSet, this.keys, field, order); + SortedGrouping<T> sg = new SortedGrouping<T>(this.inputDataSet, this.keys, field, order); sg.customPartitioner = getCustomPartitioner(); return sg; } @@ -280,7 +280,7 @@ public class UnsortedGrouping<T> extends Grouping<T> { throw new InvalidProgramException("KeySelector grouping keys and field expression group-sorting keys cannot be used together."); } - SortedGrouping<T> sg = new SortedGrouping<T>(this.dataSet, this.keys, field, order); + SortedGrouping<T> sg = new SortedGrouping<T>(this.inputDataSet, 
this.keys, field, order); sg.customPartitioner = getCustomPartitioner(); return sg; } @@ -301,8 +301,8 @@ public class UnsortedGrouping<T> extends Grouping<T> { throw new InvalidProgramException("KeySelector group-sorting keys can only be used with KeySelector grouping keys."); } - TypeInformation<K> keyType = TypeExtractor.getKeySelectorTypes(keySelector, this.dataSet.getType()); - SortedGrouping<T> sg = new SortedGrouping<T>(this.dataSet, this.keys, new Keys.SelectorFunctionKeys<T, K>(keySelector, this.dataSet.getType(), keyType), order); + TypeInformation<K> keyType = TypeExtractor.getKeySelectorTypes(keySelector, this.inputDataSet.getType()); + SortedGrouping<T> sg = new SortedGrouping<T>(this.inputDataSet, this.keys, new Keys.SelectorFunctionKeys<T, K>(keySelector, this.inputDataSet.getType(), keyType), order); sg.customPartitioner = getCustomPartitioner(); return sg; } http://git-wip-us.apache.org/repos/asf/flink/blob/902d420e/flink-optimizer/src/test/java/org/apache/flink/optimizer/UnionPropertyPropagationTest.java ---------------------------------------------------------------------- diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/UnionPropertyPropagationTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/UnionPropertyPropagationTest.java index c86f30a..fefc627 100644 --- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/UnionPropertyPropagationTest.java +++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/UnionPropertyPropagationTest.java @@ -98,7 +98,7 @@ public class UnionPropertyPropagationTest extends CompilerTestBase { final int NUM_INPUTS = 4; // construct the plan it will be multiple flat maps, all unioned - // and the "unioned" dataSet will be grouped + // and the "unioned" inputDataSet will be grouped final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); DataSet<String> source = env.readTextFile(IN_FILE); 
http://git-wip-us.apache.org/repos/asf/flink/blob/902d420e/flink-scala/src/main/java/org/apache/flink/api/scala/operators/ScalaAggregateOperator.java ---------------------------------------------------------------------- diff --git a/flink-scala/src/main/java/org/apache/flink/api/scala/operators/ScalaAggregateOperator.java b/flink-scala/src/main/java/org/apache/flink/api/scala/operators/ScalaAggregateOperator.java index 42f0e70..a79c843 100644 --- a/flink-scala/src/main/java/org/apache/flink/api/scala/operators/ScalaAggregateOperator.java +++ b/flink-scala/src/main/java/org/apache/flink/api/scala/operators/ScalaAggregateOperator.java @@ -95,15 +95,15 @@ public class ScalaAggregateOperator<IN> extends SingleInputOperator<IN, IN, Scal * @param field */ public ScalaAggregateOperator(Grouping<IN> input, Aggregations function, int field) { - super(Preconditions.checkNotNull(input).getDataSet(), input.getDataSet().getType()); + super(Preconditions.checkNotNull(input).getInputDataSet(), input.getInputDataSet().getType()); Preconditions.checkNotNull(function); - if (!input.getDataSet().getType().isTupleType()) { + if (!input.getInputDataSet().getType().isTupleType()) { throw new InvalidProgramException("Aggregating on field positions is only possible on tuple data types."); } - TupleTypeInfoBase<?> inType = (TupleTypeInfoBase<?>) input.getDataSet().getType(); + TupleTypeInfoBase<?> inType = (TupleTypeInfoBase<?>) input.getInputDataSet().getType(); if (field < 0 || field >= inType.getArity()) { throw new IllegalArgumentException("Aggregation field position is out of range."); http://git-wip-us.apache.org/repos/asf/flink/blob/902d420e/flink-tests/src/test/scala/org/apache/flink/api/scala/completeness/BatchScalaAPICompletenessTest.scala ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/completeness/BatchScalaAPICompletenessTest.scala 
b/flink-tests/src/test/scala/org/apache/flink/api/scala/completeness/BatchScalaAPICompletenessTest.scala index d50186e..36ded06 100644 --- a/flink-tests/src/test/scala/org/apache/flink/api/scala/completeness/BatchScalaAPICompletenessTest.scala +++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/completeness/BatchScalaAPICompletenessTest.scala @@ -45,7 +45,7 @@ class BatchScalaAPICompletenessTest extends ScalaAPICompletenessTestBase { "org.apache.flink.api.java.DataSet.getType", "org.apache.flink.api.java.operators.Operator.getResultType", "org.apache.flink.api.java.operators.Operator.getName", - "org.apache.flink.api.java.operators.Grouping.getDataSet", + "org.apache.flink.api.java.operators.Grouping.getInputDataSet", "org.apache.flink.api.java.operators.Grouping.getKeys", "org.apache.flink.api.java.operators.SingleInputOperator.getInput", "org.apache.flink.api.java.operators.SingleInputOperator.getInputType",