[streaming] Rework temporal join and cross for consistency and extended features
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/652327be Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/652327be Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/652327be Branch: refs/heads/release-0.8 Commit: 652327be3a1196fd98e18a777a24e36e6574da28 Parents: 1922dbd Author: Gyula Fora <[email protected]> Authored: Fri Dec 19 21:05:01 2014 +0100 Committer: mbalassi <[email protected]> Committed: Mon Jan 5 17:54:26 2015 +0100 ---------------------------------------------------------------------- .../api/datastream/ConnectedDataStream.java | 55 +-------- .../api/datastream/StreamCrossOperator.java | 50 ++++++-- .../api/datastream/StreamJoinOperator.java | 123 +++++++++++++------ .../api/datastream/TemporalOperator.java | 103 ++++++++++++++++ .../api/datastream/WindowDBOperator.java | 103 ---------------- .../api/function/co/CrossWindowFunction.java | 21 ++-- .../api/function/co/JoinWindowFunction.java | 17 ++- .../streaming/api/WindowCrossJoinTest.java | 2 +- .../streaming/examples/join/WindowJoin.java | 3 +- .../flink/api/scala/streaming/DataStream.scala | 6 +- 10 files changed, 255 insertions(+), 228 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/652327be/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java index c663315..65a6c37 100755 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java +++ 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java @@ -17,15 +17,10 @@ package org.apache.flink.streaming.api.datastream; -import java.util.List; - -import org.apache.flink.api.common.functions.CrossFunction; import org.apache.flink.api.common.functions.RichFlatMapFunction; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.ClosureCleaner; import org.apache.flink.api.java.functions.KeySelector; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.api.java.typeutils.TupleTypeInfo; import org.apache.flink.api.java.typeutils.TypeExtractor; import org.apache.flink.streaming.api.JobGraphBuilder; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; @@ -43,7 +38,6 @@ import org.apache.flink.streaming.api.invokable.operator.co.CoReduceInvokable; import org.apache.flink.streaming.api.invokable.operator.co.CoWindowInvokable; import org.apache.flink.streaming.api.invokable.util.DefaultTimeStamp; import org.apache.flink.streaming.api.invokable.util.TimeStamp; -import org.apache.flink.util.Collector; /** * The ConnectedDataStream represents a stream for two different data types. 
It @@ -545,54 +539,7 @@ public class ConnectedDataStream<IN1, IN2> { return invokable; } - protected <OUT> SingleOutputStreamOperator<OUT, ?> addGeneralWindowCross( - CrossFunction<IN1, IN2, OUT> crossFunction, long windowSize, long slideInterval, - TimeStamp<IN1> timestamp1, TimeStamp<IN2> timestamp2) { - - TypeInformation<OUT> outTypeInfo = TypeExtractor.createTypeInfo(CrossFunction.class, - crossFunction.getClass(), 2, null, null); - - CrossWindowFunction<IN1, IN2, OUT> crossWindowFunction = new CrossWindowFunction<IN1, IN2, OUT>( - clean(crossFunction)); - - return addGeneralWindowCombine(crossWindowFunction, outTypeInfo, windowSize, slideInterval, - timestamp1, timestamp2); - } - - private static class CrossWindowFunction<IN1, IN2, OUT> implements - CoWindowFunction<IN1, IN2, OUT> { - - private static final long serialVersionUID = 1L; - - private CrossFunction<IN1, IN2, OUT> crossFunction; - - public CrossWindowFunction(CrossFunction<IN1, IN2, OUT> crossFunction) { - this.crossFunction = crossFunction; - } - - @Override - public void coWindow(List<IN1> first, List<IN2> second, Collector<OUT> out) - throws Exception { - for (IN1 firstValue : first) { - for (IN2 secondValue : second) { - out.collect(crossFunction.cross(firstValue, secondValue)); - } - } - } - } - - protected SingleOutputStreamOperator<Tuple2<IN1, IN2>, ?> addGeneralWindowJoin( - CoWindowFunction<IN1, IN2, Tuple2<IN1, IN2>> coWindowFunction, long windowSize, - long slideInterval, TimeStamp<IN1> timestamp1, TimeStamp<IN2> timestamp2) { - - TypeInformation<Tuple2<IN1, IN2>> outType = new TupleTypeInfo<Tuple2<IN1, IN2>>( - getInputType1(), getInputType2()); - - return addGeneralWindowCombine(coWindowFunction, outType, windowSize, slideInterval, - timestamp1, timestamp2); - } - - private <OUT> SingleOutputStreamOperator<OUT, ?> addGeneralWindowCombine( + protected <OUT> SingleOutputStreamOperator<OUT, ?> addGeneralWindowCombine( CoWindowFunction<IN1, IN2, OUT> coWindowFunction, TypeInformation<OUT> 
outTypeInfo, long windowSize, long slideInterval, TimeStamp<IN1> timestamp1, TimeStamp<IN2> timestamp2) { http://git-wip-us.apache.org/repos/asf/flink/blob/652327be/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/StreamCrossOperator.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/StreamCrossOperator.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/StreamCrossOperator.java index f54f273..c6cba63 100644 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/StreamCrossOperator.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/StreamCrossOperator.java @@ -19,9 +19,16 @@ package org.apache.flink.streaming.api.datastream; import org.apache.flink.api.common.functions.CrossFunction; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.ClosureCleaner; import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.operators.CrossOperator; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.apache.flink.streaming.api.function.co.CrossWindowFunction; -public class StreamCrossOperator<I1, I2> extends WindowDBOperator<I1, I2, StreamCrossOperator.CrossWindow<I1, I2>> { +public class StreamCrossOperator<I1, I2> extends + TemporalOperator<I1, I2, StreamCrossOperator.CrossWindow<I1, I2>> { public StreamCrossOperator(DataStream<I1> input1, DataStream<I2> input2) { super(input1, input2); @@ -40,12 +47,23 @@ public class StreamCrossOperator<I1, I2> extends WindowDBOperator<I1, I2, Stream this.op = operator; } + public <F> F clean(F f) { + if 
(op.input1.getExecutionEnvironment().getConfig().isClosureCleanerEnabled()) { + ClosureCleaner.clean(f, true); + } + ClosureCleaner.ensureSerializable(f); + return f; + } + /** - * Finalizes a temporal Cross transformation by applying a {@link CrossFunction} to each pair of crossed elements.<br/> - * Each CrossFunction call returns exactly one element. + * Finalizes a temporal Cross transformation by applying a + * {@link CrossFunction} to each pair of crossed elements.<br/> + * Each CrossFunction call returns exactly one element. * - * @param function The CrossFunction that is called for each pair of crossed elements. - * @return An CrossOperator that represents the crossed result DataSet + * @param function + * The CrossFunction that is called for each pair of crossed + * elements. + * @return A CrossOperator that represents the crossed result DataStream * * @see CrossFunction * @see DataSet @@ -54,15 +72,29 @@ public class StreamCrossOperator<I1, I2> extends WindowDBOperator<I1, I2, Stream return createCrossOperator(function); } + /** + * Finalizes a temporal Cross transformation by emitting all pairs in a + * new Tuple2. 
+ * + * @return A CrossOperator that represents the crossed result DataStream + */ + public SingleOutputStreamOperator<Tuple2<I1, I2>, ?> withDefault() { + return createCrossOperator(new CrossOperator.DefaultCrossFunction<I1, I2>()); + } + protected <R> SingleOutputStreamOperator<R, ?> createCrossOperator( CrossFunction<I1, I2, R> function) { - return op.input1.connect(op.input2).addGeneralWindowCross(function, op.windowSize, - op.slideInterval, op.timeStamp1, op.timeStamp2); + TypeInformation<R> outTypeInfo = TypeExtractor.getCrossReturnTypes(function, + op.input1.getType(), op.input2.getType()); - } + CrossWindowFunction<I1, I2, R> crossWindowFunction = new CrossWindowFunction<I1, I2, R>( + clean(function)); - // ---------------------------------------------------------------------------------------- + return op.input1.connect(op.input2).addGeneralWindowCombine(crossWindowFunction, + outTypeInfo, op.windowSize, op.slideInterval, op.timeStamp1, op.timeStamp2); + + } } http://git-wip-us.apache.org/repos/asf/flink/blob/652327be/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/StreamJoinOperator.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/StreamJoinOperator.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/StreamJoinOperator.java index 89c80ab..3051587 100644 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/StreamJoinOperator.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/StreamJoinOperator.java @@ -18,15 +18,18 @@ package org.apache.flink.streaming.api.datastream; +import org.apache.flink.api.common.functions.JoinFunction; +import 
org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.tuple.Tuple; import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.typeutils.TypeExtractor; import org.apache.flink.streaming.api.function.co.JoinWindowFunction; import org.apache.flink.streaming.util.keys.FieldsKeySelector; import org.apache.flink.streaming.util.keys.PojoKeySelector; public class StreamJoinOperator<I1, I2> extends - WindowDBOperator<I1, I2, StreamJoinOperator.JoinWindow<I1, I2>> { + TemporalOperator<I1, I2, StreamJoinOperator.JoinWindow<I1, I2>> { public StreamJoinOperator(DataStream<I1> input1, DataStream<I2> input2) { super(input1, input2); @@ -51,7 +54,7 @@ public class StreamJoinOperator<I1, I2> extends * that should be used as join keys.<br/> * <b>Note: Fields can only be selected as join keys on Tuple * DataStreams.</b><br/> - * + * * @param fields * The indexes of the other Tuple fields of the first join * DataStreams that should be used as keys. @@ -59,8 +62,8 @@ public class StreamJoinOperator<I1, I2> extends * {@link JoinPredicate#equalTo} to continue the Join. */ public JoinPredicate<I1, I2> where(int... fields) { - return new JoinPredicate<I1, I2>(op, FieldsKeySelector.getSelector( - op.input1.getType(), fields)); + return new JoinPredicate<I1, I2>(op, FieldsKeySelector.getSelector(op.input1.getType(), + fields)); } /** @@ -68,7 +71,7 @@ public class StreamJoinOperator<I1, I2> extends * Defines the fields of the first join {@link DataStream} that should * be used as grouping keys. Fields are the names of member fields of * the underlying type of the data stream. - * + * * @param fields * The fields of the first join DataStream that should be * used as keys. @@ -105,12 +108,13 @@ public class StreamJoinOperator<I1, I2> extends * Intermediate step of a temporal Join transformation. 
<br/> * To continue the Join transformation, select the join key of the second * input {@link DataStream} by calling {@link JoinPredicate#equalTo} - * + * */ public static class JoinPredicate<I1, I2> { - private StreamJoinOperator<I1, I2> op; - private final KeySelector<I1, ?> keys1; + public StreamJoinOperator<I1, I2> op; + public KeySelector<I1, ?> keys1; + public KeySelector<I2, ?> keys2; private JoinPredicate(StreamJoinOperator<I1, I2> operator, KeySelector<I1, ?> keys1) { this.op = operator; @@ -124,37 +128,30 @@ public class StreamJoinOperator<I1, I2> extends * <b>Note: Fields can only be selected as join keys on Tuple * DataStreams.</b><br/> * - * The resulting operator wraps each pair of joining elements into a - * {@link Tuple2}, with the element of the first input being the first - * field of the tuple and the element of the second input being the - * second field of the tuple. - * * @param fields * The indexes of the Tuple fields of the second join * DataStream that should be used as keys. - * @return The joined data stream. + * @return An incomplete join. Call {@link FinalizeStreamJoin#with} or + * {@link FinalizeStreamJoin#withDefault} to complete */ - public SingleOutputStreamOperator<Tuple2<I1, I2>, ?> equalTo(int... fields) { - return createJoinOperator(FieldsKeySelector.getSelector(op.input2.getType(), - fields)); + public FinalizeStreamJoin<I1, I2> equalTo(int... fields) { + keys2 = FieldsKeySelector.getSelector(op.input2.getType(), fields); + return new FinalizeStreamJoin<I1, I2>(this); } /** * Continues a temporal Join transformation and defines the fields of * the second join {@link DataStream} that should be used as join keys.<br/> - * - * The resulting operator wraps each pair of joining elements into a - * {@link Tuple2}, with the element of the first input being the first - * field of the tuple and the element of the second input being the - * second field of the tuple. 
- * + * * @param fields * The fields of the second join DataStream that should be * used as keys. - * @return The joined data stream. + * @return An incomplete join. Call {@link FinalizeStreamJoin#with} or + * {@link FinalizeStreamJoin#withDefault} to complete */ - public SingleOutputStreamOperator<Tuple2<I1, I2>, ?> equalTo(String... fields) { - return createJoinOperator(new PojoKeySelector<I2>(op.input2.getType(), fields)); + public FinalizeStreamJoin<I1, I2> equalTo(String... fields) { + this.keys2 = new PojoKeySelector<I2>(op.input2.getType(), fields); + return new FinalizeStreamJoin<I1, I2>(this); } /** @@ -164,29 +161,75 @@ public class StreamJoinOperator<I1, I2> extends * second DataStream and extracts a single key value on which the * DataStream is joined. </br> * - * The resulting operator wraps each pair of joining elements into a - * {@link Tuple2}, with the element of the first input being the first - * field of the tuple and the element of the second input being the - * second field of the tuple. - * * @param keySelector * The KeySelector function which extracts the key values * from the second DataStream on which it is joined. + * @return An incomplete join. Call {@link FinalizeStreamJoin#with} or + * {@link FinalizeStreamJoin#withDefault} to complete + */ + public <K> FinalizeStreamJoin<I1, I2> equalTo(KeySelector<I2, K> keySelector) { + this.keys2 = keySelector; + return new FinalizeStreamJoin<I1, I2>(this); + } + + } + + public static class FinalizeStreamJoin<I1, I2> { + private final JoinPredicate<I1, I2> predicate; + + private FinalizeStreamJoin(JoinPredicate<I1, I2> predicate) { + this.predicate = predicate; + } + + /** + * Completes a stream join. </p> The resulting operator wraps each pair + * of joining elements into a {@link Tuple2}, with the element of the + * first input being the first field of the tuple and the element of the + * second input being the second field of the tuple. + * + * @return The joined data stream. 
+ */ + public SingleOutputStreamOperator<Tuple2<I1, I2>, ?> withDefault() { + return createJoinOperator(new DefaultJoinFunction<I1, I2>()); + } + + /** + * Completes a stream join. </p> The resulting operator wraps each pair + * of joining elements using the user defined {@link JoinFunction} + * * @return The joined data stream. */ - public <K> SingleOutputStreamOperator<Tuple2<I1, I2>, ?> equalTo( - KeySelector<I2, K> keySelector) { - return createJoinOperator(keySelector); + public <OUT> SingleOutputStreamOperator<OUT, ?> with(JoinFunction<I1, I2, OUT> joinFunction) { + return createJoinOperator(joinFunction); } - protected SingleOutputStreamOperator<Tuple2<I1, I2>, ?> createJoinOperator( - KeySelector<I2, ?> keys2) { + private <OUT> SingleOutputStreamOperator<OUT, ?> createJoinOperator( + JoinFunction<I1, I2, OUT> joinFunction) { + + JoinWindowFunction<I1, I2, OUT> joinWindowFunction = new JoinWindowFunction<I1, I2, OUT>( + predicate.keys1, predicate.keys2, joinFunction); + + StreamJoinOperator<I1, I2> op = predicate.op; + + TypeInformation<OUT> outType = TypeExtractor.getJoinReturnTypes(joinFunction, + op.input1.getType(), op.input2.getType()); - JoinWindowFunction<I1, I2> joinWindowFunction = new JoinWindowFunction<I1, I2>(keys1, - keys2); - return op.input1.connect(op.input2).addGeneralWindowJoin(joinWindowFunction, - op.windowSize, op.slideInterval, op.timeStamp1, op.timeStamp2); + return op.input1.connect(op.input2).addGeneralWindowCombine(joinWindowFunction, + outType, op.windowSize, op.slideInterval, op.timeStamp1, op.timeStamp2); } } + public static final class DefaultJoinFunction<T1, T2> implements + JoinFunction<T1, T2, Tuple2<T1, T2>> { + + private static final long serialVersionUID = 1L; + private final Tuple2<T1, T2> outTuple = new Tuple2<T1, T2>(); + + @Override + public Tuple2<T1, T2> join(T1 first, T2 second) throws Exception { + outTuple.f0 = first; + outTuple.f1 = second; + return outTuple; + } + } } 
http://git-wip-us.apache.org/repos/asf/flink/blob/652327be/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/TemporalOperator.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/TemporalOperator.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/TemporalOperator.java new file mode 100644 index 0000000..cd8aabd --- /dev/null +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/TemporalOperator.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.api.datastream; + +import org.apache.flink.streaming.api.invokable.util.DefaultTimeStamp; +import org.apache.flink.streaming.api.invokable.util.TimeStamp; + +public abstract class TemporalOperator<I1, I2, OP> { + + public final DataStream<I1> input1; + public final DataStream<I2> input2; + + public long windowSize; + public long slideInterval; + + public TimeStamp<I1> timeStamp1; + public TimeStamp<I2> timeStamp2; + + public TemporalOperator(DataStream<I1> input1, DataStream<I2> input2) { + if (input1 == null || input2 == null) { + throw new NullPointerException(); + } + this.input1 = input1.copy(); + this.input2 = input2.copy(); + } + + /** + * Continues a temporal Join transformation.<br/> + * Defines the window size on which the two DataStreams will be joined. + * + * @param windowSize + * The size of the window in milliseconds. + * @return An incomplete Join transformation. Call {@link JoinWindow#where} + * to continue the Join. + */ + public OP onWindow(long windowSize) { + return onWindow(windowSize, windowSize); + } + + /** + * Continues a temporal Join transformation.<br/> + * Defines the window size on which the two DataStreams will be joined. + * + * @param windowSize + * The size of the window in milliseconds. + * @param slideInterval + * The slide size of the window. + * @return An incomplete Join transformation. Call {@link JoinWindow#where} + * to continue the Join. + */ + public OP onWindow(long windowSize, long slideInterval) { + return onWindow(windowSize, slideInterval, new DefaultTimeStamp<I1>(), + new DefaultTimeStamp<I2>()); + } + + /** + * Continues a temporal Join transformation.<br/> + * Defines the window size on which the two DataStreams will be joined. + * + * @param windowSize + * The size of the window in milliseconds. + * @param slideInterval + * The slide size of the window. + * @param timeStamp1 + * The timestamp used to extract time from the elements of the + * first data stream. 
+ * @param timeStamp2 + * The timestamp used to extract time from the elements of the + * second data stream. + * @return An incomplete Join transformation. Call {@link JoinWindow#where} + * to continue the Join. + */ + public OP onWindow(long windowSize, long slideInterval, TimeStamp<I1> timeStamp1, + TimeStamp<I2> timeStamp2) { + + this.windowSize = windowSize; + this.slideInterval = slideInterval; + + this.timeStamp1 = timeStamp1; + this.timeStamp2 = timeStamp2; + + return createNextWindowOperator(); + } + + protected abstract OP createNextWindowOperator(); + +} http://git-wip-us.apache.org/repos/asf/flink/blob/652327be/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowDBOperator.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowDBOperator.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowDBOperator.java deleted file mode 100644 index 4f6f0c1..0000000 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowDBOperator.java +++ /dev/null @@ -1,103 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.api.datastream; - -import org.apache.flink.streaming.api.invokable.util.DefaultTimeStamp; -import org.apache.flink.streaming.api.invokable.util.TimeStamp; - -public abstract class WindowDBOperator<I1, I2, OP> { - - protected final DataStream<I1> input1; - protected final DataStream<I2> input2; - - long windowSize; - long slideInterval; - - TimeStamp<I1> timeStamp1; - TimeStamp<I2> timeStamp2; - - public WindowDBOperator(DataStream<I1> input1, DataStream<I2> input2) { - if (input1 == null || input2 == null) { - throw new NullPointerException(); - } - this.input1 = input1.copy(); - this.input2 = input2.copy(); - } - - /** - * Continues a temporal Join transformation.<br/> - * Defines the window size on which the two DataStreams will be joined. - * - * @param windowSize - * The size of the window in milliseconds. - * @return An incomplete Join transformation. Call {@link JoinWindow#where} - * to continue the Join. - */ - public OP onWindow(long windowSize) { - return onWindow(windowSize, windowSize); - } - - /** - * Continues a temporal Join transformation.<br/> - * Defines the window size on which the two DataStreams will be joined. - * - * @param windowSize - * The size of the window in milliseconds. - * @param slideInterval - * The slide size of the window. - * @return An incomplete Join transformation. Call {@link JoinWindow#where} - * to continue the Join. 
- */ - public OP onWindow(long windowSize, long slideInterval) { - return onWindow(windowSize, slideInterval, new DefaultTimeStamp<I1>(), - new DefaultTimeStamp<I2>()); - } - - /** - * Continues a temporal Join transformation.<br/> - * Defines the window size on which the two DataStreams will be joined. - * - * @param windowSize - * The size of the window in milliseconds. - * @param slideInterval - * The slide size of the window. - * @param timeStamp1 - * The timestamp used to extract time from the elements of the - * first data stream. - * @param timeStamp2 - * The timestamp used to extract time from the elements of the - * second data stream. - * @return An incomplete Join transformation. Call {@link JoinWindow#where} - * to continue the Join. - */ - public OP onWindow(long windowSize, long slideInterval, TimeStamp<I1> timeStamp1, - TimeStamp<I2> timeStamp2) { - - this.windowSize = windowSize; - this.slideInterval = slideInterval; - - this.timeStamp1 = timeStamp1; - this.timeStamp2 = timeStamp2; - - return createNextWindowOperator(); - } - - protected abstract OP createNextWindowOperator(); - -} http://git-wip-us.apache.org/repos/asf/flink/blob/652327be/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/CrossWindowFunction.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/CrossWindowFunction.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/CrossWindowFunction.java index f61738a..9cafcd1 100644 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/CrossWindowFunction.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/CrossWindowFunction.java @@ -20,20 +20,25 @@ package 
org.apache.flink.streaming.api.function.co; import java.util.List; -import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.common.functions.CrossFunction; import org.apache.flink.util.Collector; -public class CrossWindowFunction<IN1, IN2> implements CoWindowFunction<IN1, IN2, Tuple2<IN1, IN2>> { +public class CrossWindowFunction<IN1, IN2, OUT> implements CoWindowFunction<IN1, IN2, OUT> { private static final long serialVersionUID = 1L; - @Override - public void coWindow(List<IN1> first, List<IN2> second, Collector<Tuple2<IN1, IN2>> out) - throws Exception { + private CrossFunction<IN1, IN2, OUT> crossFunction; + + public CrossWindowFunction(CrossFunction<IN1, IN2, OUT> crossFunction) { + this.crossFunction = crossFunction; + } - for (IN1 item1 : first) { - for (IN2 item2 : second) { - out.collect(new Tuple2<IN1, IN2>(item1, item2)); + @Override + public void coWindow(List<IN1> first, List<IN2> second, Collector<OUT> out) throws Exception { + for (IN1 firstValue : first) { + for (IN2 secondValue : second) { + out.collect(crossFunction.cross(firstValue, secondValue)); } } } + } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/652327be/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/JoinWindowFunction.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/JoinWindowFunction.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/JoinWindowFunction.java index 53e2657..9f5cd5d 100644 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/JoinWindowFunction.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/JoinWindowFunction.java @@ -20,31 
+20,30 @@ package org.apache.flink.streaming.api.function.co; import java.util.List; +import org.apache.flink.api.common.functions.JoinFunction; import org.apache.flink.api.java.functions.KeySelector; -import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.util.Collector; -public class JoinWindowFunction<IN1, IN2> implements CoWindowFunction<IN1, IN2, Tuple2<IN1, IN2>> { +public class JoinWindowFunction<IN1, IN2, OUT> implements CoWindowFunction<IN1, IN2, OUT> { private static final long serialVersionUID = 1L; private KeySelector<IN1, ?> keySelector1; private KeySelector<IN2, ?> keySelector2; + private JoinFunction<IN1, IN2, OUT> joinFunction; - public JoinWindowFunction() { - } - - public JoinWindowFunction(KeySelector<IN1, ?> keySelector1, KeySelector<IN2, ?> keySelector2) { + public JoinWindowFunction(KeySelector<IN1, ?> keySelector1, KeySelector<IN2, ?> keySelector2, + JoinFunction<IN1, IN2, OUT> joinFunction) { this.keySelector1 = keySelector1; this.keySelector2 = keySelector2; + this.joinFunction = joinFunction; } @Override - public void coWindow(List<IN1> first, List<IN2> second, Collector<Tuple2<IN1, IN2>> out) - throws Exception { + public void coWindow(List<IN1> first, List<IN2> second, Collector<OUT> out) throws Exception { for (IN1 item1 : first) { for (IN2 item2 : second) { if (keySelector1.getKey(item1).equals(keySelector2.getKey(item2))) { - out.collect(new Tuple2<IN1, IN2>(item1, item2)); + out.collect(joinFunction.join(item1, item2)); } } } http://git-wip-us.apache.org/repos/asf/flink/blob/652327be/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WindowCrossJoinTest.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WindowCrossJoinTest.java 
b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WindowCrossJoinTest.java index d8cdfa5..ffdfbc3 100644 --- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WindowCrossJoinTest.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WindowCrossJoinTest.java @@ -96,7 +96,7 @@ public class WindowCrossJoinTest implements Serializable { DataStream<Integer> inStream2 = env.fromCollection(in2); inStream1.join(inStream2).onWindow(1000, 1000, new MyTimestamp1(), new MyTimestamp2()) - .where(0).equalTo(0).addSink(new JoinResultSink()); + .where(0).equalTo(0).withDefault().addSink(new JoinResultSink()); inStream1.cross(inStream2).onWindow(1000, 1000, new MyTimestamp1(), new MyTimestamp2()) .with(new CrossFunction<Tuple2<Integer,String>, Integer, Tuple2<Tuple2<Integer,String>, Integer>>() { http://git-wip-us.apache.org/repos/asf/flink/blob/652327be/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java index 93df823..2586e3c 100644 --- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java +++ b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java @@ -67,7 +67,8 @@ public class WindowJoin { .join(salaries) .onWindow(1000) .where(0) - .equalTo(0); + .equalTo(0) + .withDefault(); // emit result if (fileOutput) { 
http://git-wip-us.apache.org/repos/asf/flink/blob/652327be/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/DataStream.scala ---------------------------------------------------------------------- diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/DataStream.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/DataStream.scala index 5f46c84..a117412 100644 --- a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/DataStream.scala +++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/DataStream.scala @@ -284,7 +284,7 @@ class DataStream[T](javaStream: JavaStream[T]) { * received records. * */ - def count: DataStream[java.lang.Long] = new DataStream[java.lang.Long](javaStream.count()) + def count: DataStream[Long] = new DataStream[java.lang.Long](javaStream.count()).asInstanceOf[DataStream[Long]] /** * Creates a new DataStream by applying the given function to every element of this DataStream. @@ -445,14 +445,14 @@ class DataStream[T](javaStream: JavaStream[T]) { * Creates a new SplitDataStream that contains only the elements satisfying the * given output selector predicate. */ - def split(fun: T => TraversableOnce[String]): SplitDataStream[T] = { + def split(fun: T => String): SplitDataStream[T] = { if (fun == null) { throw new NullPointerException("OutputSelector must not be null.") } val selector = new OutputSelector[T] { val cleanFun = clean(fun) def select(in: T): java.lang.Iterable[String] = { - asJavaIterable(cleanFun(in).toIterable) + List(cleanFun(in)) } } split(selector)
