[FLINK-3198][dataSet] Renames and documents better the use of the getDataSet() 
in Grouping.

This closes #1548


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/902d420e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/902d420e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/902d420e

Branch: refs/heads/master
Commit: 902d420e1a2322fa5ef516716ff10837a6e36ce8
Parents: 5914e9a
Author: Kostas Kloudas <kklou...@gmail.com>
Authored: Mon Jan 25 16:07:39 2016 +0100
Committer: Fabian Hueske <fhue...@apache.org>
Committed: Tue Jan 26 11:17:35 2016 +0100

----------------------------------------------------------------------
 .../api/java/operators/AggregateOperator.java   |  6 ++---
 .../java/operators/GroupCombineOperator.java    |  2 +-
 .../api/java/operators/GroupReduceOperator.java |  2 +-
 .../flink/api/java/operators/Grouping.java      | 20 +++++++++++----
 .../api/java/operators/ReduceOperator.java      |  2 +-
 .../api/java/operators/SortedGrouping.java      | 25 +++++++++----------
 .../api/java/operators/UnsortedGrouping.java    | 26 ++++++++++----------
 .../optimizer/UnionPropertyPropagationTest.java |  2 +-
 .../scala/operators/ScalaAggregateOperator.java |  6 ++---
 .../BatchScalaAPICompletenessTest.scala         |  2 +-
 10 files changed, 51 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/902d420e/flink-java/src/main/java/org/apache/flink/api/java/operators/AggregateOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/AggregateOperator.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/AggregateOperator.java
index 17dff69..00e0d3b 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/AggregateOperator.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/AggregateOperator.java
@@ -94,16 +94,16 @@ public class AggregateOperator<IN> extends 
SingleInputOperator<IN, IN, Aggregate
         * @param field
         */
        public AggregateOperator(Grouping<IN> input, Aggregations function, int 
field, String aggregateLocationName) {
-               super(Preconditions.checkNotNull(input).getDataSet(), 
input.getDataSet().getType());
+               super(Preconditions.checkNotNull(input).getInputDataSet(), 
input.getInputDataSet().getType());
                Preconditions.checkNotNull(function);
                
                this.aggregateLocationName = aggregateLocationName;
                
-               if (!input.getDataSet().getType().isTupleType()) {
+               if (!input.getInputDataSet().getType().isTupleType()) {
                        throw new InvalidProgramException("Aggregating on field 
positions is only possible on tuple data types.");
                }
                
-               TupleTypeInfoBase<?> inType = (TupleTypeInfoBase<?>) 
input.getDataSet().getType();
+               TupleTypeInfoBase<?> inType = (TupleTypeInfoBase<?>) 
input.getInputDataSet().getType();
                
                if (field < 0 || field >= inType.getArity()) {
                        throw new IllegalArgumentException("Aggregation field 
position is out of range.");

http://git-wip-us.apache.org/repos/asf/flink/blob/902d420e/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupCombineOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupCombineOperator.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupCombineOperator.java
index 6d02eca..ef0c12f 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupCombineOperator.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupCombineOperator.java
@@ -72,7 +72,7 @@ public class GroupCombineOperator<IN, OUT> extends 
SingleInputUdfOperator<IN, OU
         * @param function The user-defined GroupReduce function.
         */
        public GroupCombineOperator(Grouping<IN> input, TypeInformation<OUT> 
resultType, GroupCombineFunction<IN, OUT> function, String defaultName) {
-               super(input != null ? input.getDataSet() : null, resultType);
+               super(input != null ? input.getInputDataSet() : null, 
resultType);
 
                this.function = function;
                this.grouper = input;

http://git-wip-us.apache.org/repos/asf/flink/blob/902d420e/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupReduceOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupReduceOperator.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupReduceOperator.java
index 91f2efd..b1bf844 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupReduceOperator.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupReduceOperator.java
@@ -82,7 +82,7 @@ public class GroupReduceOperator<IN, OUT> extends 
SingleInputUdfOperator<IN, OUT
         * @param function The user-defined GroupReduce function.
         */
        public GroupReduceOperator(Grouping<IN> input, TypeInformation<OUT> 
resultType, GroupReduceFunction<IN, OUT> function, String defaultName) {
-               super(input != null ? input.getDataSet() : null, resultType);
+               super(input != null ? input.getInputDataSet() : null, 
resultType);
                
                this.function = function;
                this.grouper = input;

http://git-wip-us.apache.org/repos/asf/flink/blob/902d420e/flink-java/src/main/java/org/apache/flink/api/java/operators/Grouping.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/Grouping.java 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/Grouping.java
index 59811b6..c117458 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/Grouping.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/Grouping.java
@@ -37,7 +37,7 @@ import org.apache.flink.api.java.DataSet;
  */
 public abstract class Grouping<T> {
        
-       protected final DataSet<T> dataSet;
+       protected final DataSet<T> inputDataSet;
        
        protected final Keys<T> keys;
        
@@ -53,13 +53,23 @@ public abstract class Grouping<T> {
                        throw new InvalidProgramException("The grouping keys 
must not be empty.");
                }
 
-               this.dataSet = set;
+               this.inputDataSet = set;
                this.keys = keys;
        }
        
-       
-       public DataSet<T> getDataSet() {
-               return this.dataSet;
+       /**
+        * Returns the input DataSet of a grouping operation, that is the one 
before the grouping. This means that
+        * if it is applied directly to the result of a grouping operation, it 
will cancel its effect. As an example, in the
+        * following snippet:
+        * <pre><code>
+        * DataSet&lt;X&gt; notGrouped = input.groupBy().getInputDataSet();
+        * DataSet&lt;Y&gt; allReduced = notGrouped.reduce();
+        * </code></pre>
+        * the <code>groupBy()</code> is as if it never happened, as the 
<code>notGrouped</code> DataSet corresponds
+        * to the input of the <code>groupBy()</code> (because of the 
<code>getInputDataSet()</code>).
+        * */
+       public DataSet<T> getInputDataSet() {
+               return this.inputDataSet;
        }
        
        public Keys<T> getKeys() {

http://git-wip-us.apache.org/repos/asf/flink/blob/902d420e/flink-java/src/main/java/org/apache/flink/api/java/operators/ReduceOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/ReduceOperator.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/ReduceOperator.java
index 6791741..6f8877f 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/ReduceOperator.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/ReduceOperator.java
@@ -63,7 +63,7 @@ public class ReduceOperator<IN> extends 
SingleInputUdfOperator<IN, IN, ReduceOpe
        
        
        public ReduceOperator(Grouping<IN> input, ReduceFunction<IN> function, 
String defaultName) {
-               super(input.getDataSet(), input.getDataSet().getType());
+               super(input.getInputDataSet(), 
input.getInputDataSet().getType());
                
                this.function = function;
                this.grouper = input;

http://git-wip-us.apache.org/repos/asf/flink/blob/902d420e/flink-java/src/main/java/org/apache/flink/api/java/operators/SortedGrouping.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/SortedGrouping.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/SortedGrouping.java
index f626e65..07d0b9a 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/SortedGrouping.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/SortedGrouping.java
@@ -57,12 +57,12 @@ public class SortedGrouping<T> extends Grouping<T> {
        public SortedGrouping(DataSet<T> set, Keys<T> keys, int field, Order 
order) {
                super(set, keys);
 
-               if (!Keys.ExpressionKeys.isSortKey(field, dataSet.getType())) {
+               if (!Keys.ExpressionKeys.isSortKey(field, 
inputDataSet.getType())) {
                        throw new InvalidProgramException("Selected sort key is 
not a sortable type");
                }
 
                // use int-based expression key to properly resolve nested 
tuples for grouping
-               ExpressionKeys<T> ek = new ExpressionKeys<>(field, 
dataSet.getType());
+               ExpressionKeys<T> ek = new ExpressionKeys<>(field, 
inputDataSet.getType());
 
                this.groupSortKeyPositions = ek.computeLogicalKeyPositions();
                this.groupSortOrders = new Order[groupSortKeyPositions.length];
@@ -75,12 +75,12 @@ public class SortedGrouping<T> extends Grouping<T> {
        public SortedGrouping(DataSet<T> set, Keys<T> keys, String field, Order 
order) {
                super(set, keys);
 
-               if (!Keys.ExpressionKeys.isSortKey(field, dataSet.getType())) {
+               if (!Keys.ExpressionKeys.isSortKey(field, 
inputDataSet.getType())) {
                        throw new InvalidProgramException("Selected sort key is 
not a sortable type");
                }
 
                // resolve String-field to int using the expression keys
-               ExpressionKeys<T> ek = new ExpressionKeys<>(field, 
dataSet.getType());
+               ExpressionKeys<T> ek = new ExpressionKeys<>(field, 
inputDataSet.getType());
 
                this.groupSortKeyPositions = ek.computeLogicalKeyPositions();
                this.groupSortOrders = new Order[groupSortKeyPositions.length];
@@ -168,8 +168,8 @@ public class SortedGrouping<T> extends Grouping<T> {
                        throw new NullPointerException("GroupReduce function 
must not be null.");
                }
                TypeInformation<R> resultType = 
TypeExtractor.getGroupReduceReturnTypes(reducer,
-                               this.getDataSet().getType(), 
Utils.getCallLocationName(), true);
-               return new GroupReduceOperator<>(this, resultType, 
dataSet.clean(reducer), Utils.getCallLocationName());
+                               inputDataSet.getType(), 
Utils.getCallLocationName(), true);
+               return new GroupReduceOperator<>(this, resultType, 
inputDataSet.clean(reducer), Utils.getCallLocationName());
        }
 
        /**
@@ -189,9 +189,9 @@ public class SortedGrouping<T> extends Grouping<T> {
                        throw new NullPointerException("GroupCombine function 
must not be null.");
                }
                TypeInformation<R> resultType = 
TypeExtractor.getGroupCombineReturnTypes(combiner,
-                               this.getDataSet().getType(), 
Utils.getCallLocationName(), true);
+                               this.getInputDataSet().getType(), 
Utils.getCallLocationName(), true);
 
-               return new GroupCombineOperator<>(this, resultType, 
dataSet.clean(combiner), Utils.getCallLocationName());
+               return new GroupCombineOperator<>(this, resultType, 
inputDataSet.clean(combiner), Utils.getCallLocationName());
        }
 
        
@@ -228,11 +228,11 @@ public class SortedGrouping<T> extends Grouping<T> {
                if (groupSortSelectorFunctionKey != null) {
                        throw new InvalidProgramException("Chaining sortGroup 
with KeySelector sorting is not supported");
                }
-               if (!Keys.ExpressionKeys.isSortKey(field, dataSet.getType())) {
+               if (!Keys.ExpressionKeys.isSortKey(field, 
inputDataSet.getType())) {
                        throw new InvalidProgramException("Selected sort key is 
not a sortable type");
                }
 
-               ExpressionKeys<T> ek = new ExpressionKeys<>(field, 
dataSet.getType());
+               ExpressionKeys<T> ek = new ExpressionKeys<>(field, 
inputDataSet.getType());
 
                addSortGroupInternal(ek, order);
                return this;
@@ -254,11 +254,11 @@ public class SortedGrouping<T> extends Grouping<T> {
                if (groupSortSelectorFunctionKey != null) {
                        throw new InvalidProgramException("Chaining sortGroup 
with KeySelector sorting is not supported");
                }
-               if (!Keys.ExpressionKeys.isSortKey(field, dataSet.getType())) {
+               if (!Keys.ExpressionKeys.isSortKey(field, 
inputDataSet.getType())) {
                        throw new InvalidProgramException("Selected sort key is 
not a sortable type");
                }
 
-               ExpressionKeys<T> ek = new ExpressionKeys<>(field, 
dataSet.getType());
+               ExpressionKeys<T> ek = new ExpressionKeys<>(field, 
inputDataSet.getType());
 
                addSortGroupInternal(ek, order);
                return this;
@@ -278,5 +278,4 @@ public class SortedGrouping<T> extends Grouping<T> {
                        this.groupSortOrders[pos] = order; // use the same order
                }
        }
-
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/902d420e/flink-java/src/main/java/org/apache/flink/api/java/operators/UnsortedGrouping.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/UnsortedGrouping.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/UnsortedGrouping.java
index 6c2c271..5b0a368 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/UnsortedGrouping.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/UnsortedGrouping.java
@@ -136,7 +136,7 @@ public class UnsortedGrouping<T> extends Grouping<T> {
                if (reducer == null) {
                        throw new NullPointerException("Reduce function must 
not be null.");
                }
-               return new ReduceOperator<T>(this, dataSet.clean(reducer), 
Utils.getCallLocationName());
+               return new ReduceOperator<T>(this, inputDataSet.clean(reducer), 
Utils.getCallLocationName());
        }
        
        /**
@@ -157,9 +157,9 @@ public class UnsortedGrouping<T> extends Grouping<T> {
                        throw new NullPointerException("GroupReduce function 
must not be null.");
                }
                TypeInformation<R> resultType = 
TypeExtractor.getGroupReduceReturnTypes(reducer,
-                               this.getDataSet().getType(), 
Utils.getCallLocationName(), true);
+                               this.getInputDataSet().getType(), 
Utils.getCallLocationName(), true);
 
-               return new GroupReduceOperator<T, R>(this, resultType, 
dataSet.clean(reducer), Utils.getCallLocationName());
+               return new GroupReduceOperator<T, R>(this, resultType, 
inputDataSet.clean(reducer), Utils.getCallLocationName());
        }
 
        /**
@@ -179,9 +179,9 @@ public class UnsortedGrouping<T> extends Grouping<T> {
                        throw new NullPointerException("GroupCombine function 
must not be null.");
                }
                TypeInformation<R> resultType = 
TypeExtractor.getGroupCombineReturnTypes(combiner,
-                               this.getDataSet().getType(), 
Utils.getCallLocationName(), true);
+                               this.getInputDataSet().getType(), 
Utils.getCallLocationName(), true);
 
-               return new GroupCombineOperator<T, R>(this, resultType, 
dataSet.clean(combiner), Utils.getCallLocationName());
+               return new GroupCombineOperator<T, R>(this, resultType, 
inputDataSet.clean(combiner), Utils.getCallLocationName());
        }
 
        /**
@@ -210,12 +210,12 @@ public class UnsortedGrouping<T> extends Grouping<T> {
        public ReduceOperator<T> minBy(int... fields)  {
                
                // Check for using a tuple
-               if(!this.dataSet.getType().isTupleType()) {
+               if(!this.inputDataSet.getType().isTupleType()) {
                        throw new InvalidProgramException("Method minBy(int) 
only works on tuples.");
                }
                        
                return new ReduceOperator<T>(this, new SelectByMinFunction(
-                               (TupleTypeInfo) this.dataSet.getType(), 
fields), Utils.getCallLocationName());
+                               (TupleTypeInfo) this.inputDataSet.getType(), 
fields), Utils.getCallLocationName());
        }
        
        /**
@@ -231,12 +231,12 @@ public class UnsortedGrouping<T> extends Grouping<T> {
        public ReduceOperator<T> maxBy(int... fields)  {
                
                // Check for using a tuple
-               if(!this.dataSet.getType().isTupleType()) {
+               if(!this.inputDataSet.getType().isTupleType()) {
                        throw new InvalidProgramException("Method maxBy(int) 
only works on tuples.");
                }
                        
                return new ReduceOperator<T>(this, new SelectByMaxFunction(
-                               (TupleTypeInfo) this.dataSet.getType(), 
fields), Utils.getCallLocationName());
+                               (TupleTypeInfo) this.inputDataSet.getType(), 
fields), Utils.getCallLocationName());
        }
        // 
--------------------------------------------------------------------------------------------
        //  Group Operations
@@ -259,7 +259,7 @@ public class UnsortedGrouping<T> extends Grouping<T> {
                        throw new InvalidProgramException("KeySelector grouping 
keys and field index group-sorting keys cannot be used together.");
                }
 
-               SortedGrouping<T> sg = new SortedGrouping<T>(this.dataSet, 
this.keys, field, order);
+               SortedGrouping<T> sg = new SortedGrouping<T>(this.inputDataSet, 
this.keys, field, order);
                sg.customPartitioner = getCustomPartitioner();
                return sg;
        }
@@ -280,7 +280,7 @@ public class UnsortedGrouping<T> extends Grouping<T> {
                        throw new InvalidProgramException("KeySelector grouping 
keys and field expression group-sorting keys cannot be used together.");
                }
 
-               SortedGrouping<T> sg = new SortedGrouping<T>(this.dataSet, 
this.keys, field, order);
+               SortedGrouping<T> sg = new SortedGrouping<T>(this.inputDataSet, 
this.keys, field, order);
                sg.customPartitioner = getCustomPartitioner();
                return sg;
        }
@@ -301,8 +301,8 @@ public class UnsortedGrouping<T> extends Grouping<T> {
                        throw new InvalidProgramException("KeySelector 
group-sorting keys can only be used with KeySelector grouping keys.");
                }
 
-               TypeInformation<K> keyType = 
TypeExtractor.getKeySelectorTypes(keySelector, this.dataSet.getType());
-               SortedGrouping<T> sg = new SortedGrouping<T>(this.dataSet, 
this.keys, new Keys.SelectorFunctionKeys<T, K>(keySelector, 
this.dataSet.getType(), keyType), order);
+               TypeInformation<K> keyType = 
TypeExtractor.getKeySelectorTypes(keySelector, this.inputDataSet.getType());
+               SortedGrouping<T> sg = new SortedGrouping<T>(this.inputDataSet, 
this.keys, new Keys.SelectorFunctionKeys<T, K>(keySelector, 
this.inputDataSet.getType(), keyType), order);
                sg.customPartitioner = getCustomPartitioner();
                return sg;
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/902d420e/flink-optimizer/src/test/java/org/apache/flink/optimizer/UnionPropertyPropagationTest.java
----------------------------------------------------------------------
diff --git 
a/flink-optimizer/src/test/java/org/apache/flink/optimizer/UnionPropertyPropagationTest.java
 
b/flink-optimizer/src/test/java/org/apache/flink/optimizer/UnionPropertyPropagationTest.java
index c86f30a..fefc627 100644
--- 
a/flink-optimizer/src/test/java/org/apache/flink/optimizer/UnionPropertyPropagationTest.java
+++ 
b/flink-optimizer/src/test/java/org/apache/flink/optimizer/UnionPropertyPropagationTest.java
@@ -98,7 +98,7 @@ public class UnionPropertyPropagationTest extends 
CompilerTestBase {
                final int NUM_INPUTS = 4;
                
                // construct the plan it will be multiple flat maps, all unioned
-               // and the "unioned" dataSet will be grouped
+               // and the "unioned" DataSet will be grouped
                final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
                
                DataSet<String> source = env.readTextFile(IN_FILE);

http://git-wip-us.apache.org/repos/asf/flink/blob/902d420e/flink-scala/src/main/java/org/apache/flink/api/scala/operators/ScalaAggregateOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-scala/src/main/java/org/apache/flink/api/scala/operators/ScalaAggregateOperator.java
 
b/flink-scala/src/main/java/org/apache/flink/api/scala/operators/ScalaAggregateOperator.java
index 42f0e70..a79c843 100644
--- 
a/flink-scala/src/main/java/org/apache/flink/api/scala/operators/ScalaAggregateOperator.java
+++ 
b/flink-scala/src/main/java/org/apache/flink/api/scala/operators/ScalaAggregateOperator.java
@@ -95,15 +95,15 @@ public class ScalaAggregateOperator<IN> extends 
SingleInputOperator<IN, IN, Scal
         * @param field
         */
        public ScalaAggregateOperator(Grouping<IN> input, Aggregations 
function, int field) {
-               super(Preconditions.checkNotNull(input).getDataSet(), 
input.getDataSet().getType());
+               super(Preconditions.checkNotNull(input).getInputDataSet(), 
input.getInputDataSet().getType());
 
                Preconditions.checkNotNull(function);
 
-               if (!input.getDataSet().getType().isTupleType()) {
+               if (!input.getInputDataSet().getType().isTupleType()) {
                        throw new InvalidProgramException("Aggregating on field 
positions is only possible on tuple data types.");
                }
 
-               TupleTypeInfoBase<?> inType = (TupleTypeInfoBase<?>) 
input.getDataSet().getType();
+               TupleTypeInfoBase<?> inType = (TupleTypeInfoBase<?>) 
input.getInputDataSet().getType();
 
                if (field < 0 || field >= inType.getArity()) {
                        throw new IllegalArgumentException("Aggregation field 
position is out of range.");

http://git-wip-us.apache.org/repos/asf/flink/blob/902d420e/flink-tests/src/test/scala/org/apache/flink/api/scala/completeness/BatchScalaAPICompletenessTest.scala
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/scala/org/apache/flink/api/scala/completeness/BatchScalaAPICompletenessTest.scala
 
b/flink-tests/src/test/scala/org/apache/flink/api/scala/completeness/BatchScalaAPICompletenessTest.scala
index d50186e..36ded06 100644
--- 
a/flink-tests/src/test/scala/org/apache/flink/api/scala/completeness/BatchScalaAPICompletenessTest.scala
+++ 
b/flink-tests/src/test/scala/org/apache/flink/api/scala/completeness/BatchScalaAPICompletenessTest.scala
@@ -45,7 +45,7 @@ class BatchScalaAPICompletenessTest extends 
ScalaAPICompletenessTestBase {
        "org.apache.flink.api.java.DataSet.getType",
        "org.apache.flink.api.java.operators.Operator.getResultType",
        "org.apache.flink.api.java.operators.Operator.getName",
-       "org.apache.flink.api.java.operators.Grouping.getDataSet",
+       "org.apache.flink.api.java.operators.Grouping.getInputDataSet",
        "org.apache.flink.api.java.operators.Grouping.getKeys",
        "org.apache.flink.api.java.operators.SingleInputOperator.getInput",
        "org.apache.flink.api.java.operators.SingleInputOperator.getInputType",

Reply via email to