[streaming] Streaming API grouping rework to use batch API Keys
Conflicts:
flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/ClusterUtil.java
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/59952489
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/59952489
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/59952489
Branch: refs/heads/release-0.8
Commit: 59952489b0d9036e5ff91374a85760005ee16d63
Parents: c6b90ee
Author: Gyula Fora <[email protected]>
Authored: Tue Dec 23 21:16:46 2014 +0100
Committer: mbalassi <[email protected]>
Committed: Mon Jan 5 18:04:17 2015 +0100
----------------------------------------------------------------------
.../streaming/api/datastream/DataStream.java | 39 +++---
.../api/datastream/StreamJoinOperator.java | 28 +++--
.../api/function/co/JoinWindowFunction.java | 6 +-
.../streaming/util/keys/ArrayKeySelector.java | 45 -------
.../streaming/util/keys/FieldsKeySelector.java | 103 ---------------
.../streaming/util/keys/KeySelectorUtil.java | 126 +++++++++++++++++++
.../streaming/util/keys/ObjectKeySelector.java | 30 -----
.../streaming/util/keys/PojoKeySelector.java | 98 ---------------
.../streaming/util/keys/TupleKeySelector.java | 44 -------
.../streaming/api/AggregationFunctionTest.java | 39 +++---
.../streaming/api/WindowCrossJoinTest.java | 42 ++++---
.../operator/CoGroupedBatchReduceTest.java | 30 +++--
.../invokable/operator/CoGroupedReduceTest.java | 42 ++++++-
.../operator/CoGroupedWindowReduceTest.java | 36 ++++--
.../operator/GroupedReduceInvokableTest.java | 13 +-
.../operator/GroupedWindowInvokableTest.java | 50 ++++----
.../partitioner/FieldsPartitionerTest.java | 12 +-
.../streaming/util/FieldsKeySelectorTest.java | 61 ---------
.../scala/streaming/windowing/WindowJoin.scala | 1 -
.../scala/streaming/CaseClassKeySelector.scala | 45 -------
.../flink/api/scala/streaming/DataStream.scala | 18 +--
.../api/scala/streaming/FieldsKeySelector.scala | 38 ------
.../scala/streaming/StreamCrossOperator.scala | 7 +-
.../scala/streaming/StreamJoinOperator.scala | 30 +++--
.../scala/streaming/WindowedDataStream.scala | 9 +-
25 files changed, 377 insertions(+), 615 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/59952489/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
----------------------------------------------------------------------
diff --git
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
index c4e3368..422df5b 100644
---
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
+++
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
@@ -33,6 +33,7 @@ import
org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.ClosureCleaner;
import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.operators.Keys;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.api.java.typeutils.TypeExtractor;
@@ -66,8 +67,7 @@ import
org.apache.flink.streaming.partitioner.DistributePartitioner;
import org.apache.flink.streaming.partitioner.FieldsPartitioner;
import org.apache.flink.streaming.partitioner.ShufflePartitioner;
import org.apache.flink.streaming.partitioner.StreamPartitioner;
-import org.apache.flink.streaming.util.keys.FieldsKeySelector;
-import org.apache.flink.streaming.util.keys.PojoKeySelector;
+import org.apache.flink.streaming.util.keys.KeySelectorUtil;
/**
* A DataStream represents a stream of elements of the same type. A DataStream
@@ -245,9 +245,11 @@ public class DataStream<OUT> {
* @return The grouped {@link DataStream}
*/
public GroupedDataStream<OUT> groupBy(int... fields) {
-
- return groupBy(FieldsKeySelector.getSelector(getType(),
fields));
-
+ if (getType() instanceof BasicArrayTypeInfo || getType()
instanceof PrimitiveArrayTypeInfo) {
+ return groupBy(new
KeySelectorUtil.ArrayKeySelector<OUT>(fields));
+ } else {
+ return groupBy(new Keys.ExpressionKeys<OUT>(fields,
getType()));
+ }
}
/**
@@ -264,7 +266,7 @@ public class DataStream<OUT> {
**/
public GroupedDataStream<OUT> groupBy(String... fields) {
- return groupBy(new PojoKeySelector<OUT>(getType(), fields));
+ return groupBy(new Keys.ExpressionKeys<OUT>(fields, getType()));
}
@@ -282,6 +284,11 @@ public class DataStream<OUT> {
return new GroupedDataStream<OUT>(this, clean(keySelector));
}
+ private GroupedDataStream<OUT> groupBy(Keys<OUT> keys) {
+ return new GroupedDataStream<OUT>(this,
clean(KeySelectorUtil.getSelectorForKeys(keys,
+ getType())));
+ }
+
/**
* Sets the partitioning of the {@link DataStream} so that the output is
* partitioned by the selected fields. This setting only affects
how the
@@ -293,9 +300,11 @@ public class DataStream<OUT> {
* @return The DataStream with fields partitioning set.
*/
public DataStream<OUT> partitionBy(int... fields) {
-
- return setConnectionType(new
FieldsPartitioner<OUT>(FieldsKeySelector.getSelector(
- getType(), fields)));
+ if (getType() instanceof BasicArrayTypeInfo || getType()
instanceof PrimitiveArrayTypeInfo) {
+ return partitionBy(new
KeySelectorUtil.ArrayKeySelector<OUT>(fields));
+ } else {
+ return partitionBy(new Keys.ExpressionKeys<OUT>(fields,
getType()));
+ }
}
/**
@@ -309,9 +318,11 @@ public class DataStream<OUT> {
* @return The DataStream with fields partitioning set.
*/
public DataStream<OUT> partitionBy(String... fields) {
+ return partitionBy(new Keys.ExpressionKeys<OUT>(fields,
getType()));
+ }
- return setConnectionType(new FieldsPartitioner<OUT>(new
PojoKeySelector<OUT>(getType(),
- fields)));
+ private DataStream<OUT> partitionBy(Keys<OUT> keys) {
+ return partitionBy(KeySelectorUtil.getSelectorForKeys(keys,
getType()));
}
/**
@@ -411,7 +422,7 @@ public class DataStream<OUT> {
* the data stream that will be fed back and used as the input for the
* iteration head. A common usage pattern for streaming iterations is
to use
* output splitting to send a part of the closing data stream to the
head.
- * Refer to {@link SingleOutputStreamOperator#split(OutputSelector)} for
+ * Refer to {@link SingleOutputStreamOperator#split(outputSelector)} for
* more information.
* <p>
* The iteration edge will be partitioned the same way as the first
input of
@@ -549,7 +560,7 @@ public class DataStream<OUT> {
* {@link StreamCrossOperator#onWindow} should be called to define the
* window.
* <p>
- * Call {@link StreamCrossOperator.CrossWindow#with(CrossFunction)} to
+ * Call {@link StreamCrossOperator.CrossWindow#with(crossFunction)} to
* define a custom cross function.
*
* @param dataStreamToCross
@@ -572,7 +583,7 @@ public class DataStream<OUT> {
* window, and then the {@link StreamJoinOperator.JoinWindow#where} and
* {@link StreamJoinOperator.JoinPredicate#equalTo} can be used to
define
* the join keys.</p> The user can also use the
- * {@link StreamJoinOperator.JoinedStream#with(JoinFunction)} to apply
+ * {@link StreamJoinOperator.JoinedStream#with(joinFunction)} to apply
* custom join function.
*
* @param other
http://git-wip-us.apache.org/repos/asf/flink/blob/59952489/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/StreamJoinOperator.java
----------------------------------------------------------------------
diff --git
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/StreamJoinOperator.java
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/StreamJoinOperator.java
index cbcc1b4..de15515 100644
---
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/StreamJoinOperator.java
+++
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/StreamJoinOperator.java
@@ -21,14 +21,14 @@ package org.apache.flink.streaming.api.datastream;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.operators.Keys;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.streaming.api.function.co.JoinWindowFunction;
import org.apache.flink.streaming.api.invokable.operator.co.CoWindowInvokable;
-import org.apache.flink.streaming.util.keys.FieldsKeySelector;
-import org.apache.flink.streaming.util.keys.PojoKeySelector;
+import org.apache.flink.streaming.util.keys.KeySelectorUtil;
public class StreamJoinOperator<I1, I2> extends
TemporalOperator<I1, I2, StreamJoinOperator.JoinWindow<I1, I2>>
{
@@ -45,9 +45,11 @@ public class StreamJoinOperator<I1, I2> extends
public static class JoinWindow<I1, I2> {
private StreamJoinOperator<I1, I2> op;
+ private TypeInformation<I1> type1;
private JoinWindow(StreamJoinOperator<I1, I2> operator) {
this.op = operator;
+ this.type1 = op.input1.getType();
}
/**
@@ -64,8 +66,8 @@ public class StreamJoinOperator<I1, I2> extends
* {@link JoinPredicate#equalTo} to continue the Join.
*/
public JoinPredicate<I1, I2> where(int... fields) {
- return new JoinPredicate<I1, I2>(op,
FieldsKeySelector.getSelector(op.input1.getType(),
- fields));
+ return new JoinPredicate<I1, I2>(op,
KeySelectorUtil.getSelectorForKeys(
+ new Keys.ExpressionKeys<I1>(fields,
type1), type1));
}
/**
@@ -81,8 +83,8 @@ public class StreamJoinOperator<I1, I2> extends
* {@link JoinPredicate#equalTo} to continue the Join.
*/
public JoinPredicate<I1, I2> where(String... fields) {
- return new JoinPredicate<I1, I2>(op, new
PojoKeySelector<I1>(op.input1.getType(),
- fields));
+ return new JoinPredicate<I1, I2>(op,
KeySelectorUtil.getSelectorForKeys(
+ new Keys.ExpressionKeys<I1>(fields,
type1), type1));
}
/**
@@ -114,13 +116,15 @@ public class StreamJoinOperator<I1, I2> extends
*/
public static class JoinPredicate<I1, I2> {
- public StreamJoinOperator<I1, I2> op;
- public KeySelector<I1, ?> keys1;
- public KeySelector<I2, ?> keys2;
+ private StreamJoinOperator<I1, I2> op;
+ private KeySelector<I1, ?> keys1;
+ private KeySelector<I2, ?> keys2;
+ private TypeInformation<I2> type2;
private JoinPredicate(StreamJoinOperator<I1, I2> operator,
KeySelector<I1, ?> keys1) {
this.op = operator;
this.keys1 = keys1;
+ this.type2 = op.input2.getType();
}
/**
@@ -138,7 +142,8 @@ public class StreamJoinOperator<I1, I2> extends
* apply a custom wrapping
*/
public JoinedStream<I1, I2> equalTo(int... fields) {
- keys2 =
FieldsKeySelector.getSelector(op.input2.getType(), fields);
+ keys2 = KeySelectorUtil.getSelectorForKeys(new
Keys.ExpressionKeys<I2>(fields, type2),
+ type2);
return createJoinOperator();
}
@@ -156,7 +161,8 @@ public class StreamJoinOperator<I1, I2> extends
* apply a custom wrapping
*/
public JoinedStream<I1, I2> equalTo(String... fields) {
- this.keys2 = new
PojoKeySelector<I2>(op.input2.getType(), fields);
+ this.keys2 = KeySelectorUtil.getSelectorForKeys(new
Keys.ExpressionKeys<I2>(fields,
+ type2), type2);
return createJoinOperator();
}
http://git-wip-us.apache.org/repos/asf/flink/blob/59952489/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/JoinWindowFunction.java
----------------------------------------------------------------------
diff --git
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/JoinWindowFunction.java
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/JoinWindowFunction.java
index 9f5cd5d..9b39f33 100644
---
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/JoinWindowFunction.java
+++
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/JoinWindowFunction.java
@@ -41,8 +41,12 @@ public class JoinWindowFunction<IN1, IN2, OUT> implements
CoWindowFunction<IN1,
@Override
public void coWindow(List<IN1> first, List<IN2> second, Collector<OUT>
out) throws Exception {
for (IN1 item1 : first) {
+ Object key1 = keySelector1.getKey(item1);
+
for (IN2 item2 : second) {
- if
(keySelector1.getKey(item1).equals(keySelector2.getKey(item2))) {
+ Object key2 = keySelector2.getKey(item2);
+
+ if (key1.equals(key2)) {
out.collect(joinFunction.join(item1,
item2));
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/59952489/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/keys/ArrayKeySelector.java
----------------------------------------------------------------------
diff --git
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/keys/ArrayKeySelector.java
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/keys/ArrayKeySelector.java
deleted file mode 100644
index 6d6f620..0000000
---
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/keys/ArrayKeySelector.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.util.keys;
-
-import java.lang.reflect.Array;
-
-import org.apache.flink.api.java.tuple.Tuple;
-
-public class ArrayKeySelector<IN> extends FieldsKeySelector<IN> {
-
- private static final long serialVersionUID = 1L;
-
- public ArrayKeySelector(int... fields) {
- super(fields);
- }
-
- @Override
- public Object getKey(IN value) throws Exception {
- if (simpleKey) {
- return Array.get(value, keyFields[0]);
- } else {
- int c = 0;
- for (int pos : keyFields) {
- ((Tuple) key).setField(Array.get(value, pos),
c);
- c++;
- }
- return key;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/59952489/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/keys/FieldsKeySelector.java
----------------------------------------------------------------------
diff --git
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/keys/FieldsKeySelector.java
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/keys/FieldsKeySelector.java
deleted file mode 100644
index 171ddc9..0000000
---
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/keys/FieldsKeySelector.java
+++ /dev/null
@@ -1,103 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.util.keys;
-
-import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
-import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.api.java.tuple.Tuple1;
-import org.apache.flink.api.java.tuple.Tuple10;
-import org.apache.flink.api.java.tuple.Tuple11;
-import org.apache.flink.api.java.tuple.Tuple12;
-import org.apache.flink.api.java.tuple.Tuple13;
-import org.apache.flink.api.java.tuple.Tuple14;
-import org.apache.flink.api.java.tuple.Tuple15;
-import org.apache.flink.api.java.tuple.Tuple16;
-import org.apache.flink.api.java.tuple.Tuple17;
-import org.apache.flink.api.java.tuple.Tuple18;
-import org.apache.flink.api.java.tuple.Tuple19;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.tuple.Tuple20;
-import org.apache.flink.api.java.tuple.Tuple21;
-import org.apache.flink.api.java.tuple.Tuple22;
-import org.apache.flink.api.java.tuple.Tuple23;
-import org.apache.flink.api.java.tuple.Tuple24;
-import org.apache.flink.api.java.tuple.Tuple25;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.api.java.tuple.Tuple4;
-import org.apache.flink.api.java.tuple.Tuple5;
-import org.apache.flink.api.java.tuple.Tuple6;
-import org.apache.flink.api.java.tuple.Tuple7;
-import org.apache.flink.api.java.tuple.Tuple8;
-import org.apache.flink.api.java.tuple.Tuple9;
-
-public abstract class FieldsKeySelector<IN> implements KeySelector<IN, Object>
{
-
- private static final long serialVersionUID = 1L;
-
- protected int[] keyFields;
- protected Object key;
- protected boolean simpleKey;
-
- @SuppressWarnings("unchecked")
- public static Class<? extends Tuple>[] tupleClasses = new Class[] {
Tuple1.class, Tuple2.class,
- Tuple3.class, Tuple4.class, Tuple5.class, Tuple6.class,
Tuple7.class, Tuple8.class,
- Tuple9.class, Tuple10.class, Tuple11.class,
Tuple12.class, Tuple13.class,
- Tuple14.class, Tuple15.class, Tuple16.class,
Tuple17.class, Tuple18.class,
- Tuple19.class, Tuple20.class, Tuple21.class,
Tuple22.class, Tuple23.class,
- Tuple24.class, Tuple25.class };
-
- public FieldsKeySelector(int... fields) {
- this.keyFields = fields;
- this.simpleKey = fields.length == 1;
- for (int i : fields) {
- if (i < 0) {
- throw new RuntimeException("Grouping fields
must be non-negative");
- }
- }
-
- try {
- key = tupleClasses[fields.length - 1].newInstance();
- } catch (Exception e) {
- throw new RuntimeException(e.getMessage());
- }
-
- }
-
- public static <R> KeySelector<R, ?> getSelector(TypeInformation<R>
type, int... fields) {
- if (type.isTupleType()) {
- return new TupleKeySelector<R>(fields);
- } else if (type instanceof BasicArrayTypeInfo || type
instanceof PrimitiveArrayTypeInfo) {
- return new ArrayKeySelector<R>(fields);
- } else {
- if (fields.length > 1) {
- throw new RuntimeException(
- "For non-tuple types use single
field 0 or KeyExctractor for grouping");
-
- } else if (fields[0] > 0) {
- throw new RuntimeException(
- "For simple objects grouping
only allowed on the first field");
- } else {
- return new ObjectKeySelector<R>();
- }
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/59952489/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/keys/KeySelectorUtil.java
----------------------------------------------------------------------
diff --git
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/keys/KeySelectorUtil.java
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/keys/KeySelectorUtil.java
new file mode 100644
index 0000000..73e6360
--- /dev/null
+++
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/keys/KeySelectorUtil.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.util.keys;
+
+import java.lang.reflect.Array;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.CompositeType;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.operators.Keys;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.api.java.tuple.Tuple10;
+import org.apache.flink.api.java.tuple.Tuple11;
+import org.apache.flink.api.java.tuple.Tuple12;
+import org.apache.flink.api.java.tuple.Tuple13;
+import org.apache.flink.api.java.tuple.Tuple14;
+import org.apache.flink.api.java.tuple.Tuple15;
+import org.apache.flink.api.java.tuple.Tuple16;
+import org.apache.flink.api.java.tuple.Tuple17;
+import org.apache.flink.api.java.tuple.Tuple18;
+import org.apache.flink.api.java.tuple.Tuple19;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple20;
+import org.apache.flink.api.java.tuple.Tuple21;
+import org.apache.flink.api.java.tuple.Tuple22;
+import org.apache.flink.api.java.tuple.Tuple23;
+import org.apache.flink.api.java.tuple.Tuple24;
+import org.apache.flink.api.java.tuple.Tuple25;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.api.java.tuple.Tuple5;
+import org.apache.flink.api.java.tuple.Tuple6;
+import org.apache.flink.api.java.tuple.Tuple7;
+import org.apache.flink.api.java.tuple.Tuple8;
+import org.apache.flink.api.java.tuple.Tuple9;
+
+public class KeySelectorUtil {
+
+ public static Class<?>[] tupleClasses = new Class[] { Tuple1.class,
Tuple2.class, Tuple3.class,
+ Tuple4.class, Tuple5.class, Tuple6.class, Tuple7.class,
Tuple8.class, Tuple9.class,
+ Tuple10.class, Tuple11.class, Tuple12.class,
Tuple13.class, Tuple14.class,
+ Tuple15.class, Tuple16.class, Tuple17.class,
Tuple18.class, Tuple19.class,
+ Tuple20.class, Tuple21.class, Tuple22.class,
Tuple23.class, Tuple24.class,
+ Tuple25.class };
+
+ public static <X> KeySelector<X, ?> getSelectorForKeys(Keys<X> keys,
TypeInformation<X> typeInfo) {
+ int[] logicalKeyPositions = keys.computeLogicalKeyPositions();
+ int keyLength = logicalKeyPositions.length;
+ boolean[] orders = new boolean[keyLength];
+ TypeComparator<X> comparator = ((CompositeType<X>)
typeInfo).createComparator(
+ logicalKeyPositions, orders, 0);
+ return new ComparableKeySelector<X>(comparator, keyLength);
+ }
+
+ public static class ComparableKeySelector<IN> implements
KeySelector<IN, Tuple> {
+
+ private static final long serialVersionUID = 1L;
+
+ private TypeComparator<IN> comparator;
+ private int keyLength;
+ private Object[] keyArray;
+ private Tuple key;
+
+ public ComparableKeySelector(TypeComparator<IN> comparator, int
keyLength) {
+ this.comparator = comparator;
+ this.keyLength = keyLength;
+ keyArray = new Object[keyLength];
+ try {
+ key = (Tuple) tupleClasses[keyLength -
1].newInstance();
+ } catch (Exception e) {
+ }
+ }
+
+ @Override
+ public Tuple getKey(IN value) throws Exception {
+ comparator.extractKeys(value, keyArray, 0);
+ for (int i = 0; i < keyLength; i++) {
+ key.setField(keyArray[i], i);
+ }
+ return key;
+ }
+
+ }
+
+ public static class ArrayKeySelector<IN> implements KeySelector<IN,
Tuple> {
+
+ private static final long serialVersionUID = 1L;
+
+ Tuple key;
+ int[] fields;
+
+ public ArrayKeySelector(int... fields) {
+ this.fields = fields;
+ try {
+ key = (Tuple) tupleClasses[fields.length -
1].newInstance();
+ } catch (Exception e) {
+ }
+ }
+
+ @Override
+ public Tuple getKey(IN value) throws Exception {
+ for (int i = 0; i < fields.length; i++) {
+ int pos = fields[i];
+ key.setField(Array.get(value, fields[pos]), i);
+ }
+ return key;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/59952489/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/keys/ObjectKeySelector.java
----------------------------------------------------------------------
diff --git
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/keys/ObjectKeySelector.java
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/keys/ObjectKeySelector.java
deleted file mode 100644
index cf4ecc6..0000000
---
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/keys/ObjectKeySelector.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.util.keys;
-
-import org.apache.flink.api.java.functions.KeySelector;
-
-public class ObjectKeySelector<IN> implements KeySelector<IN, IN> {
-
- private static final long serialVersionUID = 1L;
-
- @Override
- public IN getKey(IN value) throws Exception {
- return value;
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/59952489/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/keys/PojoKeySelector.java
----------------------------------------------------------------------
diff --git
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/keys/PojoKeySelector.java
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/keys/PojoKeySelector.java
deleted file mode 100644
index 618f14f..0000000
---
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/keys/PojoKeySelector.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.util.keys;
-
-import java.lang.reflect.Field;
-import java.util.ArrayList;
-import java.util.LinkedList;
-import java.util.List;
-
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeutils.CompositeType;
-import org.apache.flink.api.common.typeutils.CompositeType.FlatFieldDescriptor;
-import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.api.java.typeutils.PojoTypeInfo;
-import org.apache.flink.api.java.typeutils.runtime.PojoComparator;
-
-public class PojoKeySelector<IN> extends FieldsKeySelector<IN> {
-
- private static final long serialVersionUID = 1L;
-
- PojoComparator<IN> comparator;
-
- public PojoKeySelector(TypeInformation<IN> type, String... fields) {
- super(new int[removeDuplicates(fields).length]);
- if (!(type instanceof CompositeType<?>)) {
- throw new IllegalArgumentException(
- "Key expressions are only supported on
POJO types and Tuples. "
- + "A type is considered
a POJO if all its fields are public, or have both getters and setters defined");
- }
- CompositeType<IN> cType = (CompositeType<IN>) type;
-
- String[] keyFields = removeDuplicates(fields);
- int numOfKeys = keyFields.length;
-
- List<FlatFieldDescriptor> fieldDescriptors = new
ArrayList<FlatFieldDescriptor>();
- for (String field : keyFields) {
- cType.getKey(field, 0, fieldDescriptors);
- }
-
- int[] logicalKeyPositions = new int[numOfKeys];
- boolean[] orders = new boolean[numOfKeys];
-
- for (int i = 0; i < numOfKeys; i++) {
- logicalKeyPositions[i] =
fieldDescriptors.get(i).getPosition();
- }
-
- if (cType instanceof PojoTypeInfo) {
- comparator = (PojoComparator<IN>) cType
- .createComparator(logicalKeyPositions,
orders, 0);
- } else {
- throw new IllegalArgumentException(
- "Key expressions are only supported on
POJO types. "
- + "A type is considered
a POJO if all its fields are public, or have both getters and setters defined");
- }
-
- }
-
- @Override
- public Object getKey(IN value) throws Exception {
-
- Field[] keyFields = comparator.getKeyFields();
- if (simpleKey) {
- return comparator.accessField(keyFields[0], value);
- } else {
- int c = 0;
- for (Field field : keyFields) {
- ((Tuple)
key).setField(comparator.accessField(field, value), c);
- c++;
- }
- }
- return key;
- }
-
- private static String[] removeDuplicates(String[] in) {
- List<String> ret = new LinkedList<String>();
- for (String el : in) {
- if (!ret.contains(el)) {
- ret.add(el);
- }
- }
- return ret.toArray(new String[ret.size()]);
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/59952489/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/keys/TupleKeySelector.java
----------------------------------------------------------------------
diff --git
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/keys/TupleKeySelector.java
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/keys/TupleKeySelector.java
deleted file mode 100644
index 4ca64ef..0000000
---
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/keys/TupleKeySelector.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.util.keys;
-
-import org.apache.flink.api.java.tuple.Tuple;
-
-public class TupleKeySelector<IN> extends FieldsKeySelector<IN> {
-
- private static final long serialVersionUID = 1L;
-
- public TupleKeySelector(int... fields) {
- super(fields);
- }
-
- @Override
- public Object getKey(IN value) throws Exception {
- if (simpleKey) {
- return ((Tuple) value).getField(keyFields[0]);
- } else {
- int c = 0;
- for (int pos : keyFields) {
- ((Tuple) key).setField(((Tuple)
value).getField(pos), c);
- c++;
- }
- return key;
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/59952489/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/AggregationFunctionTest.java
----------------------------------------------------------------------
diff --git
a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/AggregationFunctionTest.java
b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/AggregationFunctionTest.java
index 9376166..49cd497 100644
---
a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/AggregationFunctionTest.java
+++
b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/AggregationFunctionTest.java
@@ -25,6 +25,8 @@ import java.util.List;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.operators.Keys;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -34,7 +36,7 @@ import
org.apache.flink.streaming.api.function.aggregation.SumAggregator;
import
org.apache.flink.streaming.api.invokable.operator.GroupedReduceInvokable;
import org.apache.flink.streaming.api.invokable.operator.StreamReduceInvokable;
import org.apache.flink.streaming.util.MockContext;
-import org.apache.flink.streaming.util.keys.TupleKeySelector;
+import org.apache.flink.streaming.util.keys.KeySelectorUtil;
import org.junit.Test;
public class AggregationFunctionTest {
@@ -94,14 +96,14 @@ public class AggregationFunctionTest {
Integer.class, type1);
ReduceFunction<Integer> sumFunction0 = SumAggregator
.getSumFunction(0, Integer.class, type2);
- ReduceFunction<Tuple2<Integer, Integer>> minFunction =
ComparableAggregator
- .getAggregator(1, type1, AggregationType.MIN);
- ReduceFunction<Integer> minFunction0 =
ComparableAggregator.getAggregator(0,
- type2, AggregationType.MIN);
- ReduceFunction<Tuple2<Integer, Integer>> maxFunction =
ComparableAggregator
- .getAggregator(1, type1, AggregationType.MAX);
- ReduceFunction<Integer> maxFunction0 =
ComparableAggregator.getAggregator(0,
- type2, AggregationType.MAX);
+ ReduceFunction<Tuple2<Integer, Integer>> minFunction =
ComparableAggregator.getAggregator(
+ 1, type1, AggregationType.MIN);
+ ReduceFunction<Integer> minFunction0 =
ComparableAggregator.getAggregator(0, type2,
+ AggregationType.MIN);
+ ReduceFunction<Tuple2<Integer, Integer>> maxFunction =
ComparableAggregator.getAggregator(
+ 1, type1, AggregationType.MAX);
+ ReduceFunction<Integer> maxFunction0 =
ComparableAggregator.getAggregator(0, type2,
+ AggregationType.MAX);
List<Tuple2<Integer, Integer>> sumList =
MockContext.createAndExecute(
new StreamReduceInvokable<Tuple2<Integer,
Integer>>(sumFunction), getInputList());
@@ -111,17 +113,24 @@ public class AggregationFunctionTest {
List<Tuple2<Integer, Integer>> maxList =
MockContext.createAndExecute(
new StreamReduceInvokable<Tuple2<Integer,
Integer>>(maxFunction), getInputList());
+ TypeInformation<Tuple2<Integer, Integer>> typeInfo =
TypeExtractor
+ .getForObject(new Tuple2<Integer, Integer>(1,
1));
+
+ KeySelector<Tuple2<Integer, Integer>, ?> keySelector =
KeySelectorUtil.getSelectorForKeys(
+ new Keys.ExpressionKeys<Tuple2<Integer,
Integer>>(new int[] { 0 }, typeInfo),
+ typeInfo);
+
List<Tuple2<Integer, Integer>> groupedSumList =
MockContext.createAndExecute(
- new GroupedReduceInvokable<Tuple2<Integer,
Integer>>(sumFunction,
- new
TupleKeySelector<Tuple2<Integer, Integer>>(0)), getInputList());
+ new GroupedReduceInvokable<Tuple2<Integer,
Integer>>(sumFunction, keySelector),
+ getInputList());
List<Tuple2<Integer, Integer>> groupedMinList =
MockContext.createAndExecute(
- new GroupedReduceInvokable<Tuple2<Integer,
Integer>>(minFunction,
- new
TupleKeySelector<Tuple2<Integer, Integer>>(0)), getInputList());
+ new GroupedReduceInvokable<Tuple2<Integer,
Integer>>(minFunction, keySelector),
+ getInputList());
List<Tuple2<Integer, Integer>> groupedMaxList =
MockContext.createAndExecute(
- new GroupedReduceInvokable<Tuple2<Integer,
Integer>>(maxFunction,
- new
TupleKeySelector<Tuple2<Integer, Integer>>(0)), getInputList());
+ new GroupedReduceInvokable<Tuple2<Integer,
Integer>>(maxFunction, keySelector),
+ getInputList());
assertEquals(expectedSumList, sumList);
assertEquals(expectedMinList, minList);
http://git-wip-us.apache.org/repos/asf/flink/blob/59952489/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WindowCrossJoinTest.java
----------------------------------------------------------------------
diff --git
a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WindowCrossJoinTest.java
b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WindowCrossJoinTest.java
index d8cdfa5..b71bb25 100644
---
a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WindowCrossJoinTest.java
+++
b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WindowCrossJoinTest.java
@@ -23,6 +23,7 @@ import java.io.Serializable;
import java.util.ArrayList;
import org.apache.flink.api.common.functions.CrossFunction;
+import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
@@ -49,16 +50,16 @@ public class WindowCrossJoinTest implements Serializable {
env.setBufferTimeout(1);
ArrayList<Tuple2<Integer, String>> in1 = new
ArrayList<Tuple2<Integer, String>>();
- ArrayList<Integer> in2 = new ArrayList<Integer>();
+ ArrayList<Tuple1<Integer>> in2 = new
ArrayList<Tuple1<Integer>>();
in1.add(new Tuple2<Integer, String>(10, "a"));
in1.add(new Tuple2<Integer, String>(20, "b"));
in1.add(new Tuple2<Integer, String>(20, "x"));
in1.add(new Tuple2<Integer, String>(0, "y"));
- in2.add(0);
- in2.add(5);
- in2.add(20);
+ in2.add(new Tuple1<Integer>(0));
+ in2.add(new Tuple1<Integer>(5));
+ in2.add(new Tuple1<Integer>(20));
joinExpectedResults.add(new Tuple2<Tuple2<Integer, String>,
Integer>(
new Tuple2<Integer, String>(20, "b"), 20));
@@ -93,23 +94,24 @@ public class WindowCrossJoinTest implements Serializable {
new Tuple2<Integer, String>(0, "y"), 20));
DataStream<Tuple2<Integer, String>> inStream1 =
env.fromCollection(in1);
- DataStream<Integer> inStream2 = env.fromCollection(in2);
+ DataStream<Tuple1<Integer>> inStream2 = env.fromCollection(in2);
inStream1.join(inStream2).onWindow(1000, 1000, new
MyTimestamp1(), new MyTimestamp2())
.where(0).equalTo(0).addSink(new
JoinResultSink());
- inStream1.cross(inStream2).onWindow(1000, 1000, new
MyTimestamp1(), new MyTimestamp2())
- .with(new CrossFunction<Tuple2<Integer,String>,
Integer, Tuple2<Tuple2<Integer,String>, Integer>>() {
+ inStream1
+ .cross(inStream2)
+ .onWindow(1000, 1000, new MyTimestamp1(), new
MyTimestamp2())
+ .with(new CrossFunction<Tuple2<Integer,
String>, Tuple1<Integer>, Tuple2<Tuple2<Integer, String>, Tuple1<Integer>>>() {
private static final long
serialVersionUID = 1L;
@Override
- public Tuple2<Tuple2<Integer, String>,
Integer> cross(
- Tuple2<Integer, String>
val1, Integer val2) throws Exception {
- return new
Tuple2<Tuple2<Integer,String>, Integer>(val1, val2);
+ public Tuple2<Tuple2<Integer, String>,
Tuple1<Integer>> cross(
+ Tuple2<Integer, String>
val1, Tuple1<Integer> val2) throws Exception {
+ return new
Tuple2<Tuple2<Integer, String>, Tuple1<Integer>>(val1, val2);
}
- })
- .addSink(new CrossResultSink());
+ }).addSink(new CrossResultSink());
env.executeTest(MEMORYSIZE);
@@ -131,11 +133,11 @@ public class WindowCrossJoinTest implements Serializable {
}
}
- private static class MyTimestamp2 implements TimeStamp<Integer> {
+ private static class MyTimestamp2 implements TimeStamp<Tuple1<Integer>>
{
private static final long serialVersionUID = 1L;
@Override
- public long getTimestamp(Integer value) {
+ public long getTimestamp(Tuple1<Integer> value) {
return 101L;
}
@@ -146,22 +148,22 @@ public class WindowCrossJoinTest implements Serializable {
}
private static class JoinResultSink implements
- SinkFunction<Tuple2<Tuple2<Integer, String>, Integer>> {
+ SinkFunction<Tuple2<Tuple2<Integer, String>,
Tuple1<Integer>>> {
private static final long serialVersionUID = 1L;
@Override
- public void invoke(Tuple2<Tuple2<Integer, String>, Integer>
value) {
- joinResults.add(value);
+ public void invoke(Tuple2<Tuple2<Integer, String>,
Tuple1<Integer>> value) {
+ joinResults.add(new Tuple2<Tuple2<Integer, String>,
Integer>(value.f0, value.f1.f0));
}
}
private static class CrossResultSink implements
- SinkFunction<Tuple2<Tuple2<Integer, String>, Integer>> {
+ SinkFunction<Tuple2<Tuple2<Integer, String>,
Tuple1<Integer>>> {
private static final long serialVersionUID = 1L;
@Override
- public void invoke(Tuple2<Tuple2<Integer, String>, Integer>
value) {
- crossResults.add(value);
+ public void invoke(Tuple2<Tuple2<Integer, String>,
Tuple1<Integer>> value) {
+ crossResults.add(new Tuple2<Tuple2<Integer, String>,
Integer>(value.f0, value.f1.f0));
}
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/59952489/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoGroupedBatchReduceTest.java
----------------------------------------------------------------------
diff --git
a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoGroupedBatchReduceTest.java
b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoGroupedBatchReduceTest.java
index bc19a89..1d0732c 100644
---
a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoGroupedBatchReduceTest.java
+++
b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoGroupedBatchReduceTest.java
@@ -23,15 +23,35 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
+import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.function.co.CoReduceFunction;
import
org.apache.flink.streaming.api.invokable.operator.co.CoGroupedBatchReduceInvokable;
import org.apache.flink.streaming.util.MockCoContext;
-import org.apache.flink.streaming.util.keys.TupleKeySelector;
import org.junit.Test;
public class CoGroupedBatchReduceTest {
+ KeySelector<Tuple2<String, String>, ?> keySelector1 = new
KeySelector<Tuple2<String, String>, String>() {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public String getKey(Tuple2<String, String> value) throws
Exception {
+ return value.f0;
+ }
+ };
+
+ KeySelector<Tuple2<String, Integer>, ?> keySelector2 = new
KeySelector<Tuple2<String, Integer>, String>() {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public String getKey(Tuple2<String, Integer> value) throws
Exception {
+ return value.f0;
+ }
+ };
+
private static class MyCoReduceFunction implements
CoReduceFunction<Tuple2<String, Integer>,
Tuple2<String, String>, String> {
private static final long serialVersionUID = 1L;
@@ -59,7 +79,6 @@ public class CoGroupedBatchReduceTest {
}
}
- @SuppressWarnings({ "rawtypes", "unchecked" })
@Test
public void coGroupedBatchReduceTest1() {
@@ -96,8 +115,7 @@ public class CoGroupedBatchReduceTest {
expected.add("h");
CoGroupedBatchReduceInvokable<Tuple2<String, Integer>,
Tuple2<String, String>, String> invokable = new
CoGroupedBatchReduceInvokable<Tuple2<String, Integer>, Tuple2<String, String>,
String>(
- new MyCoReduceFunction(), 4L, 3L, 4L, 3L, new
TupleKeySelector(0),
- new TupleKeySelector(0));
+ new MyCoReduceFunction(), 4L, 3L, 4L, 3L,
keySelector2, keySelector1);
List<String> result = MockCoContext.createAndExecute(invokable,
inputs1, inputs2);
@@ -106,7 +124,6 @@ public class CoGroupedBatchReduceTest {
assertEquals(expected, result);
}
- @SuppressWarnings({ "rawtypes", "unchecked" })
@Test
public void coGroupedBatchReduceTest2() {
@@ -143,8 +160,7 @@ public class CoGroupedBatchReduceTest {
expected.add("fh");
CoGroupedBatchReduceInvokable<Tuple2<String, Integer>,
Tuple2<String, String>, String> invokable = new
CoGroupedBatchReduceInvokable<Tuple2<String, Integer>, Tuple2<String, String>,
String>(
- new MyCoReduceFunction(), 4L, 3L, 2L, 2L, new
TupleKeySelector(0),
- new TupleKeySelector(0));
+ new MyCoReduceFunction(), 4L, 3L, 2L, 2L,
keySelector2, keySelector1);
List<String> result = MockCoContext.createAndExecute(invokable,
inputs1, inputs2);
http://git-wip-us.apache.org/repos/asf/flink/blob/59952489/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoGroupedReduceTest.java
----------------------------------------------------------------------
diff --git
a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoGroupedReduceTest.java
b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoGroupedReduceTest.java
index 15d42a4..273bbae 100644
---
a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoGroupedReduceTest.java
+++
b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoGroupedReduceTest.java
@@ -22,12 +22,12 @@ import static org.junit.Assert.assertEquals;
import java.util.Arrays;
import java.util.List;
+import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.function.co.CoReduceFunction;
import
org.apache.flink.streaming.api.invokable.operator.co.CoGroupedReduceInvokable;
import org.apache.flink.streaming.util.MockCoContext;
-import org.apache.flink.streaming.util.keys.TupleKeySelector;
import org.junit.Test;
public class CoGroupedReduceTest {
@@ -59,7 +59,7 @@ public class CoGroupedReduceTest {
}
}
- @SuppressWarnings({ "unchecked", "rawtypes" })
+ @SuppressWarnings("unchecked")
@Test
public void coGroupedReduceTest() {
Tuple3<String, String, String> word1 = new Tuple3<String,
String, String>("a", "word1", "b");
@@ -71,8 +71,38 @@ public class CoGroupedReduceTest {
Tuple2<Integer, Integer> int4 = new Tuple2<Integer, Integer>(2,
4);
Tuple2<Integer, Integer> int5 = new Tuple2<Integer, Integer>(1,
5);
+ KeySelector<Tuple3<String, String, String>, ?> keySelector0 =
new KeySelector<Tuple3<String, String, String>, String>() {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public String getKey(Tuple3<String, String, String>
value) throws Exception {
+ return value.f0;
+ }
+ };
+
+ KeySelector<Tuple2<Integer, Integer>, ?> keySelector1 = new
KeySelector<Tuple2<Integer, Integer>, Integer>() {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Integer getKey(Tuple2<Integer, Integer> value)
throws Exception {
+ return value.f0;
+ }
+ };
+
+ KeySelector<Tuple3<String, String, String>, ?> keySelector2 =
new KeySelector<Tuple3<String, String, String>, String>() {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public String getKey(Tuple3<String, String, String>
value) throws Exception {
+ return value.f2;
+ }
+ };
+
CoGroupedReduceInvokable<Tuple3<String, String, String>,
Tuple2<Integer, Integer>, String> invokable = new
CoGroupedReduceInvokable<Tuple3<String, String, String>, Tuple2<Integer,
Integer>, String>(
- new MyCoReduceFunction(), new
TupleKeySelector(0), new TupleKeySelector(0));
+ new MyCoReduceFunction(), keySelector0,
keySelector1);
List<String> expected = Arrays.asList("word1", "1", "word2",
"2", "word1word3", "3", "5",
"7");
@@ -83,12 +113,12 @@ public class CoGroupedReduceTest {
assertEquals(expected, actualList);
invokable = new CoGroupedReduceInvokable<Tuple3<String, String,
String>, Tuple2<Integer, Integer>, String>(
- new MyCoReduceFunction(), new
TupleKeySelector(2), new TupleKeySelector(0));
+ new MyCoReduceFunction(), keySelector2,
keySelector1);
expected = Arrays.asList("word1", "1", "word2", "2",
"word2word3", "3", "5", "7");
- actualList = MockCoContext.createAndExecute(invokable,
- Arrays.asList(word1, word2, word3),
Arrays.asList(int1, int2, int3, int4, int5));
+ actualList = MockCoContext.createAndExecute(invokable,
Arrays.asList(word1, word2, word3),
+ Arrays.asList(int1, int2, int3, int4, int5));
assertEquals(expected, actualList);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/59952489/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoGroupedWindowReduceTest.java
----------------------------------------------------------------------
diff --git
a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoGroupedWindowReduceTest.java
b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoGroupedWindowReduceTest.java
index b5a7e8d..e3f2a1b 100644
---
a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoGroupedWindowReduceTest.java
+++
b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoGroupedWindowReduceTest.java
@@ -25,16 +25,36 @@ import java.util.Collections;
import java.util.Iterator;
import java.util.List;
+import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.function.co.CoReduceFunction;
import
org.apache.flink.streaming.api.invokable.operator.co.CoGroupedWindowReduceInvokable;
import org.apache.flink.streaming.api.invokable.util.TimeStamp;
import org.apache.flink.streaming.util.MockCoContext;
-import org.apache.flink.streaming.util.keys.TupleKeySelector;
import org.junit.Test;
public class CoGroupedWindowReduceTest {
+ KeySelector<Tuple2<String, Integer>, ?> keySelector0 = new
KeySelector<Tuple2<String, Integer>, String>() {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public String getKey(Tuple2<String, Integer> value) throws
Exception {
+ return value.f0;
+ }
+ };
+
+ KeySelector<Tuple2<String, String>, ?> keySelector1 = new
KeySelector<Tuple2<String, String>, String>() {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public String getKey(Tuple2<String, String> value) throws
Exception {
+ return value.f0;
+ }
+ };
+
private static class MyCoReduceFunction implements
CoReduceFunction<Tuple2<String, Integer>,
Tuple2<String, String>, String> {
private static final long serialVersionUID = 1L;
@@ -85,7 +105,6 @@ public class CoGroupedWindowReduceTest {
}
}
- @SuppressWarnings({ "rawtypes", "unchecked" })
@Test
public void coGroupedWindowReduceTest1() {
@@ -125,9 +144,9 @@ public class CoGroupedWindowReduceTest {
expected.add("i");
CoGroupedWindowReduceInvokable<Tuple2<String, Integer>,
Tuple2<String, String>, String> invokable = new
CoGroupedWindowReduceInvokable<Tuple2<String, Integer>, Tuple2<String, String>,
String>(
- new MyCoReduceFunction(), 4L, 3L, 4L, 3L, new
TupleKeySelector(0),
- new TupleKeySelector( 0), new
MyTimeStamp<Tuple2<String, Integer>>(
- timestamps1), new
MyTimeStamp<Tuple2<String, String>>(timestamps2));
+ new MyCoReduceFunction(), 4L, 3L, 4L, 3L,
keySelector0, keySelector1,
+ new MyTimeStamp<Tuple2<String,
Integer>>(timestamps1),
+ new MyTimeStamp<Tuple2<String,
String>>(timestamps2));
List<String> result = MockCoContext.createAndExecute(invokable,
inputs1, inputs2);
@@ -136,7 +155,6 @@ public class CoGroupedWindowReduceTest {
assertEquals(expected, result);
}
- @SuppressWarnings({ "rawtypes", "unchecked" })
@Test
public void coGroupedWindowReduceTest2() {
@@ -178,9 +196,9 @@ public class CoGroupedWindowReduceTest {
expected.add("fh");
CoGroupedWindowReduceInvokable<Tuple2<String, Integer>,
Tuple2<String, String>, String> invokable = new
CoGroupedWindowReduceInvokable<Tuple2<String, Integer>, Tuple2<String, String>,
String>(
- new MyCoReduceFunction(), 4L, 3L, 2L, 2L, new
TupleKeySelector( 0),
- new TupleKeySelector( 0), new
MyTimeStamp<Tuple2<String, Integer>>(
- timestamps1), new
MyTimeStamp<Tuple2<String, String>>(timestamps2));
+ new MyCoReduceFunction(), 4L, 3L, 2L, 2L,
keySelector0, keySelector1,
+ new MyTimeStamp<Tuple2<String,
Integer>>(timestamps1),
+ new MyTimeStamp<Tuple2<String,
String>>(timestamps2));
List<String> result = MockCoContext.createAndExecute(invokable,
inputs1, inputs2);
http://git-wip-us.apache.org/repos/asf/flink/blob/59952489/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedReduceInvokableTest.java
----------------------------------------------------------------------
diff --git
a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedReduceInvokableTest.java
b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedReduceInvokableTest.java
index ceaccf3..ce47c67 100755
---
a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedReduceInvokableTest.java
+++
b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedReduceInvokableTest.java
@@ -23,8 +23,8 @@ import java.util.Arrays;
import java.util.List;
import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.util.MockContext;
-import org.apache.flink.streaming.util.keys.ObjectKeySelector;
import org.junit.Test;
public class GroupedReduceInvokableTest {
@@ -43,7 +43,15 @@ public class GroupedReduceInvokableTest {
@Test
public void test() {
GroupedReduceInvokable<Integer> invokable1 = new
GroupedReduceInvokable<Integer>(
- new MyReducer(), new
ObjectKeySelector<Integer>());
+ new MyReducer(), new KeySelector<Integer,
Integer>() {
+
+ private static final long
serialVersionUID = 1L;
+
+ @Override
+ public Integer getKey(Integer value)
throws Exception {
+ return value;
+ }
+ });
List<Integer> expected = Arrays.asList(1, 2, 2, 4, 3);
List<Integer> actual = MockContext.createAndExecute(invokable1,
@@ -51,5 +59,4 @@ public class GroupedReduceInvokableTest {
assertEquals(expected, actual);
}
-
}
http://git-wip-us.apache.org/repos/asf/flink/blob/59952489/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowInvokableTest.java
----------------------------------------------------------------------
diff --git
a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowInvokableTest.java
b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowInvokableTest.java
index c3a48d5..d97cadc 100644
---
a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowInvokableTest.java
+++
b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowInvokableTest.java
@@ -40,11 +40,20 @@ import
org.apache.flink.streaming.api.windowing.policy.TimeTriggerPolicy;
import org.apache.flink.streaming.api.windowing.policy.TriggerPolicy;
import org.apache.flink.streaming.api.windowing.policy.TumblingEvictionPolicy;
import org.apache.flink.streaming.util.MockContext;
-import org.apache.flink.streaming.util.keys.TupleKeySelector;
import org.junit.Test;
public class GroupedWindowInvokableTest {
+ KeySelector<Tuple2<Integer, String>, ?> keySelector = new
KeySelector<Tuple2<Integer, String>, String>() {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public String getKey(Tuple2<Integer, String> value) throws
Exception {
+ return value.f1;
+ }
+ };
+
/**
* Tests that illegal arguments result in failure. The following cases
are
* tested: 1) having no trigger 2) having no eviction 3) having neither
@@ -162,7 +171,7 @@ public class GroupedWindowInvokableTest {
expectedDistributedEviction.add(3);
expectedDistributedEviction.add(3);
expectedDistributedEviction.add(15);
-
+
List<Integer> expectedCentralEviction = new
ArrayList<Integer>();
expectedCentralEviction.add(2);
expectedCentralEviction.add(5);
@@ -173,7 +182,7 @@ public class GroupedWindowInvokableTest {
expectedCentralEviction.add(5);
expectedCentralEviction.add(1);
expectedCentralEviction.add(5);
-
+
LinkedList<CloneableTriggerPolicy<Integer>> triggers = new
LinkedList<CloneableTriggerPolicy<Integer>>();
// Trigger on every 2nd element, but the first time after the
3rd
triggers.add(new CountTriggerPolicy<Integer>(2, -1));
@@ -185,7 +194,7 @@ public class GroupedWindowInvokableTest {
LinkedList<TriggerPolicy<Integer>> centralTriggers = new
LinkedList<TriggerPolicy<Integer>>();
- ReduceFunction<Integer> reduceFunction=new
ReduceFunction<Integer>() {
+ ReduceFunction<Integer> reduceFunction = new
ReduceFunction<Integer>() {
private static final long serialVersionUID = 1L;
@Override
@@ -193,8 +202,8 @@ public class GroupedWindowInvokableTest {
return value1 + value2;
}
};
-
- KeySelector<Integer, Integer> keySelector=new
KeySelector<Integer, Integer>() {
+
+ KeySelector<Integer, Integer> keySelector = new
KeySelector<Integer, Integer>() {
private static final long serialVersionUID = 1L;
@Override
@@ -202,7 +211,7 @@ public class GroupedWindowInvokableTest {
return value;
}
};
-
+
GroupedWindowInvokable<Integer, Integer> invokable = new
GroupedWindowInvokable<Integer, Integer>(
reduceFunction, keySelector, triggers,
evictions, centralTriggers, null);
@@ -213,18 +222,19 @@ public class GroupedWindowInvokableTest {
actual.add(current);
}
- assertEquals(new HashSet<Integer>(expectedDistributedEviction),
new HashSet<Integer>(actual));
+ assertEquals(new HashSet<Integer>(expectedDistributedEviction),
+ new HashSet<Integer>(actual));
assertEquals(expectedDistributedEviction.size(), actual.size());
-
- //Run test with central eviction
+
+ // Run test with central eviction
triggers.clear();
centralTriggers.add(new CountTriggerPolicy<Integer>(2, -1));
LinkedList<EvictionPolicy<Integer>> centralEvictions = new
LinkedList<EvictionPolicy<Integer>>();
centralEvictions.add(new CountEvictionPolicy<Integer>(2, 2,
-1));
-
- invokable = new GroupedWindowInvokable<Integer, Integer>(
- reduceFunction, keySelector, triggers, null,
centralTriggers,centralEvictions);
-
+
+ invokable = new GroupedWindowInvokable<Integer,
Integer>(reduceFunction, keySelector,
+ triggers, null, centralTriggers,
centralEvictions);
+
result = MockContext.createAndExecute(invokable, inputs);
actual = new LinkedList<Integer>();
for (Integer current : result) {
@@ -279,8 +289,7 @@ public class GroupedWindowInvokableTest {
return value2;
}
}
- }, new TupleKeySelector<Tuple2<Integer,
String>>(1), triggers, evictions,
- centralTriggers, null);
+ }, keySelector, triggers, evictions,
centralTriggers, null);
List<Tuple2<Integer, String>> result =
MockContext.createAndExecute(invokable2, inputs2);
@@ -387,8 +396,7 @@ public class GroupedWindowInvokableTest {
LinkedList<CloneableTriggerPolicy<Tuple2<Integer, String>>>
distributedTriggers = new LinkedList<CloneableTriggerPolicy<Tuple2<Integer,
String>>>();
GroupedWindowInvokable<Tuple2<Integer, String>, Tuple2<Integer,
String>> invokable = new GroupedWindowInvokable<Tuple2<Integer, String>,
Tuple2<Integer, String>>(
- myReduceFunction, new
TupleKeySelector<Tuple2<Integer, String>>(1),
- distributedTriggers, evictions, triggers, null);
+ myReduceFunction, keySelector,
distributedTriggers, evictions, triggers, null);
ArrayList<Tuple2<Integer, String>> result = new
ArrayList<Tuple2<Integer, String>>();
for (Tuple2<Integer, String> t :
MockContext.createAndExecute(invokable, inputs)) {
@@ -398,7 +406,7 @@ public class GroupedWindowInvokableTest {
assertEquals(new HashSet<Tuple2<Integer, String>>(expected),
new HashSet<Tuple2<Integer, String>>(result));
assertEquals(expected.size(), result.size());
-
+
// repeat the test with central eviction. The result should be
the same.
triggers.clear();
triggers.add(new TimeTriggerPolicy<Tuple2<Integer, String>>(2L,
myTimeStamp, 2L));
@@ -407,8 +415,8 @@ public class GroupedWindowInvokableTest {
centralEvictions.add(new TimeEvictionPolicy<Tuple2<Integer,
String>>(4L, myTimeStamp));
invokable = new GroupedWindowInvokable<Tuple2<Integer, String>,
Tuple2<Integer, String>>(
- myReduceFunction, new
TupleKeySelector<Tuple2<Integer, String>>(1),
- distributedTriggers, evictions, triggers,
centralEvictions);
+ myReduceFunction, keySelector,
distributedTriggers, evictions, triggers,
+ centralEvictions);
result = new ArrayList<Tuple2<Integer, String>>();
for (Tuple2<Integer, String> t :
MockContext.createAndExecute(invokable, inputs)) {
http://git-wip-us.apache.org/repos/asf/flink/blob/59952489/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/partitioner/FieldsPartitionerTest.java
----------------------------------------------------------------------
diff --git
a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/partitioner/FieldsPartitionerTest.java
b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/partitioner/FieldsPartitionerTest.java
index 18b3015..b56649b 100644
---
a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/partitioner/FieldsPartitionerTest.java
+++
b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/partitioner/FieldsPartitionerTest.java
@@ -20,11 +20,11 @@ package org.apache.flink.streaming.partitioner;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
+import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.plugable.SerializationDelegate;
import org.apache.flink.streaming.api.streamrecord.StreamRecord;
-import org.apache.flink.streaming.util.keys.TupleKeySelector;
import org.junit.Before;
import org.junit.Test;
@@ -42,7 +42,15 @@ public class FieldsPartitionerTest {
@Before
public void setPartitioner() {
- fieldsPartitioner = new FieldsPartitioner<Tuple>(new
TupleKeySelector<Tuple>(0));
+ fieldsPartitioner = new FieldsPartitioner<Tuple>(new
KeySelector<Tuple, String>() {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public String getKey(Tuple value) throws Exception {
+ return value.getField(0);
+ }
+ });
}
@Test
http://git-wip-us.apache.org/repos/asf/flink/blob/59952489/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/FieldsKeySelectorTest.java
----------------------------------------------------------------------
diff --git
a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/FieldsKeySelectorTest.java
b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/FieldsKeySelectorTest.java
deleted file mode 100644
index 98b60b5..0000000
---
a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/FieldsKeySelectorTest.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.util;
-
-import static org.junit.Assert.assertEquals;
-
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.streaming.util.keys.FieldsKeySelector;
-import org.apache.flink.streaming.util.keys.ObjectKeySelector;
-import org.apache.flink.streaming.util.keys.TupleKeySelector;
-import org.junit.Test;
-
-public class FieldsKeySelectorTest {
-
- @Test
- public void testGetKey() throws Exception {
-
- Integer i = 5;
- Tuple2<Integer, String> t = new Tuple2<Integer, String>(-1,
"a");
- double[] a = new double[] { 0.0, 1.2 };
-
- KeySelector<Integer, ?> ks1 = new ObjectKeySelector<Integer>();
-
- assertEquals(ks1.getKey(i), 5);
-
- KeySelector<Tuple2<Integer, String>, ?> ks3 = new
TupleKeySelector<Tuple2<Integer, String>>(
- 1);
- assertEquals(ks3.getKey(t), "a");
-
- KeySelector<Tuple2<Integer, String>, ?> ks4 =
FieldsKeySelector.getSelector(
- TypeExtractor.getForObject(t), 1, 1);
- assertEquals(ks4.getKey(t), new Tuple2<String, String>("a",
"a"));
-
- KeySelector<double[], ?> ks5 = FieldsKeySelector.getSelector(
- TypeExtractor.getForObject(a), 0);
- assertEquals(ks5.getKey(a), 0.0);
-
- KeySelector<double[], ?> ks6 = FieldsKeySelector.getSelector(
- TypeExtractor.getForObject(a), 1, 0);
- assertEquals(ks6.getKey(a), new Tuple2<Double, Double>(1.2,
0.0));
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/59952489/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/streaming/windowing/WindowJoin.scala
----------------------------------------------------------------------
diff --git
a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/streaming/windowing/WindowJoin.scala
b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/streaming/windowing/WindowJoin.scala
index eea76c1..caf5eb9 100644
---
a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/streaming/windowing/WindowJoin.scala
+++
b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/streaming/windowing/WindowJoin.scala
@@ -20,7 +20,6 @@ package org.apache.flink.examples.scala.streaming.windowing
import org.apache.flink.api.scala._
import org.apache.flink.api.scala.streaming.StreamExecutionEnvironment
-import org.apache.flink.streaming.api.function.source.SourceFunction
import org.apache.flink.util.Collector
import scala.util.Random
http://git-wip-us.apache.org/repos/asf/flink/blob/59952489/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/CaseClassKeySelector.scala
----------------------------------------------------------------------
diff --git
a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/CaseClassKeySelector.scala
b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/CaseClassKeySelector.scala
deleted file mode 100644
index 63410a9..0000000
---
a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/CaseClassKeySelector.scala
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.scala.streaming
-
-import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
-import java.util.ArrayList
-import org.apache.flink.api.common.typeutils.CompositeType.FlatFieldDescriptor
-import org.apache.flink.api.java.functions.KeySelector
-
-class CaseClassKeySelector[T <: Product](@transient val typeInfo:
CaseClassTypeInfo[T],
- val keyFields: String*) extends KeySelector[T, Seq[Any]] {
-
- val numOfKeys: Int = keyFields.length;
-
- @transient val fieldDescriptors = new ArrayList[FlatFieldDescriptor]();
- for (field <- keyFields) {
- typeInfo.getKey(field, 0, fieldDescriptors);
- }
-
- val logicalKeyPositions = new Array[Int](numOfKeys)
- val orders = new Array[Boolean](numOfKeys)
-
- for (i <- 0 to numOfKeys - 1) {
- logicalKeyPositions(i) = fieldDescriptors.get(i).getPosition();
- }
-
- def getKey(value: T): Seq[Any] = {
- for (i <- 0 to numOfKeys - 1) yield
value.productElement(logicalKeyPositions(i))
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/59952489/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/DataStream.scala
----------------------------------------------------------------------
diff --git
a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/DataStream.scala
b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/DataStream.scala
index 6df4b25..38ad384 100644
---
a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/DataStream.scala
+++
b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/DataStream.scala
@@ -116,7 +116,7 @@ class DataStream[T](javaStream: JavaStream[T]) {
*
*/
def groupBy(fields: Int*): DataStream[T] =
- new DataStream[T](javaStream.groupBy(new FieldsKeySelector[T](fields: _*)))
+ new DataStream[T](javaStream.groupBy(fields: _*))
/**
* Groups the elements of a DataStream by the given field expressions to
@@ -124,12 +124,7 @@ class DataStream[T](javaStream: JavaStream[T]) {
*
*/
def groupBy(firstField: String, otherFields: String*): DataStream[T] =
- javaStream.getType() match {
- case ccInfo: CaseClassTypeInfo[T] => new
DataStream[T](javaStream.groupBy(
- new CaseClassKeySelector[T](ccInfo, firstField +:
otherFields.toArray: _*)))
- case _ => new DataStream[T](javaStream.groupBy(
- firstField +: otherFields.toArray: _*))
- }
+ new DataStream[T](javaStream.groupBy(firstField +: otherFields.toArray:
_*))
/**
* Groups the elements of a DataStream by the given K key to
@@ -152,7 +147,7 @@ class DataStream[T](javaStream: JavaStream[T]) {
*
*/
def partitionBy(fields: Int*): DataStream[T] =
- new DataStream[T](javaStream.partitionBy(new FieldsKeySelector[T](fields:
_*)))
+ new DataStream[T](javaStream.partitionBy(fields: _*));
/**
* Sets the partitioning of the DataStream so that the output is
@@ -161,12 +156,7 @@ class DataStream[T](javaStream: JavaStream[T]) {
*
*/
def partitionBy(firstField: String, otherFields: String*): DataStream[T] =
- javaStream.getType() match {
- case ccInfo: CaseClassTypeInfo[T] => new
DataStream[T](javaStream.partitionBy(
- new CaseClassKeySelector[T](ccInfo, firstField +:
otherFields.toArray: _*)))
- case _ => new DataStream[T](javaStream.partitionBy(
- firstField +: otherFields.toArray: _*))
- }
+ new DataStream[T](javaStream.partitionBy(firstField +:
otherFields.toArray: _*))
/**
* Sets the partitioning of the DataStream so that the output is
http://git-wip-us.apache.org/repos/asf/flink/blob/59952489/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/FieldsKeySelector.scala
----------------------------------------------------------------------
diff --git
a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/FieldsKeySelector.scala
b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/FieldsKeySelector.scala
deleted file mode 100644
index bc79fca..0000000
---
a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/FieldsKeySelector.scala
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.scala.streaming
-
-import org.apache.flink.streaming.util.keys.{ FieldsKeySelector =>
JavaSelector }
-import org.apache.flink.api.java.functions.KeySelector
-import org.apache.flink.api.java.tuple.Tuple
-
-class FieldsKeySelector[IN](fields: Int*) extends KeySelector[IN, Seq[Any]] {
-
- override def getKey(value: IN): Seq[Any] =
-
- value match {
- case prod: Product =>
- for (i <- 0 to fields.length - 1) yield prod.productElement(fields(i))
- case tuple: Tuple =>
- for (i <- 0 to fields.length - 1) yield tuple.getField(fields(i))
-
- case _ => throw new RuntimeException("Only tuple types are supported")
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/59952489/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamCrossOperator.scala
----------------------------------------------------------------------
diff --git
a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamCrossOperator.scala
b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamCrossOperator.scala
index 5f579e5..72052b9 100644
---
a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamCrossOperator.scala
+++
b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamCrossOperator.scala
@@ -23,17 +23,16 @@ import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.common.typeutils.TypeSerializer
import org.apache.flink.api.java.functions.KeySelector
import org.apache.flink.api.scala.ClosureCleaner
-import org.apache.flink.api.scala.typeutils.CaseClassSerializer
-import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
import org.apache.flink.streaming.api.datastream.{ DataStream => JavaStream }
import org.apache.flink.streaming.api.datastream.TemporalOperator
-import org.apache.flink.streaming.api.function.co.JoinWindowFunction
-import org.apache.flink.streaming.util.keys.PojoKeySelector
import scala.reflect.ClassTag
import org.apache.commons.lang.Validate
import org.apache.flink.streaming.api.invokable.operator.co.CoWindowInvokable
import org.apache.flink.streaming.api.function.co.CrossWindowFunction
import org.apache.flink.api.common.functions.CrossFunction
+import org.apache.flink.api.scala.typeutils.CaseClassSerializer
+import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
+
class StreamCrossOperator[I1, I2](i1: JavaStream[I1], i2: JavaStream[I2])
extends
TemporalOperator[I1, I2, StreamCrossOperator.CrossWindow[I1, I2]](i1, i2) {
http://git-wip-us.apache.org/repos/asf/flink/blob/59952489/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamJoinOperator.scala
----------------------------------------------------------------------
diff --git
a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamJoinOperator.scala
b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamJoinOperator.scala
index 4095645..f47d79e 100644
---
a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamJoinOperator.scala
+++
b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamJoinOperator.scala
@@ -28,10 +28,11 @@ import
org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
import org.apache.flink.streaming.api.datastream.{ DataStream => JavaStream }
import org.apache.flink.streaming.api.datastream.TemporalOperator
import org.apache.flink.streaming.api.function.co.JoinWindowFunction
-import org.apache.flink.streaming.util.keys.PojoKeySelector
import scala.reflect.ClassTag
import org.apache.commons.lang.Validate
import org.apache.flink.streaming.api.invokable.operator.co.CoWindowInvokable
+import org.apache.flink.streaming.util.keys.KeySelectorUtil
+import org.apache.flink.api.java.operators.Keys
class StreamJoinOperator[I1, I2](i1: JavaStream[I1], i2: JavaStream[I2])
extends
TemporalOperator[I1, I2, StreamJoinOperator.JoinWindow[I1, I2]](i1, i2) {
@@ -43,8 +44,10 @@ TemporalOperator[I1, I2, StreamJoinOperator.JoinWindow[I1,
I2]](i1, i2) {
object StreamJoinOperator {
- class JoinWindow[I1, I2](op: StreamJoinOperator[I1, I2]) {
+ class JoinWindow[I1, I2](private[flink] op: StreamJoinOperator[I1, I2]) {
+ private[flink] val type1 = op.input1.getType();
+
/**
* Continues a temporal Join transformation by defining
* the fields in the first stream to be used as keys for the join.
@@ -52,7 +55,8 @@ object StreamJoinOperator {
* to define the second key.
*/
def where(fields: Int*) = {
- new JoinPredicate[I1, I2](op, new FieldsKeySelector[I1](fields: _*))
+ new JoinPredicate[I1, I2](op, KeySelectorUtil.getSelectorForKeys(
+ new Keys.ExpressionKeys(fields.toArray,type1),type1))
}
/**
@@ -62,12 +66,8 @@ object StreamJoinOperator {
* to define the second key.
*/
def where(firstField: String, otherFields: String*) =
- op.input1.getType() match {
- case ccInfo: CaseClassTypeInfo[I1] => new JoinPredicate[I1, I2](op,
- new CaseClassKeySelector[I1](ccInfo, firstField +:
otherFields.toArray: _*))
- case _ => new JoinPredicate[I1, I2](op, new PojoKeySelector[I1](
- op.input1.getType(), (firstField +: otherFields): _*))
- }
+ new JoinPredicate[I1, I2](op, KeySelectorUtil.getSelectorForKeys(
+ new Keys.ExpressionKeys(firstField +:
otherFields.toArray,type1),type1))
/**
* Continues a temporal Join transformation by defining
@@ -90,6 +90,7 @@ object StreamJoinOperator {
class JoinPredicate[I1, I2](private[flink] val op: StreamJoinOperator[I1,
I2],
private[flink] val keys1: KeySelector[I1, _]) {
private[flink] var keys2: KeySelector[I2, _] = null
+ private[flink] val type2 = op.input2.getType();
/**
* Creates a temporal join transformation by defining the second join key.
@@ -98,7 +99,8 @@ object StreamJoinOperator {
* To define a custom wrapping, use JoinedStream.apply(...)
*/
def equalTo(fields: Int*): JoinedStream[I1, I2] = {
- finish(new FieldsKeySelector[I2](fields: _*))
+ finish(KeySelectorUtil.getSelectorForKeys(
+ new Keys.ExpressionKeys(fields.toArray,type2),type2))
}
/**
@@ -108,12 +110,8 @@ object StreamJoinOperator {
* To define a custom wrapping, use JoinedStream.apply(...)
*/
def equalTo(firstField: String, otherFields: String*): JoinedStream[I1,
I2] =
- op.input2.getType() match {
- case ccInfo: CaseClassTypeInfo[I2] => finish(
- new CaseClassKeySelector[I2](ccInfo, firstField +:
otherFields.toArray: _*))
- case _ => finish(new PojoKeySelector[I2](op.input2.getType(),
- (firstField +: otherFields): _*))
- }
+ finish(KeySelectorUtil.getSelectorForKeys(
+ new Keys.ExpressionKeys(firstField +:
otherFields.toArray,type2),type2))
/**
* Creates a temporal join transformation by defining the second join key.
http://git-wip-us.apache.org/repos/asf/flink/blob/59952489/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/WindowedDataStream.scala
----------------------------------------------------------------------
diff --git
a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/WindowedDataStream.scala
b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/WindowedDataStream.scala
index 11f042d..5346c4c 100644
---
a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/WindowedDataStream.scala
+++
b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/WindowedDataStream.scala
@@ -66,7 +66,7 @@ class WindowedDataStream[T](javaStream: JavaWStream[T]) {
*
*/
def groupBy(fields: Int*): WindowedDataStream[T] =
- new WindowedDataStream[T](javaStream.groupBy(new
FieldsKeySelector[T](fields: _*)))
+ new WindowedDataStream[T](javaStream.groupBy(fields: _*))
/**
* Groups the elements of the WindowedDataStream using the given
@@ -78,13 +78,8 @@ class WindowedDataStream[T](javaStream: JavaWStream[T]) {
*
*/
def groupBy(firstField: String, otherFields: String*): WindowedDataStream[T]
=
- javaStream.getType() match {
- case ccInfo: CaseClassTypeInfo[T] => new
WindowedDataStream[T](javaStream.groupBy(
- new CaseClassKeySelector[T](ccInfo, firstField +:
otherFields.toArray: _*)))
- case _ => new WindowedDataStream[T](javaStream.groupBy(
+ new WindowedDataStream[T](javaStream.groupBy(
firstField +: otherFields.toArray: _*))
- }
-
/**
* Groups the elements of the WindowedDataStream using the given