[streaming] Temporal operator windowing syntax update
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/b0a2e4a1 Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/b0a2e4a1 Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/b0a2e4a1 Branch: refs/heads/master Commit: b0a2e4a14e0161e753ec3296d4ead8e46ac8e303 Parents: 92ceacd Author: Gyula Fora <[email protected]> Authored: Mon Jan 5 15:24:59 2015 +0100 Committer: mbalassi <[email protected]> Committed: Tue Jan 6 15:09:04 2015 +0100 ---------------------------------------------------------------------- .../flink/streaming/api/JobGraphBuilder.java | 4 + .../streaming/api/datastream/DataStream.java | 4 +- .../api/datastream/DataStreamSink.java | 2 +- .../api/datastream/GroupedDataStream.java | 2 +- .../api/datastream/IterativeDataStream.java | 2 +- .../datastream/SingleOutputStreamOperator.java | 2 +- .../api/datastream/StreamCrossOperator.java | 86 ------ .../api/datastream/StreamJoinOperator.java | 257 ------------------ .../api/datastream/TemporalOperator.java | 102 ------- .../temporaloperator/StreamCrossOperator.java | 101 +++++++ .../temporaloperator/StreamJoinOperator.java | 272 +++++++++++++++++++ .../temporaloperator/TemporalOperator.java | 124 +++++++++ .../temporaloperator/TemporalWindow.java | 45 +++ .../operator/co/CoWindowInvokable.java | 4 + .../streaming/api/windowing/helper/Time.java | 20 +- .../streaming/api/WindowCrossJoinTest.java | 21 +- .../streaming/examples/join/WindowJoin.java | 3 +- .../scala/examples/windowing/WindowJoin.scala | 7 +- .../api/scala/StreamCrossOperator.scala | 17 +- .../api/scala/StreamJoinOperator.scala | 47 ++-- .../streaming/api/scala/TemporalOperator.scala | 43 +++ 21 files changed, 662 insertions(+), 503 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b0a2e4a1/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java index c826274..441360c 100644 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java @@ -509,6 +509,10 @@ public class JobGraphBuilder { invokableObjects.put(id, invokableObject); } + public StreamInvokable<?, ?> getInvokable(String id) { + return invokableObjects.get(id); + } + public <OUT> void setOutType(String id, TypeInformation<OUT> outType) { StreamRecordSerializer<OUT> serializer = new StreamRecordSerializer<OUT>(outType); typeSerializersOut1.put(id, serializer); http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b0a2e4a1/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java index 8d0020e..e969647 100644 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java @@ -38,6 +38,8 @@ import 
org.apache.flink.api.java.tuple.Tuple; import org.apache.flink.api.java.typeutils.TupleTypeInfo; import org.apache.flink.api.java.typeutils.TypeExtractor; import org.apache.flink.streaming.api.JobGraphBuilder; +import org.apache.flink.streaming.api.datastream.temporaloperator.StreamCrossOperator; +import org.apache.flink.streaming.api.datastream.temporaloperator.StreamJoinOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.function.aggregation.AggregationFunction; import org.apache.flink.streaming.api.function.aggregation.AggregationFunction.AggregationType; @@ -1180,7 +1182,7 @@ public class DataStream<OUT> { * * @return The copy */ - protected DataStream<OUT> copy() { + public DataStream<OUT> copy() { return new DataStream<OUT>(this); } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b0a2e4a1/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSink.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSink.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSink.java index 369c3eb..61fc557 100755 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSink.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSink.java @@ -38,7 +38,7 @@ public class DataStreamSink<IN> extends SingleOutputStreamOperator<IN, DataStrea } @Override - protected DataStreamSink<IN> copy() { + public DataStreamSink<IN> copy() { throw new RuntimeException("Data stream sinks cannot be copied"); } 
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b0a2e4a1/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java index 160ef8d..4a5c0c2 100755 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java @@ -199,7 +199,7 @@ public class GroupedDataStream<OUT> extends DataStream<OUT> { } @Override - protected GroupedDataStream<OUT> copy() { + public GroupedDataStream<OUT> copy() { return new GroupedDataStream<OUT>(this); } } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b0a2e4a1/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/IterativeDataStream.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/IterativeDataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/IterativeDataStream.java index 78518c0..d7467d1 100644 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/IterativeDataStream.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/IterativeDataStream.java @@ -72,7 +72,7 @@ public class IterativeDataStream<IN> extends } @Override - 
protected IterativeDataStream<IN> copy() { + public IterativeDataStream<IN> copy() { return new IterativeDataStream<IN>(this, iterationID, waitTime); } } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b0a2e4a1/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java index c19517b..4b6edc0 100755 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java @@ -186,7 +186,7 @@ public class SingleOutputStreamOperator<OUT, O extends SingleOutputStreamOperato } @Override - protected SingleOutputStreamOperator<OUT, O> copy() { + public SingleOutputStreamOperator<OUT, O> copy() { return new SingleOutputStreamOperator<OUT, O>(this); } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b0a2e4a1/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/StreamCrossOperator.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/StreamCrossOperator.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/StreamCrossOperator.java deleted file mode 100644 index 2dd7bc0..0000000 --- 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/StreamCrossOperator.java +++ /dev/null @@ -1,86 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.api.datastream; - -import org.apache.flink.api.common.functions.CrossFunction; -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.java.operators.CrossOperator; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.api.java.typeutils.TupleTypeInfo; -import org.apache.flink.api.java.typeutils.TypeExtractor; -import org.apache.flink.streaming.api.function.co.CrossWindowFunction; -import org.apache.flink.streaming.api.invokable.operator.co.CoWindowInvokable; - -public class StreamCrossOperator<I1, I2> extends - TemporalOperator<I1, I2, StreamCrossOperator.CrossWindow<I1, I2>> { - - public StreamCrossOperator(DataStream<I1> input1, DataStream<I2> input2) { - super(input1, input2); - } - - @Override - protected CrossWindow<I1, I2> createNextWindowOperator() { - - CrossWindowFunction<I1, I2, Tuple2<I1, I2>> crossWindowFunction = new CrossWindowFunction<I1, I2, Tuple2<I1, I2>>( - input1.clean(new 
CrossOperator.DefaultCrossFunction<I1, I2>())); - - return new CrossWindow<I1, I2>(this, input1.connect(input2).addGeneralWindowCombine( - crossWindowFunction, - new TupleTypeInfo<Tuple2<I1, I2>>(input1.getType(), input2.getType()), windowSize, - slideInterval, timeStamp1, timeStamp2)); - } - - public static class CrossWindow<I1, I2> extends - SingleOutputStreamOperator<Tuple2<I1, I2>, CrossWindow<I1, I2>> { - - private StreamCrossOperator<I1, I2> op; - - public CrossWindow(StreamCrossOperator<I1, I2> op, DataStream<Tuple2<I1, I2>> ds) { - super(ds); - this.op = op; - } - - /** - * Finalizes a temporal Cross transformation by applying a - * {@link CrossFunction} to each pair of crossed elements.<br/> - * Each CrossFunction call returns exactly one element. - * - * @param function - * The CrossFunction that is called for each pair of crossed - * elements. - * @return The crossed data streams - * - */ - public <R> SingleOutputStreamOperator<R, ?> with(CrossFunction<I1, I2, R> function) { - TypeInformation<R> outTypeInfo = TypeExtractor.getCrossReturnTypes(function, - op.input1.getType(), op.input2.getType()); - - CoWindowInvokable<I1, I2, R> invokable = new CoWindowInvokable<I1, I2, R>( - new CrossWindowFunction<I1, I2, R>(clean(function)), op.windowSize, - op.slideInterval, op.timeStamp1, op.timeStamp2); - - jobGraphBuilder.setInvokable(id, invokable); - - return setType(outTypeInfo); - - } - - } - -} http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b0a2e4a1/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/StreamJoinOperator.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/StreamJoinOperator.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/StreamJoinOperator.java deleted file mode 100644 index 
de15515..0000000 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/StreamJoinOperator.java +++ /dev/null @@ -1,257 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.api.datastream; - -import org.apache.flink.api.common.functions.JoinFunction; -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.java.functions.KeySelector; -import org.apache.flink.api.java.operators.Keys; -import org.apache.flink.api.java.tuple.Tuple; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.api.java.typeutils.TupleTypeInfo; -import org.apache.flink.api.java.typeutils.TypeExtractor; -import org.apache.flink.streaming.api.function.co.JoinWindowFunction; -import org.apache.flink.streaming.api.invokable.operator.co.CoWindowInvokable; -import org.apache.flink.streaming.util.keys.KeySelectorUtil; - -public class StreamJoinOperator<I1, I2> extends - TemporalOperator<I1, I2, StreamJoinOperator.JoinWindow<I1, I2>> { - - public StreamJoinOperator(DataStream<I1> input1, DataStream<I2> input2) { - super(input1, input2); - } - - @Override - protected JoinWindow<I1, I2> 
createNextWindowOperator() { - return new JoinWindow<I1, I2>(this); - } - - public static class JoinWindow<I1, I2> { - - private StreamJoinOperator<I1, I2> op; - private TypeInformation<I1> type1; - - private JoinWindow(StreamJoinOperator<I1, I2> operator) { - this.op = operator; - this.type1 = op.input1.getType(); - } - - /** - * Continues a temporal Join transformation. <br/> - * Defines the {@link Tuple} fields of the first join {@link DataStream} - * that should be used as join keys.<br/> - * <b>Note: Fields can only be selected as join keys on Tuple - * DataStreams.</b><br/> - * - * @param fields - * The indexes of the other Tuple fields of the first join - * DataStreams that should be used as keys. - * @return An incomplete Join transformation. Call - * {@link JoinPredicate#equalTo} to continue the Join. - */ - public JoinPredicate<I1, I2> where(int... fields) { - return new JoinPredicate<I1, I2>(op, KeySelectorUtil.getSelectorForKeys( - new Keys.ExpressionKeys<I1>(fields, type1), type1)); - } - - /** - * Continues a temporal join transformation. <br/> - * Defines the fields of the first join {@link DataStream} that should - * be used as grouping keys. Fields are the names of member fields of - * the underlying type of the data stream. - * - * @param fields - * The fields of the first join DataStream that should be - * used as keys. - * @return An incomplete Join transformation. Call - * {@link JoinPredicate#equalTo} to continue the Join. - */ - public JoinPredicate<I1, I2> where(String... fields) { - return new JoinPredicate<I1, I2>(op, KeySelectorUtil.getSelectorForKeys( - new Keys.ExpressionKeys<I1>(fields, type1), type1)); - } - - /** - * Continues a temporal Join transformation and defines a - * {@link KeySelector} function for the first join {@link DataStream} - * .</br> The KeySelector function is called for each element of the - * first DataStream and extracts a single key value on which the - * DataStream is joined. 
</br> - * - * @param keySelector - * The KeySelector function which extracts the key values - * from the DataStream on which it is joined. - * @return An incomplete Join transformation. Call - * {@link JoinPredicate#equalTo} to continue the Join. - */ - public <K> JoinPredicate<I1, I2> where(KeySelector<I1, K> keySelector) { - return new JoinPredicate<I1, I2>(op, keySelector); - } - - // ---------------------------------------------------------------------------------------- - - } - - /** - * Intermediate step of a temporal Join transformation. <br/> - * To continue the Join transformation, select the join key of the second - * input {@link DataStream} by calling {@link JoinPredicate#equalTo} - * - */ - public static class JoinPredicate<I1, I2> { - - private StreamJoinOperator<I1, I2> op; - private KeySelector<I1, ?> keys1; - private KeySelector<I2, ?> keys2; - private TypeInformation<I2> type2; - - private JoinPredicate(StreamJoinOperator<I1, I2> operator, KeySelector<I1, ?> keys1) { - this.op = operator; - this.keys1 = keys1; - this.type2 = op.input2.getType(); - } - - /** - * Creates a temporal Join transformation and defines the {@link Tuple} - * fields of the second join {@link DataStream} that should be used as - * join keys.<br/> - * </p> The resulting operator wraps each pair of joining elements in a - * Tuple2<I1,I2>(first, second). To use a different wrapping function - * use {@link JoinedStream#with(JoinFunction)} - * - * @param fields - * The indexes of the Tuple fields of the second join - * DataStream that should be used as keys. - * @return A streaming join operator. Call {@link JoinedStream#with} to - * apply a custom wrapping - */ - public JoinedStream<I1, I2> equalTo(int... 
fields) { - keys2 = KeySelectorUtil.getSelectorForKeys(new Keys.ExpressionKeys<I2>(fields, type2), - type2); - return createJoinOperator(); - } - - /** - * Creates a temporal Join transformation and defines the fields of the - * second join {@link DataStream} that should be used as join keys. </p> - * The resulting operator wraps each pair of joining elements in a - * Tuple2<I1,I2>(first, second). To use a different wrapping function - * use {@link JoinedStream#with(JoinFunction)} - * - * @param fields - * The fields of the second join DataStream that should be - * used as keys. - * @return A streaming join operator. Call {@link JoinedStream#with} to - * apply a custom wrapping - */ - public JoinedStream<I1, I2> equalTo(String... fields) { - this.keys2 = KeySelectorUtil.getSelectorForKeys(new Keys.ExpressionKeys<I2>(fields, - type2), type2); - return createJoinOperator(); - } - - /** - * Creates a temporal Join transformation and defines a - * {@link KeySelector} function for the second join {@link DataStream} - * .</br> The KeySelector function is called for each element of the - * second DataStream and extracts a single key value on which the - * DataStream is joined. </p> The resulting operator wraps each pair of - * joining elements in a Tuple2<I1,I2>(first, second). To use a - * different wrapping function use - * {@link JoinedStream#with(JoinFunction)} - * - * - * @param keySelector - * The KeySelector function which extracts the key values - * from the second DataStream on which it is joined. - * @return A streaming join operator. 
Call {@link JoinedStream#with} to - * apply a custom wrapping - */ - public <K> JoinedStream<I1, I2> equalTo(KeySelector<I2, K> keySelector) { - this.keys2 = keySelector; - return createJoinOperator(); - } - - private JoinedStream<I1, I2> createJoinOperator() { - - JoinFunction<I1, I2, Tuple2<I1, I2>> joinFunction = new DefaultJoinFunction<I1, I2>(); - - JoinWindowFunction<I1, I2, Tuple2<I1, I2>> joinWindowFunction = getJoinWindowFunction( - joinFunction, this); - - TypeInformation<Tuple2<I1, I2>> outType = new TupleTypeInfo<Tuple2<I1, I2>>( - op.input1.getType(), op.input2.getType()); - - return new JoinedStream<I1, I2>(this, op.input1 - .groupBy(keys1) - .connect(op.input2.groupBy(keys2)) - .addGeneralWindowCombine(joinWindowFunction, outType, op.windowSize, - op.slideInterval, op.timeStamp1, op.timeStamp2)); - } - } - - public static class JoinedStream<I1, I2> extends - SingleOutputStreamOperator<Tuple2<I1, I2>, JoinedStream<I1, I2>> { - private final JoinPredicate<I1, I2> predicate; - - private JoinedStream(JoinPredicate<I1, I2> predicate, DataStream<Tuple2<I1, I2>> ds) { - super(ds); - this.predicate = predicate; - } - - /** - * Completes a stream join. </p> The resulting operator wraps each pair - * of joining elements using the user defined {@link JoinFunction} - * - * @return The joined data stream. 
- */ - public <OUT> SingleOutputStreamOperator<OUT, ?> with(JoinFunction<I1, I2, OUT> joinFunction) { - - TypeInformation<OUT> outType = TypeExtractor.getJoinReturnTypes(joinFunction, - predicate.op.input1.getType(), predicate.op.input2.getType()); - - CoWindowInvokable<I1, I2, OUT> invokable = new CoWindowInvokable<I1, I2, OUT>( - getJoinWindowFunction(joinFunction, predicate), predicate.op.windowSize, - predicate.op.slideInterval, predicate.op.timeStamp1, predicate.op.timeStamp2); - - jobGraphBuilder.setInvokable(id, invokable); - - return setType(outType); - } - } - - public static final class DefaultJoinFunction<I1, I2> implements - JoinFunction<I1, I2, Tuple2<I1, I2>> { - - private static final long serialVersionUID = 1L; - private final Tuple2<I1, I2> outTuple = new Tuple2<I1, I2>(); - - @Override - public Tuple2<I1, I2> join(I1 first, I2 second) throws Exception { - outTuple.f0 = first; - outTuple.f1 = second; - return outTuple; - } - } - - public static <I1, I2, OUT> JoinWindowFunction<I1, I2, OUT> getJoinWindowFunction( - JoinFunction<I1, I2, OUT> joinFunction, JoinPredicate<I1, I2> predicate) { - return new JoinWindowFunction<I1, I2, OUT>(predicate.keys1, predicate.keys2, joinFunction); - } -} http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b0a2e4a1/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/TemporalOperator.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/TemporalOperator.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/TemporalOperator.java deleted file mode 100644 index e5385f0..0000000 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/TemporalOperator.java +++ /dev/null @@ -1,102 +0,0 @@ -/* - * Licensed to the 
Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.api.datastream; - -import org.apache.flink.streaming.api.windowing.helper.SystemTimestamp; -import org.apache.flink.streaming.api.windowing.helper.TimestampWrapper; - -public abstract class TemporalOperator<I1, I2, OP> { - - public final DataStream<I1> input1; - public final DataStream<I2> input2; - - public long windowSize; - public long slideInterval; - - public TimestampWrapper<I1> timeStamp1; - public TimestampWrapper<I2> timeStamp2; - - public TemporalOperator(DataStream<I1> input1, DataStream<I2> input2) { - if (input1 == null || input2 == null) { - throw new NullPointerException(); - } - this.input1 = input1.copy(); - this.input2 = input2.copy(); - } - - /** - * Continues a temporal transformation.<br/> - * Defines the window size on which the two DataStreams will be transformed. - * - * @param windowSize - * The size of the window in milliseconds. - * @return An incomplete temporal transformation. - */ - public OP onWindow(long windowSize) { - return onWindow(windowSize, windowSize); - } - - /** - * Continues a temporal transformation.<br/> - * Defines the window size on which the two DataStreams will be transformed. 
- * - * @param windowSize - * The size of the window in milliseconds. - * @param slideInterval - * The slide size of the window. - * @return An incomplete temporal transformation. - */ - @SuppressWarnings("unchecked") - public OP onWindow(long windowSize, long slideInterval) { - return onWindow(windowSize, slideInterval, - (TimestampWrapper<I1>) SystemTimestamp.getWrapper(), - (TimestampWrapper<I2>) SystemTimestamp.getWrapper()); - } - - /** - * Continues a temporal transformation.<br/> - * Defines the window size on which the two DataStreams will be transformed. - * - * @param windowSize - * The size of the window in milliseconds. - * @param slideInterval - * The slide size of the window. - * @param timeStamp1 - * The timestamp used to extract time from the elements of the - * first data stream. - * @param timeStamp2 - * The timestamp used to extract time from the elements of the - * second data stream. - * @return An incomplete temporal transformation. - */ - public OP onWindow(long windowSize, long slideInterval, TimestampWrapper<I1> timeStamp1, - TimestampWrapper<I2> timeStamp2) { - - this.windowSize = windowSize; - this.slideInterval = slideInterval; - - this.timeStamp1 = timeStamp1; - this.timeStamp2 = timeStamp2; - - return createNextWindowOperator(); - } - - protected abstract OP createNextWindowOperator(); - -} http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b0a2e4a1/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/temporaloperator/StreamCrossOperator.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/temporaloperator/StreamCrossOperator.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/temporaloperator/StreamCrossOperator.java new file mode 100644 index 0000000..8422400 --- /dev/null 
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/temporaloperator/StreamCrossOperator.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.datastream.temporaloperator; + +import java.util.concurrent.TimeUnit; + +import org.apache.flink.api.common.functions.CrossFunction; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.operators.CrossOperator; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.typeutils.TupleTypeInfo; +import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; +import org.apache.flink.streaming.api.function.co.CrossWindowFunction; +import org.apache.flink.streaming.api.invokable.operator.co.CoWindowInvokable; + +public class StreamCrossOperator<I1, I2> extends + TemporalOperator<I1, I2, StreamCrossOperator.CrossWindow<I1, I2>> { + + public StreamCrossOperator(DataStream<I1> input1, DataStream<I2> input2) { + super(input1, input2); + } + + @Override + protected 
CrossWindow<I1, I2> createNextWindowOperator() { + + CrossWindowFunction<I1, I2, Tuple2<I1, I2>> crossWindowFunction = new CrossWindowFunction<I1, I2, Tuple2<I1, I2>>( + input1.clean(new CrossOperator.DefaultCrossFunction<I1, I2>())); + + return new CrossWindow<I1, I2>(this, input1.connect(input2).addGeneralWindowCombine( + crossWindowFunction, + new TupleTypeInfo<Tuple2<I1, I2>>(input1.getType(), input2.getType()), windowSize, + slideInterval, timeStamp1, timeStamp2)); + } + + public static class CrossWindow<I1, I2> extends + SingleOutputStreamOperator<Tuple2<I1, I2>, CrossWindow<I1, I2>> implements + TemporalWindow<CrossWindow<I1, I2>> { + + private StreamCrossOperator<I1, I2> op; + + public CrossWindow(StreamCrossOperator<I1, I2> op, DataStream<Tuple2<I1, I2>> ds) { + super(ds); + this.op = op; + } + + public CrossWindow<I1, I2> every(long length, TimeUnit timeUnit) { + return every(timeUnit.toMillis(length)); + } + + @SuppressWarnings("unchecked") + public CrossWindow<I1, I2> every(long length) { + ((CoWindowInvokable<I1, I2, ?>) jobGraphBuilder.getInvokable(id)).setSlideSize(length); + return this; + } + + /** + * Finalizes a temporal Cross transformation by applying a + * {@link CrossFunction} to each pair of crossed elements.<br/> + * Each CrossFunction call returns exactly one element. + * + * @param function + * The CrossFunction that is called for each pair of crossed + * elements. 
+ * @return The crossed data streams + * + */ + public <R> SingleOutputStreamOperator<R, ?> with(CrossFunction<I1, I2, R> function) { + TypeInformation<R> outTypeInfo = TypeExtractor.getCrossReturnTypes(function, + op.input1.getType(), op.input2.getType()); + + CoWindowInvokable<I1, I2, R> invokable = new CoWindowInvokable<I1, I2, R>( + new CrossWindowFunction<I1, I2, R>(clean(function)), op.windowSize, + op.slideInterval, op.timeStamp1, op.timeStamp2); + + jobGraphBuilder.setInvokable(id, invokable); + + return setType(outTypeInfo); + + } + + } + +} http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b0a2e4a1/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/temporaloperator/StreamJoinOperator.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/temporaloperator/StreamJoinOperator.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/temporaloperator/StreamJoinOperator.java new file mode 100644 index 0000000..626b9f1 --- /dev/null +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/temporaloperator/StreamJoinOperator.java @@ -0,0 +1,272 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.datastream.temporaloperator; + +import java.util.concurrent.TimeUnit; + +import org.apache.flink.api.common.functions.JoinFunction; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.api.java.operators.Keys; +import org.apache.flink.api.java.tuple.Tuple; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.typeutils.TupleTypeInfo; +import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; +import org.apache.flink.streaming.api.function.co.JoinWindowFunction; +import org.apache.flink.streaming.api.invokable.operator.co.CoWindowInvokable; +import org.apache.flink.streaming.util.keys.KeySelectorUtil; + +public class StreamJoinOperator<I1, I2> extends + TemporalOperator<I1, I2, StreamJoinOperator.JoinWindow<I1, I2>> { + + public StreamJoinOperator(DataStream<I1> input1, DataStream<I2> input2) { + super(input1, input2); + } + + @Override + protected JoinWindow<I1, I2> createNextWindowOperator() { + return new JoinWindow<I1, I2>(this); + } + + public static class JoinWindow<I1, I2> implements TemporalWindow<JoinWindow<I1, I2>> { + + private StreamJoinOperator<I1, I2> op; + private TypeInformation<I1> type1; + + private JoinWindow(StreamJoinOperator<I1, I2> operator) { + this.op = operator; + this.type1 = op.input1.getType(); + } + + /** + * Continues a 
temporal Join transformation. <br/> + * Defines the {@link Tuple} fields of the first join {@link DataStream} + * that should be used as join keys.<br/> + * <b>Note: Fields can only be selected as join keys on Tuple + * DataStreams.</b><br/> + * + * @param fields + * The indexes of the Tuple fields of the first join + * DataStream that should be used as keys. + * @return An incomplete Join transformation. Call + * {@link JoinPredicate#equalTo} to continue the Join. + */ + public JoinPredicate<I1, I2> where(int... fields) { + return new JoinPredicate<I1, I2>(op, KeySelectorUtil.getSelectorForKeys( + new Keys.ExpressionKeys<I1>(fields, type1), type1)); + } + + /** + * Continues a temporal join transformation. <br/> + * Defines the fields of the first join {@link DataStream} that should + * be used as join keys. Fields are the names of member fields of + * the underlying type of the data stream. + * + * @param fields + * The fields of the first join DataStream that should be + * used as keys. + * @return An incomplete Join transformation. Call + * {@link JoinPredicate#equalTo} to continue the Join. + */ + public JoinPredicate<I1, I2> where(String... fields) { + return new JoinPredicate<I1, I2>(op, KeySelectorUtil.getSelectorForKeys( + new Keys.ExpressionKeys<I1>(fields, type1), type1)); + } + + /** + * Continues a temporal Join transformation and defines a + * {@link KeySelector} function for the first join {@link DataStream}. + * <br/> The KeySelector function is called for each element of the + * first DataStream and extracts a single key value on which the + * DataStream is joined. <br/> + * + * @param keySelector + * The KeySelector function which extracts the key values + * from the DataStream on which it is joined. + * @return An incomplete Join transformation. Call + * {@link JoinPredicate#equalTo} to continue the Join. 
+ */ + public <K> JoinPredicate<I1, I2> where(KeySelector<I1, K> keySelector) { + return new JoinPredicate<I1, I2>(op, keySelector); + } + + @Override + public JoinWindow<I1, I2> every(long length, TimeUnit timeUnit) { + return every(timeUnit.toMillis(length)); + } + + @Override + public JoinWindow<I1, I2> every(long length) { + op.slideInterval = length; + return this; + } + + // ---------------------------------------------------------------------------------------- + + } + + /** + * Intermediate step of a temporal Join transformation. <br/> + * To continue the Join transformation, select the join key of the second + * input {@link DataStream} by calling {@link JoinPredicate#equalTo} + * + */ + public static class JoinPredicate<I1, I2> { + + private StreamJoinOperator<I1, I2> op; + private KeySelector<I1, ?> keys1; + private KeySelector<I2, ?> keys2; + private TypeInformation<I2> type2; + + private JoinPredicate(StreamJoinOperator<I1, I2> operator, KeySelector<I1, ?> keys1) { + this.op = operator; + this.keys1 = keys1; + this.type2 = op.input2.getType(); + } + + /** + * Creates a temporal Join transformation and defines the {@link Tuple} + * fields of the second join {@link DataStream} that should be used as + * join keys.<br/> + * </p> The resulting operator wraps each pair of joining elements in a + * Tuple2<I1,I2>(first, second). To use a different wrapping function + * use {@link JoinedStream#with(JoinFunction)} + * + * @param fields + * The indexes of the Tuple fields of the second join + * DataStream that should be used as keys. + * @return A streaming join operator. Call {@link JoinedStream#with} to + * apply a custom wrapping + */ + public JoinedStream<I1, I2> equalTo(int... 
fields) { + keys2 = KeySelectorUtil.getSelectorForKeys(new Keys.ExpressionKeys<I2>(fields, type2), + type2); + return createJoinOperator(); + } + + /** + * Creates a temporal Join transformation and defines the fields of the + * second join {@link DataStream} that should be used as join keys. </p> + * The resulting operator wraps each pair of joining elements in a + * Tuple2<I1,I2>(first, second). To use a different wrapping function + * use {@link JoinedStream#with(JoinFunction)} + * + * @param fields + * The fields of the second join DataStream that should be + * used as keys. + * @return A streaming join operator. Call {@link JoinedStream#with} to + * apply a custom wrapping + */ + public JoinedStream<I1, I2> equalTo(String... fields) { + this.keys2 = KeySelectorUtil.getSelectorForKeys(new Keys.ExpressionKeys<I2>(fields, + type2), type2); + return createJoinOperator(); + } + + /** + * Creates a temporal Join transformation and defines a + * {@link KeySelector} function for the second join {@link DataStream} + * .</br> The KeySelector function is called for each element of the + * second DataStream and extracts a single key value on which the + * DataStream is joined. </p> The resulting operator wraps each pair of + * joining elements in a Tuple2<I1,I2>(first, second). To use a + * different wrapping function use + * {@link JoinedStream#with(JoinFunction)} + * + * + * @param keySelector + * The KeySelector function which extracts the key values + * from the second DataStream on which it is joined. + * @return A streaming join operator. 
Call {@link JoinedStream#with} to + * apply a custom wrapping + */ + public <K> JoinedStream<I1, I2> equalTo(KeySelector<I2, K> keySelector) { + this.keys2 = keySelector; + return createJoinOperator(); + } + + private JoinedStream<I1, I2> createJoinOperator() { + + JoinFunction<I1, I2, Tuple2<I1, I2>> joinFunction = new DefaultJoinFunction<I1, I2>(); + + JoinWindowFunction<I1, I2, Tuple2<I1, I2>> joinWindowFunction = getJoinWindowFunction( + joinFunction, this); + + TypeInformation<Tuple2<I1, I2>> outType = new TupleTypeInfo<Tuple2<I1, I2>>( + op.input1.getType(), op.input2.getType()); + + return new JoinedStream<I1, I2>(this, op.input1 + .groupBy(keys1) + .connect(op.input2.groupBy(keys2)) + .addGeneralWindowCombine(joinWindowFunction, outType, op.windowSize, + op.slideInterval, op.timeStamp1, op.timeStamp2)); + } + } + + public static class JoinedStream<I1, I2> extends + SingleOutputStreamOperator<Tuple2<I1, I2>, JoinedStream<I1, I2>> { + private final JoinPredicate<I1, I2> predicate; + + private JoinedStream(JoinPredicate<I1, I2> predicate, DataStream<Tuple2<I1, I2>> ds) { + super(ds); + this.predicate = predicate; + } + + /** + * Completes a stream join. </p> The resulting operator wraps each pair + * of joining elements using the user defined {@link JoinFunction} + * + * @return The joined data stream. 
+ */ + public <OUT> SingleOutputStreamOperator<OUT, ?> with(JoinFunction<I1, I2, OUT> joinFunction) { + + TypeInformation<OUT> outType = TypeExtractor.getJoinReturnTypes(joinFunction, + predicate.op.input1.getType(), predicate.op.input2.getType()); + + CoWindowInvokable<I1, I2, OUT> invokable = new CoWindowInvokable<I1, I2, OUT>( + getJoinWindowFunction(joinFunction, predicate), predicate.op.windowSize, + predicate.op.slideInterval, predicate.op.timeStamp1, predicate.op.timeStamp2); + + jobGraphBuilder.setInvokable(id, invokable); + + return setType(outType); + } + } + + public static final class DefaultJoinFunction<I1, I2> implements + JoinFunction<I1, I2, Tuple2<I1, I2>> { + + private static final long serialVersionUID = 1L; + private final Tuple2<I1, I2> outTuple = new Tuple2<I1, I2>(); + + @Override + public Tuple2<I1, I2> join(I1 first, I2 second) throws Exception { + outTuple.f0 = first; + outTuple.f1 = second; + return outTuple; + } + } + + public static <I1, I2, OUT> JoinWindowFunction<I1, I2, OUT> getJoinWindowFunction( + JoinFunction<I1, I2, OUT> joinFunction, JoinPredicate<I1, I2> predicate) { + return new JoinWindowFunction<I1, I2, OUT>(predicate.keys1, predicate.keys2, joinFunction); + } +} http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b0a2e4a1/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/temporaloperator/TemporalOperator.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/temporaloperator/TemporalOperator.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/temporaloperator/TemporalOperator.java new file mode 100644 index 0000000..f121dfa --- /dev/null +++ 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/temporaloperator/TemporalOperator.java @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.datastream.temporaloperator; + +import java.util.concurrent.TimeUnit; + +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.windowing.helper.SystemTimestamp; +import org.apache.flink.streaming.api.windowing.helper.Timestamp; +import org.apache.flink.streaming.api.windowing.helper.TimestampWrapper; + +public abstract class TemporalOperator<I1, I2, OP extends TemporalWindow<OP>> { + + public final DataStream<I1> input1; + public final DataStream<I2> input2; + + public long windowSize; + public long slideInterval; + + public TimestampWrapper<I1> timeStamp1; + public TimestampWrapper<I2> timeStamp2; + + public TemporalOperator(DataStream<I1> input1, DataStream<I2> input2) { + if (input1 == null || input2 == null) { + throw new NullPointerException(); + } + this.input1 = input1.copy(); + this.input2 = input2.copy(); + } + + /** + * Continues a temporal transformation.<br/> + * Defines the window size on which the two 
DataStreams will be transformed. + * To define sliding windows call {@link TemporalWindow#every} on the + * resulting operator. + * + * @param length + * The size of the window, expressed in the given time unit. + * @param timeUnit + * The unit of time to be used + * @return An incomplete temporal transformation. + */ + @SuppressWarnings("unchecked") + public OP onWindow(long length, TimeUnit timeUnit) { + return onWindow(timeUnit.toMillis(length), + (TimestampWrapper<I1>) SystemTimestamp.getWrapper(), + (TimestampWrapper<I2>) SystemTimestamp.getWrapper()); + } + + /** + * Continues a temporal transformation.<br/> + * Defines the window size on which the two DataStreams will be + * transformed. To define sliding windows call {@link TemporalWindow#every} + * on the resulting operator. + * + * @param length + * The size of the window in milliseconds. + * @param timeStamp1 + * The timestamp used to extract time from the elements of the + * first data stream. + * @param timeStamp2 + * The timestamp used to extract time from the elements of the + * second data stream. + * @return An incomplete temporal transformation. + */ + public OP onWindow(long length, Timestamp<I1> timeStamp1, Timestamp<I2> timeStamp2) { + return onWindow(length, timeStamp1, timeStamp2, 0); + } + + /** + * Continues a temporal transformation.<br/> + * Defines the window size on which the two DataStreams will be + * transformed. To define sliding windows call {@link TemporalWindow#every} + * on the resulting operator. + * + * @param length + * The size of the window in milliseconds. + * @param timeStamp1 + * The timestamp used to extract time from the elements of the + * first data stream. + * @param timeStamp2 + * The timestamp used to extract time from the elements of the + * second data stream. + * @param startTime + * The start time to measure the first window + * @return An incomplete temporal transformation. 
+ */ + public OP onWindow(long length, Timestamp<I1> timeStamp1, Timestamp<I2> timeStamp2, + long startTime) { + return onWindow(length, new TimestampWrapper<I1>(timeStamp1, startTime), + new TimestampWrapper<I2>(timeStamp2, startTime)); + } + + private OP onWindow(long length, TimestampWrapper<I1> timeStamp1, + TimestampWrapper<I2> timeStamp2) { + + this.windowSize = length; + this.slideInterval = length; + + this.timeStamp1 = timeStamp1; + this.timeStamp2 = timeStamp2; + + return createNextWindowOperator(); + } + + protected abstract OP createNextWindowOperator(); + +} http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b0a2e4a1/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/temporaloperator/TemporalWindow.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/temporaloperator/TemporalWindow.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/temporaloperator/TemporalWindow.java new file mode 100644 index 0000000..8ac1492 --- /dev/null +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/temporaloperator/TemporalWindow.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.datastream.temporaloperator; + +import java.util.concurrent.TimeUnit; + +public interface TemporalWindow<T> { + + /** + * Defines the slide interval for this temporal operator + * + * @param length + * Length of the slide interval, in the given time unit + * @param timeUnit + * Unit of time + * @return The temporal operator with slide interval specified + */ + public T every(long length, TimeUnit timeUnit); + + /** + * Defines the slide interval for this temporal operator + * + * @param length + * Length of the slide interval in milliseconds + * @return The temporal operator with slide interval specified + */ + public T every(long length); + +} http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b0a2e4a1/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoWindowInvokable.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoWindowInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoWindowInvokable.java index 59552f4..03219b7 100644 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoWindowInvokable.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoWindowInvokable.java @@ -190,4 +190,8 @@ public class 
CoWindowInvokable<IN1, IN2, OUT> extends CoInvokable<IN1, IN2, OUT> protected void callUserFunction2() throws Exception { } + public void setSlideSize(long slideSize) { + this.slideSize = slideSize; + } + } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b0a2e4a1/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Time.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Time.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Time.java index 9dc1c8c..f94eea4 100644 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Time.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Time.java @@ -35,13 +35,13 @@ import org.apache.flink.streaming.api.windowing.policy.TriggerPolicy; */ public class Time<DATA> implements WindowingHelper<DATA> { - private long length; - private TimeUnit granularity; - private TimestampWrapper<DATA> timestampWrapper; - private long delay; + protected long length; + protected TimeUnit granularity; + protected TimestampWrapper<DATA> timestampWrapper; + protected long delay; /** - * Creates an helper representing a trigger which triggers every given + * Creates a helper representing a trigger which triggers every given * length or an eviction which evicts all elements older than length. * * @param length @@ -62,7 +62,7 @@ public class Time<DATA> implements WindowingHelper<DATA> { } /** - * Creates an helper representing a trigger which triggers every given + * Creates a helper representing a trigger which triggers every given * length or an eviction which evicts all elements older than length. 
* * @param length @@ -160,11 +160,7 @@ public class Time<DATA> implements WindowingHelper<DATA> { return this; } - private long granularityInMillis() { - if (granularity != null) { - return this.granularity.toMillis(this.length); - } else { - return this.length; - } + protected long granularityInMillis() { + return granularity == null ? length : granularity.toMillis(length); } } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b0a2e4a1/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WindowCrossJoinTest.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WindowCrossJoinTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WindowCrossJoinTest.java index e856f07..3da6b5f 100644 --- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WindowCrossJoinTest.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WindowCrossJoinTest.java @@ -28,7 +28,7 @@ import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.function.sink.SinkFunction; -import org.apache.flink.streaming.api.windowing.helper.TimestampWrapper; +import org.apache.flink.streaming.api.windowing.helper.Timestamp; import org.apache.flink.streaming.util.TestStreamEnvironment; import org.junit.Test; @@ -98,14 +98,14 @@ public class WindowCrossJoinTest implements Serializable { inStream1 .join(inStream2) - .onWindow(1000, 1000, new MyTimestamp<Tuple2<Integer, String>>(), - new MyTimestamp<Tuple1<Integer>>()).where(0).equalTo(0) + .onWindow(1000, new MyTimestamp<Tuple2<Integer, String>>(), + new 
MyTimestamp<Tuple1<Integer>>(), 100).where(0).equalTo(0) .addSink(new JoinResultSink()); inStream1 .cross(inStream2) - .onWindow(1000, 1000, new MyTimestamp<Tuple2<Integer, String>>(), - new MyTimestamp<Tuple1<Integer>>()) + .onWindow(1000, new MyTimestamp<Tuple2<Integer, String>>(), + new MyTimestamp<Tuple1<Integer>>(), 100) .with(new CrossFunction<Tuple2<Integer, String>, Tuple1<Integer>, Tuple2<Tuple2<Integer, String>, Tuple1<Integer>>>() { private static final long serialVersionUID = 1L; @@ -123,22 +123,13 @@ public class WindowCrossJoinTest implements Serializable { assertEquals(crossExpectedResults, crossResults); } - private static class MyTimestamp<T> extends TimestampWrapper<T> { - public MyTimestamp() { - super(null, 0); - } - + private static class MyTimestamp<T> implements Timestamp<T> { private static final long serialVersionUID = 1L; @Override public long getTimestamp(T value) { return 101L; } - - @Override - public long getStartTime() { - return 100L; - } } private static class JoinResultSink implements http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b0a2e4a1/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java index 897ad48..dcfed50 100644 --- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java +++ b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java @@ -18,6 +18,7 @@ package org.apache.flink.streaming.examples.join; import java.util.Random; +import java.util.concurrent.TimeUnit; import 
org.apache.flink.api.common.functions.JoinFunction; import org.apache.flink.api.java.tuple.Tuple2; @@ -67,7 +68,7 @@ public class WindowJoin { // second windows DataStream<Tuple3<String, Integer, Integer>> joinedStream = grades .join(salaries) - .onWindow(1000) + .onWindow(1, TimeUnit.SECONDS) .where(0) .equalTo(0) .with(new MyJoinFunction()); http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b0a2e4a1/flink-addons/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/WindowJoin.scala ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/WindowJoin.scala b/flink-addons/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/WindowJoin.scala index a19e4b4..e87d4a1 100644 --- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/WindowJoin.scala +++ b/flink-addons/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/WindowJoin.scala @@ -22,6 +22,7 @@ import org.apache.flink.api.scala._ import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment import org.apache.flink.util.Collector import scala.util.Random +import java.util.concurrent.TimeUnit object WindowJoin { @@ -37,10 +38,10 @@ object WindowJoin { val names = env.addSource(nameStream _).map(x => Name(x._1, x._2)) val ages = env.addSource(ageStream _).map(x => Age(x._1, x._2)) - //Join the two input streams by id on the last second and create new Person objects - //containing both name and age + //Join the two input streams by id on the last second every 2 seconds and create new + //Person objects containing both name and age val joined = - names.join(ages).onWindow(1000) + names.join(ages).onWindow(1, 
TimeUnit.SECONDS).every(2, TimeUnit.SECONDS) .where("id").equalTo("id") { (n, a) => Person(n.name, a.age) } joined print http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b0a2e4a1/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamCrossOperator.scala ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamCrossOperator.scala b/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamCrossOperator.scala index e26db62..d620d5e 100644 --- a/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamCrossOperator.scala +++ b/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamCrossOperator.scala @@ -19,7 +19,6 @@ package org.apache.flink.streaming.api.scala import scala.reflect.ClassTag - import org.apache.commons.lang.Validate import org.apache.flink.api.common.functions.CrossFunction import org.apache.flink.api.common.typeinfo.TypeInformation @@ -28,11 +27,12 @@ import org.apache.flink.api.scala._ import org.apache.flink.api.scala.typeutils.CaseClassSerializer import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo import org.apache.flink.streaming.api.datastream.{DataStream => JavaStream} -import org.apache.flink.streaming.api.datastream.TemporalOperator import org.apache.flink.streaming.api.function.co.CrossWindowFunction import org.apache.flink.streaming.api.invokable.operator.co.CoWindowInvokable import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.clean import org.apache.flink.streaming.api.scala.StreamingConversions._ +import org.apache.flink.streaming.api.datastream.temporaloperator.TemporalWindow +import java.util.concurrent.TimeUnit class StreamCrossOperator[I1, I2](i1: 
JavaStream[I1], i2: JavaStream[I2]) extends TemporalOperator[I1, I2, StreamCrossOperator.CrossWindow[I1, I2]](i1, i2) { @@ -72,7 +72,7 @@ object StreamCrossOperator { private[flink] class CrossWindow[I1, I2](op: StreamCrossOperator[I1, I2], javaStream: JavaStream[(I1, I2)]) extends - DataStream[(I1, I2)](javaStream) { + DataStream[(I1, I2)](javaStream) with TemporalWindow[CrossWindow[I1, I2]] { /** * Sets a wrapper for the crossed elements. For each crossed pair, the result of the udf @@ -90,6 +90,17 @@ object StreamCrossOperator { javaStream.setType(implicitly[TypeInformation[R]]) } + + override def every(length: Long, timeUnit: TimeUnit): CrossWindow[I1, I2] = { + every(timeUnit.toMillis(length)) + } + + override def every(length: Long): CrossWindow[I1, I2] = { + val builder = javaStream.getExecutionEnvironment().getJobGraphBuilder() + val invokable = builder.getInvokable(javaStream.getId()) + invokable.asInstanceOf[CoWindowInvokable[_,_,_]].setSlideSize(length) + this + } } private[flink] def getCrossWindowFunction[I1, I2, R](op: StreamCrossOperator[I1, I2], http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b0a2e4a1/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamJoinOperator.scala ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamJoinOperator.scala b/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamJoinOperator.scala index d587d56..cb79e2a 100644 --- a/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamJoinOperator.scala +++ b/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamJoinOperator.scala @@ -20,25 +20,24 @@ package org.apache.flink.streaming.api.scala import 
scala.Array.canBuildFrom import scala.reflect.ClassTag - import org.apache.commons.lang.Validate import org.apache.flink.api.common.functions.JoinFunction import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.common.typeutils.TypeSerializer import org.apache.flink.api.java.functions.KeySelector import org.apache.flink.api.java.operators.Keys -import org.apache.flink.api.scala._ import org.apache.flink.api.scala.typeutils.CaseClassSerializer import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo -import org.apache.flink.streaming.api.datastream.{DataStream => JavaStream} -import org.apache.flink.streaming.api.datastream.TemporalOperator +import org.apache.flink.streaming.api.datastream.{ DataStream => JavaStream } import org.apache.flink.streaming.api.function.co.JoinWindowFunction import org.apache.flink.streaming.api.invokable.operator.co.CoWindowInvokable import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.clean -import org.apache.flink.streaming.api.scala.StreamingConversions._ +import org.apache.flink.streaming.api.scala.StreamingConversions.javaToScalaStream import org.apache.flink.streaming.util.keys.KeySelectorUtil +import org.apache.flink.streaming.api.datastream.temporaloperator.TemporalWindow +import java.util.concurrent.TimeUnit -class StreamJoinOperator[I1, I2](i1: JavaStream[I1], i2: JavaStream[I2]) extends +class StreamJoinOperator[I1, I2](i1: JavaStream[I1], i2: JavaStream[I2]) extends TemporalOperator[I1, I2, StreamJoinOperator.JoinWindow[I1, I2]](i1, i2) { override def createNextWindowOperator() = { @@ -48,10 +47,11 @@ TemporalOperator[I1, I2, StreamJoinOperator.JoinWindow[I1, I2]](i1, i2) { object StreamJoinOperator { - class JoinWindow[I1, I2](private[flink] op: StreamJoinOperator[I1, I2]) { + class JoinWindow[I1, I2](private[flink]op: StreamJoinOperator[I1, I2]) extends + TemporalWindow[JoinWindow[I1, I2]] { private[flink] val type1 = op.input1.getType(); - + /** * Continues a temporal 
Join transformation by defining * the fields in the first stream to be used as keys for the join. @@ -60,7 +60,7 @@ object StreamJoinOperator { */ def where(fields: Int*) = { new JoinPredicate[I1, I2](op, KeySelectorUtil.getSelectorForKeys( - new Keys.ExpressionKeys(fields.toArray,type1),type1)) + new Keys.ExpressionKeys(fields.toArray, type1), type1)) } /** @@ -69,9 +69,9 @@ object StreamJoinOperator { * The resulting incomplete join can be completed by JoinPredicate.equalTo() * to define the second key. */ - def where(firstField: String, otherFields: String*) = + def where(firstField: String, otherFields: String*) = new JoinPredicate[I1, I2](op, KeySelectorUtil.getSelectorForKeys( - new Keys.ExpressionKeys(firstField +: otherFields.toArray,type1),type1)) + new Keys.ExpressionKeys(firstField +: otherFields.toArray, type1), type1)) /** * Continues a temporal Join transformation by defining @@ -89,10 +89,19 @@ object StreamJoinOperator { new JoinPredicate[I1, I2](op, keyExtractor) } + override def every(length: Long, timeUnit: TimeUnit): JoinWindow[I1, I2] = { + every(timeUnit.toMillis(length)) + } + + override def every(length: Long): JoinWindow[I1, I2] = { + op.slideInterval = length + this + } + } class JoinPredicate[I1, I2](private[flink] val op: StreamJoinOperator[I1, I2], - private[flink] val keys1: KeySelector[I1, _]) { + private[flink] val keys1: KeySelector[I1, _]) { private[flink] var keys2: KeySelector[I2, _] = null private[flink] val type2 = op.input2.getType(); @@ -104,7 +113,7 @@ object StreamJoinOperator { */ def equalTo(fields: Int*): JoinedStream[I1, I2] = { finish(KeySelectorUtil.getSelectorForKeys( - new Keys.ExpressionKeys(fields.toArray,type2),type2)) + new Keys.ExpressionKeys(fields.toArray, type2), type2)) } /** @@ -113,9 +122,9 @@ object StreamJoinOperator { * (first, second) * To define a custom wrapping, use JoinedStream.apply(...) 
*/ - def equalTo(firstField: String, otherFields: String*): JoinedStream[I1, I2] = - finish(KeySelectorUtil.getSelectorForKeys( - new Keys.ExpressionKeys(firstField +: otherFields.toArray,type2),type2)) + def equalTo(firstField: String, otherFields: String*): JoinedStream[I1, I2] = + finish(KeySelectorUtil.getSelectorForKeys( + new Keys.ExpressionKeys(firstField +: otherFields.toArray, type2), type2)) /** * Creates a temporal join transformation by defining the second join key. @@ -159,11 +168,11 @@ object StreamJoinOperator { return op.input1.groupBy(keys1).connect(op.input2.groupBy(keys2)) .addGeneralWindowCombine(getJoinWindowFunction(this, (_, _)), - returnType, op.windowSize, op.slideInterval, op.timeStamp1, op.timeStamp2) + returnType, op.windowSize, op.slideInterval, op.timeStamp1, op.timeStamp2) } } - class JoinedStream[I1, I2](jp: JoinPredicate[I1, I2], javaStream: JavaStream[(I1, I2)]) extends + class JoinedStream[I1, I2](jp: JoinPredicate[I1, I2], javaStream: JavaStream[(I1, I2)]) extends DataStream[(I1, I2)](javaStream) { private val op = jp.op @@ -186,7 +195,7 @@ object StreamJoinOperator { } private[flink] def getJoinWindowFunction[I1, I2, R](jp: JoinPredicate[I1, I2], - joinFunction: (I1, I2) => R) = { + joinFunction: (I1, I2) => R) = { Validate.notNull(joinFunction, "Join function must not be null.") val joinFun = new JoinFunction[I1, I2, R] { http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b0a2e4a1/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/TemporalOperator.scala ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/TemporalOperator.scala b/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/TemporalOperator.scala new file mode 100644 index 0000000..fd3a4a9 --- /dev/null +++ 
b/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/TemporalOperator.scala @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.scala + +import org.apache.flink.streaming.api.datastream.temporaloperator.{ TemporalOperator => JTempOp } +import org.apache.flink.streaming.api.datastream.{ DataStream => JavaStream } +import org.apache.flink.streaming.api.datastream.temporaloperator.TemporalWindow +import org.apache.flink.streaming.api.windowing.helper.Timestamp +import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment._ + +abstract class TemporalOperator[I1, I2, OP <: TemporalWindow[OP]]( + i1: JavaStream[I1], i2: JavaStream[I2]) extends JTempOp[I1, I2, OP](i1, i2) { + + def onWindow(length: Long, ts1: I1 => Long, ts2: I2 => Long, startTime: Long = 0): OP = { + val timeStamp1 = getTS(ts1) + val timeStamp2 = getTS(ts2) + onWindow(length, timeStamp1, timeStamp2, startTime) + } + + def getTS[R](ts: R => Long): Timestamp[R] = { + new Timestamp[R] { + val cleanFun = clean(ts, true) + def getTimestamp(in: R) = cleanFun(in) + } + } + +}
