Repository: incubator-flink
Updated Branches:
  refs/heads/master eb51c8806 -> 7e396969f


Remove field0 from CoGroup and Join Operator

The field0 parameters were originally added to force the user to specify
at least one key field. They are no longer needed for that purpose: the
user is still forced to specify at least one key, because a call without
any key would be ambiguous between tuple key fields and expression keys.


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/7e396969
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/7e396969
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/7e396969

Branch: refs/heads/master
Commit: 7e396969f6020e8e73d5bf57d29a6ed80bbbdbd2
Parents: eb51c88
Author: Aljoscha Krettek <[email protected]>
Authored: Wed Aug 13 12:49:37 2014 +0200
Committer: Aljoscha Krettek <[email protected]>
Committed: Wed Aug 13 14:41:25 2014 +0200

----------------------------------------------------------------------
 .../api/java/operators/CoGroupOperator.java     | 58 ++++++++------------
 .../flink/api/java/operators/JoinOperator.java  | 42 +++++---------
 2 files changed, 36 insertions(+), 64 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/7e396969/flink-java/src/main/java/org/apache/flink/api/java/operators/CoGroupOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/CoGroupOperator.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/CoGroupOperator.java
index c1ec8c6..899fc09 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/CoGroupOperator.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/CoGroupOperator.java
@@ -301,7 +301,7 @@ public class CoGroupOperator<I1, I2, OUT> extends 
TwoInputUdfOperator<I1, I2, OU
        /**
         * Intermediate step of a CoGroup transformation. <br/>
         * To continue the CoGroup transformation, select the grouping key of 
the first input {@link DataSet} by calling 
-        * {@link 
org.apache.flink.api.java.operators.CoGroupOperator.CoGroupOperatorSets#where(int,
 int...)} or {@link 
org.apache.flink.api.java.operators.CoGroupOperator.CoGroupOperatorSets#where(KeySelector)}.
+        * {@link 
org.apache.flink.api.java.operators.CoGroupOperator.CoGroupOperatorSets#where(int...)}
 or {@link 
org.apache.flink.api.java.operators.CoGroupOperator.CoGroupOperatorSets#where(KeySelector)}.
         *
         * @param <I1> The type of the first input DataSet of the CoGroup 
transformation.
         * @param <I2> The type of the second input DataSet of the CoGroup 
transformation.
@@ -325,19 +325,16 @@ public class CoGroupOperator<I1, I2, OUT> extends 
TwoInputUdfOperator<I1, I2, OU
                 * Defines the {@link Tuple} fields of the first co-grouped 
{@link DataSet} that should be used as grouping keys.<br/>
                 * <b>Note: Fields can only be selected as grouping keys on 
Tuple DataSets.</b><br/>
                 *
-                * @param field0 The first index of the Tuple fields of the 
first co-grouped DataSets that should be used as key
+                *
                 * @param fields The indexes of the Tuple fields of the first 
co-grouped DataSets that should be used as keys.
-                * @return An incomplete CoGroup transformation. 
-                *           Call {@link 
org.apache.flink.api.java.operators.CoGroupOperator.CoGroupOperatorSets.CoGroupOperatorSetsPredicate#equalTo(int,
 int...)} to continue the CoGroup.
+                * @return An incomplete CoGroup transformation.
+                *           Call {@link 
org.apache.flink.api.java.operators.CoGroupOperator.CoGroupOperatorSets.CoGroupOperatorSetsPredicate#equalTo(int...)}
 to continue the CoGroup.
                 * 
                 * @see Tuple
                 * @see DataSet
                 */
-               public CoGroupOperatorSetsPredicate where(int field0, int... 
fields) {
-                       int[] actualFields = new int[fields.length + 1];
-                       actualFields[0] = field0;
-                       System.arraycopy(fields, 0, actualFields, 1, 
fields.length);
-                       return new CoGroupOperatorSetsPredicate(new 
Keys.FieldPositionKeys<I1>(actualFields, input1.getType()));
+               public CoGroupOperatorSetsPredicate where(int... fields) {
+                       return new CoGroupOperatorSetsPredicate(new 
Keys.FieldPositionKeys<I1>(fields, input1.getType()));
                }
 
                /**
@@ -345,19 +342,16 @@ public class CoGroupOperator<I1, I2, OUT> extends 
TwoInputUdfOperator<I1, I2, OU
                 * Defines the fields of the first co-grouped {@link DataSet} 
that should be used as grouping keys. Fields
                 * are the names of member fields of the underlying type of the 
data set.
                 *
-                * @param field0 The first field of the Tuple fields of the 
first co-grouped DataSets that should be used as key
+                *
                 * @param fields The  fields of the first co-grouped DataSets 
that should be used as keys.
                 * @return An incomplete CoGroup transformation.
-                *           Call {@link 
org.apache.flink.api.java.operators.CoGroupOperator.CoGroupOperatorSets.CoGroupOperatorSetsPredicate#equalTo(int,
 int...)} to continue the CoGroup.
+                *           Call {@link 
org.apache.flink.api.java.operators.CoGroupOperator.CoGroupOperatorSets.CoGroupOperatorSetsPredicate#equalTo(int...)}
 to continue the CoGroup.
                 *
                 * @see Tuple
                 * @see DataSet
                 */
-//             public CoGroupOperatorSetsPredicate where(String field0, 
String... fields) {
-//                     String[] actualFields = new String[fields.length + 1];
-//                     actualFields[0] = field0;
-//                     System.arraycopy(fields, 0, actualFields, 1, 
fields.length);
-//                     return new CoGroupOperatorSetsPredicate(new 
Keys.ExpressionKeys<I1>(actualFields, input1.getType()));
+//             public CoGroupOperatorSetsPredicate where(String... fields) {
+//                     return new CoGroupOperatorSetsPredicate(new 
Keys.ExpressionKeys<I1>(fields, input1.getType()));
 //             }
 
                /**
@@ -367,7 +361,7 @@ public class CoGroupOperator<I1, I2, OUT> extends 
TwoInputUdfOperator<I1, I2, OU
                 * 
                 * @param keyExtractor The KeySelector function which extracts 
the key values from the DataSet on which it is grouped.
                 * @return An incomplete CoGroup transformation. 
-                *           Call {@link 
org.apache.flink.api.java.operators.CoGroupOperator.CoGroupOperatorSets.CoGroupOperatorSetsPredicate#equalTo(int,
 int...)} to continue the CoGroup. 
+                *           Call {@link 
org.apache.flink.api.java.operators.CoGroupOperator.CoGroupOperatorSets.CoGroupOperatorSetsPredicate#equalTo(int...)}
 to continue the CoGroup.
                 * 
                 * @see KeySelector
                 * @see DataSet
@@ -381,7 +375,7 @@ public class CoGroupOperator<I1, I2, OUT> extends 
TwoInputUdfOperator<I1, I2, OU
                /**
                 * Intermediate step of a CoGroup transformation. <br/>
                 * To continue the CoGroup transformation, select the grouping 
key of the second input {@link DataSet} by calling 
-                * {@link 
org.apache.flink.api.java.operators.CoGroupOperator.CoGroupOperatorSets.CoGroupOperatorSetsPredicate#equalTo(int,
 int...)} or {@link 
org.apache.flink.api.java.operators.CoGroupOperator.CoGroupOperatorSets.CoGroupOperatorSetsPredicate#equalTo(KeySelector)}.
+                * {@link 
org.apache.flink.api.java.operators.CoGroupOperator.CoGroupOperatorSets.CoGroupOperatorSetsPredicate#equalTo(int...)}
 or {@link 
org.apache.flink.api.java.operators.CoGroupOperator.CoGroupOperatorSets.CoGroupOperatorSetsPredicate#equalTo(KeySelector)}.
                 *
                 */
                public final class CoGroupOperatorSetsPredicate {
@@ -405,32 +399,26 @@ public class CoGroupOperator<I1, I2, OUT> extends 
TwoInputUdfOperator<I1, I2, OU
                         * {@link DataSet} that should be used as grouping 
keys.<br/>
                         * <b>Note: Fields can only be selected as grouping 
keys on Tuple DataSets.</b><br/>
                         *
-                        * @param field0 The first index of the Tuple fields of 
the second co-grouped DataSets that should be used as key
+                        *
                         * @param fields The indexes of the Tuple fields of the 
second co-grouped DataSet that should be used as keys.
-                        * @return An incomplete CoGroup transformation. 
-                        *           Call {@link 
org.apache.flink.api.java.operators.CoGroupOperator.CoGroupOperatorSets.CoGroupOperatorSetsPredicate.CoGroupOperatorWithoutFunction#with(org.apache.flink.api.java.functions.RichCoGroupFunction)}
 to finalize the CoGroup transformation.
+                        * @return An incomplete CoGroup transformation.
+                        *           Call {@link 
org.apache.flink.api.java.operators.CoGroupOperator.CoGroupOperatorSets.CoGroupOperatorSetsPredicate.CoGroupOperatorWithoutFunction#with(org.apache.flink.api.common.functions.CoGroupFunction)}
 to finalize the CoGroup transformation.
                         */
-                       public CoGroupOperatorWithoutFunction equalTo(int 
field0, int... fields) {
-                               int[] actualFields = new int[fields.length + 1];
-                               actualFields[0] = field0;
-                               System.arraycopy(fields, 0, actualFields, 1, 
fields.length);
-                               return createCoGroupOperator(new 
Keys.FieldPositionKeys<I2>(actualFields, input2.getType()));
+                       public CoGroupOperatorWithoutFunction equalTo(int... 
fields) {
+                               return createCoGroupOperator(new 
Keys.FieldPositionKeys<I2>(fields, input2.getType()));
                        }
 
                        /**
                         * Continues a CoGroup transformation and defines the 
fields of the second co-grouped
                         * {@link DataSet} that should be used as grouping 
keys.<br/>
                         *
-                        * @param field0 The first field of the second 
co-grouped DataSets that should be used as key
+                        *
                         * @param fields The  fields of the first co-grouped 
DataSets that should be used as keys.
                         * @return An incomplete CoGroup transformation.
-                        *           Call {@link 
org.apache.flink.api.java.operators.CoGroupOperator.CoGroupOperatorSets.CoGroupOperatorSetsPredicate.CoGroupOperatorWithoutFunction#with(org.apache.flink.api.java.functions.RichCoGroupFunction)}
 to finalize the CoGroup transformation.
+                        *           Call {@link 
org.apache.flink.api.java.operators.CoGroupOperator.CoGroupOperatorSets.CoGroupOperatorSetsPredicate.CoGroupOperatorWithoutFunction#with(org.apache.flink.api.common.functions.CoGroupFunction)}
 to finalize the CoGroup transformation.
                         */
-//                     public CoGroupOperatorWithoutFunction equalTo(String 
field0, String... fields) {
-//                             String[] actualFields = new 
String[fields.length + 1];
-//                             actualFields[0] = field0;
-//                             System.arraycopy(fields, 0, actualFields, 1, 
fields.length);
-//                             return createCoGroupOperator(new 
Keys.ExpressionKeys<I2>(actualFields, input2.getType()));
+//                     public CoGroupOperatorWithoutFunction equalTo(String... 
fields) {
+//                             return createCoGroupOperator(new 
Keys.ExpressionKeys<I2>(fields, input2.getType()));
 //                     }
 
                        /**
@@ -440,7 +428,7 @@ public class CoGroupOperator<I1, I2, OUT> extends 
TwoInputUdfOperator<I1, I2, OU
                         * 
                         * @param keyExtractor The KeySelector function which 
extracts the key values from the second DataSet on which it is grouped.
                         * @return An incomplete CoGroup transformation. 
-                        *           Call {@link 
org.apache.flink.api.java.operators.CoGroupOperator.CoGroupOperatorSets.CoGroupOperatorSetsPredicate.CoGroupOperatorWithoutFunction#with(org.apache.flink.api.java.functions.RichCoGroupFunction)}
 to finalize the CoGroup transformation.
+                        *           Call {@link 
org.apache.flink.api.java.operators.CoGroupOperator.CoGroupOperatorSets.CoGroupOperatorSetsPredicate.CoGroupOperatorWithoutFunction#with(org.apache.flink.api.common.functions.CoGroupFunction)}
 to finalize the CoGroup transformation.
                         */
                        public <K> CoGroupOperatorWithoutFunction 
equalTo(KeySelector<I2, K> keyExtractor) {
                                return createCoGroupOperator(new 
Keys.SelectorFunctionKeys<I2, K>(keyExtractor, input2.getType()));
@@ -449,7 +437,7 @@ public class CoGroupOperator<I1, I2, OUT> extends 
TwoInputUdfOperator<I1, I2, OU
                        /**
                         * Intermediate step of a CoGroup transformation. <br/>
                         * To continue the CoGroup transformation, provide a 
{@link org.apache.flink.api.java.functions.RichCoGroupFunction} by calling
-                        * {@link 
org.apache.flink.api.java.operators.CoGroupOperator.CoGroupOperatorSets.CoGroupOperatorSetsPredicate.CoGroupOperatorWithoutFunction#with(org.apache.flink.api.java.functions.RichCoGroupFunction)}.
+                        * {@link 
org.apache.flink.api.java.operators.CoGroupOperator.CoGroupOperatorSets.CoGroupOperatorSetsPredicate.CoGroupOperatorWithoutFunction#with(org.apache.flink.api.common.functions.CoGroupFunction)}.
                         *
                         */
                        private CoGroupOperatorWithoutFunction 
createCoGroupOperator(Keys<I2> keys2) {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/7e396969/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java
index f446193..6ffbd1b 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java
@@ -679,7 +679,7 @@ public abstract class JoinOperator<I1, I2, OUT> extends 
TwoInputUdfOperator<I1,
        /**
         * Intermediate step of a Join transformation. <br/>
         * To continue the Join transformation, select the join key of the 
first input {@link DataSet} by calling 
-        * {@link 
org.apache.flink.api.java.operators.JoinOperator.JoinOperatorSets#where(int, 
int...)} or
+        * {@link 
org.apache.flink.api.java.operators.JoinOperator.JoinOperatorSets#where(int...)}
 or
         * {@link 
org.apache.flink.api.java.operators.JoinOperator.JoinOperatorSets#where(KeySelector)}.
         *
         * @param <I1> The type of the first input DataSet of the Join 
transformation.
@@ -711,21 +711,17 @@ public abstract class JoinOperator<I1, I2, OUT> extends 
TwoInputUdfOperator<I1,
                 * Defines the {@link Tuple} fields of the first join {@link 
DataSet} that should be used as join keys.<br/>
                 * <b>Note: Fields can only be selected as join keys on Tuple 
DataSets.</b><br/>
                 *
-                * @param field0 The first index of the Tuple fields of the 
first join DataSets that should be used as key
                 * @param fields The indexes of the other Tuple fields of the 
first join DataSets that should be used as keys.
                 * @return An incomplete Join transformation. 
-                *           Call {@link 
org.apache.flink.api.java.operators.JoinOperator.JoinOperatorSets.JoinOperatorSetsPredicate#equalTo(int,
 int...)} or
+                *           Call {@link 
org.apache.flink.api.java.operators.JoinOperator.JoinOperatorSets.JoinOperatorSetsPredicate#equalTo(int...)}
 or
                 *           {@link 
org.apache.flink.api.java.operators.JoinOperator.JoinOperatorSets.JoinOperatorSetsPredicate#equalTo(KeySelector)}
                 *           to continue the Join. 
                 * 
                 * @see Tuple
                 * @see DataSet
                 */
-               public JoinOperatorSetsPredicate where(int field0, int... 
fields) {
-                       int[] actualFields = new int[fields.length + 1];
-                       actualFields[0] = field0;
-                       System.arraycopy(fields, 0, actualFields, 1, 
fields.length);
-                       return new JoinOperatorSetsPredicate(new 
Keys.FieldPositionKeys<I1>(actualFields, input1.getType()));
+               public JoinOperatorSetsPredicate where(int... fields) {
+                       return new JoinOperatorSetsPredicate(new 
Keys.FieldPositionKeys<I1>(fields, input1.getType()));
                }
 
                /**
@@ -733,21 +729,17 @@ public abstract class JoinOperator<I1, I2, OUT> extends 
TwoInputUdfOperator<I1,
                 * Defines the fields of the first join {@link DataSet} that 
should be used as grouping keys. Fields
                 * are the names of member fields of the underlying type of the 
data set.
                 *
-                * @param field0 The first field of the Tuple fields of the 
first join DataSets that should be used as key
                 * @param fields The  fields of the first join DataSets that 
should be used as keys.
                 * @return An incomplete Join transformation.
-                *           Call {@link 
org.apache.flink.api.java.operators.JoinOperator.JoinOperatorSets.JoinOperatorSetsPredicate#equalTo(int,
 int...)} or
+                *           Call {@link 
org.apache.flink.api.java.operators.JoinOperator.JoinOperatorSets.JoinOperatorSetsPredicate#equalTo(int...)}
 or
                 *           {@link 
org.apache.flink.api.java.operators.JoinOperator.JoinOperatorSets.JoinOperatorSetsPredicate#equalTo(KeySelector)}
                 *           to continue the Join.
                 *
                 * @see Tuple
                 * @see DataSet
                 */
-//             public JoinOperatorSetsPredicate where(String field0, String... 
fields) {
-//                     String[] actualFields = new String[fields.length + 1];
-//                     actualFields[0] = field0;
-//                     System.arraycopy(fields, 0, actualFields, 1, 
fields.length);
-//                     return new JoinOperatorSetsPredicate(new 
Keys.ExpressionKeys<I1>(actualFields, input1.getType()));
+//             public JoinOperatorSetsPredicate where(String... fields) {
+//                     return new JoinOperatorSetsPredicate(new 
Keys.ExpressionKeys<I1>(fields, input1.getType()));
 //             }
                
                /**
@@ -757,7 +749,7 @@ public abstract class JoinOperator<I1, I2, OUT> extends 
TwoInputUdfOperator<I1,
                 * 
                 * @param keySelector The KeySelector function which extracts 
the key values from the DataSet on which it is joined.
                 * @return An incomplete Join transformation. 
-                *           Call {@link 
org.apache.flink.api.java.operators.JoinOperator.JoinOperatorSets.JoinOperatorSetsPredicate#equalTo(int,
 int...)} or
+                *           Call {@link 
org.apache.flink.api.java.operators.JoinOperator.JoinOperatorSets.JoinOperatorSetsPredicate#equalTo(int...)}
 or
                 *           {@link 
org.apache.flink.api.java.operators.JoinOperator.JoinOperatorSets.JoinOperatorSetsPredicate#equalTo(KeySelector)}
                 *           to continue the Join. 
                 * 
@@ -773,7 +765,7 @@ public abstract class JoinOperator<I1, I2, OUT> extends 
TwoInputUdfOperator<I1,
                /**
                 * Intermediate step of a Join transformation. <br/>
                 * To continue the Join transformation, select the join key of 
the second input {@link DataSet} by calling 
-                * {@link 
org.apache.flink.api.java.operators.JoinOperator.JoinOperatorSets.JoinOperatorSetsPredicate#equalTo(int,
 int...)} or 
+                * {@link 
org.apache.flink.api.java.operators.JoinOperator.JoinOperatorSets.JoinOperatorSetsPredicate#equalTo(int...)}
 or
                 * {@link 
org.apache.flink.api.java.operators.JoinOperator.JoinOperatorSets.JoinOperatorSetsPredicate#equalTo(KeySelector)}.
                 *
                 */
@@ -802,15 +794,11 @@ public abstract class JoinOperator<I1, I2, OUT> extends 
TwoInputUdfOperator<I1,
                         * the element of the first input being the first field 
of the tuple and the element of the 
                         * second input being the second field of the tuple. 
                         *
-                        * @param field0 The first field of the Tuple fields of 
the second join DataSets that should be used as key
                         * @param fields The indexes of the Tuple fields of the 
second join DataSet that should be used as keys.
                         * @return A DefaultJoin that represents the joined 
DataSet.
                         */
-                       public DefaultJoin<I1, I2> equalTo(int field0, int... 
fields) {
-                               int[] actualFields = new int[fields.length + 1];
-                               actualFields[0] = field0;
-                               System.arraycopy(fields, 0, actualFields, 1, 
fields.length);
-                               return createJoinOperator(new 
Keys.FieldPositionKeys<I2>(actualFields, input2.getType()));
+                       public DefaultJoin<I1, I2> equalTo(int... fields) {
+                               return createJoinOperator(new 
Keys.FieldPositionKeys<I2>(fields, input2.getType()));
                        }
 
                        /**
@@ -821,15 +809,11 @@ public abstract class JoinOperator<I1, I2, OUT> extends 
TwoInputUdfOperator<I1,
                         * the element of the first input being the first field 
of the tuple and the element of the
                         * second input being the second field of the tuple.
                         *
-                        * @param field0 The first field of the second join 
DataSets that should be used as key
                         * @param fields The fields of the second join DataSet 
that should be used as keys.
                         * @return A DefaultJoin that represents the joined 
DataSet.
                         */
-//                     public DefaultJoin<I1, I2> equalTo(String field0, 
String... fields) {
-//                             String[] actualFields = new 
String[fields.length + 1];
-//                             actualFields[0] = field0;
-//                             System.arraycopy(fields, 0, actualFields, 1, 
fields.length);
-//                             return createJoinOperator(new 
Keys.ExpressionKeys<I2>(actualFields, input2.getType()));
+//                     public DefaultJoin<I1, I2> equalTo(String... fields) {
+//                             return createJoinOperator(new 
Keys.ExpressionKeys<I2>(fields, input2.getType()));
 //                     }
 
                        /**

Reply via email to