[scala] [streaming] Added scala window helpers + timestamp rework for lambda support
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/fac77348 Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/fac77348 Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/fac77348 Branch: refs/heads/master Commit: fac773488a77c0c70ed74a2dca09224f275ec77a Parents: 8bf9416 Author: Gyula Fora <[email protected]> Authored: Fri Jan 2 20:04:51 2015 +0100 Committer: Gyula Fora <[email protected]> Committed: Fri Jan 2 20:04:51 2015 +0100 ---------------------------------------------------------------------- .../api/datastream/CoWindowDataStream.java | 20 +++--- .../api/datastream/ConnectedDataStream.java | 30 ++++---- .../api/datastream/TemporalOperator.java | 39 ++++++----- .../co/CoGroupedWindowReduceInvokable.java | 4 +- .../operator/co/CoWindowInvokable.java | 8 +-- .../operator/co/CoWindowReduceInvokable.java | 15 ++-- .../api/invokable/util/DefaultTimeStamp.java | 39 ----------- .../streaming/api/invokable/util/TimeStamp.java | 46 ------------- .../streaming/api/windowing/helper/Delta.java | 4 +- .../api/windowing/helper/SystemTimestamp.java | 37 ++++++++++ .../streaming/api/windowing/helper/Time.java | 72 ++++++++++++++++---- .../api/windowing/helper/Timestamp.java | 39 +++++++++++ .../api/windowing/helper/TimestampWrapper.java | 44 ++++++++++++ .../windowing/policy/ActiveTriggerPolicy.java | 6 +- .../windowing/policy/TimeEvictionPolicy.java | 31 +++++---- .../api/windowing/policy/TimeTriggerPolicy.java | 42 ++++++------ .../streaming/api/WindowCrossJoinTest.java | 30 ++++---- .../operator/CoGroupedWindowReduceTest.java | 5 +- .../invokable/operator/CoWindowReduceTest.java | 5 +- .../api/invokable/operator/CoWindowTest.java | 23 +++---- .../operator/GroupedWindowInvokableTest.java | 21 +++--- .../invokable/operator/WindowInvokableTest.java | 16 ++--- .../policy/TimeEvictionPolicyTest.java | 12 ++-- .../windowing/policy/TimeTriggerPolicyTest.java | 28 +++----- .../examples/windowing/DeltaExtractExample.java | 6 +- .../flink/api/scala/streaming/DataStream.scala | 19 +----- .../scala/streaming/StreamCrossOperator.scala | 2 +- .../streaming/StreamExecutionEnvironment.scala | 12 +++- .../scala/streaming/StreamJoinOperator.scala | 1 + .../scala/streaming/WindowedDataStream.scala | 6 +- .../api/scala/streaming/windowing/Delta.scala | 47 +++++++++++++ .../api/scala/streaming/windowing/Time.scala | 55 +++++++++++++++ 32 files changed, 454 insertions(+), 310 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/fac77348/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/CoWindowDataStream.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/CoWindowDataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/CoWindowDataStream.java index c8c634a..9129f9e 100644 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/CoWindowDataStream.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/CoWindowDataStream.java @@ -22,33 +22,33 @@ import org.apache.flink.streaming.api.function.co.CoReduceFunction; import org.apache.flink.streaming.api.invokable.operator.co.CoGroupedWindowReduceInvokable; import org.apache.flink.streaming.api.invokable.operator.co.CoInvokable; import org.apache.flink.streaming.api.invokable.operator.co.CoWindowReduceInvokable; -import org.apache.flink.streaming.api.invokable.util.TimeStamp; +import org.apache.flink.streaming.api.windowing.helper.TimestampWrapper; /** * A {@link CoWindowDataStream} represents two data streams whose elements are * batched together into sliding windows. Operation * {@link #reduce(CoReduceFunction)} can be applied for each window. - * + * * @param <IN1> * The type of the first input data stream * @param <IN2> * The type of the second input data stream */ public class CoWindowDataStream<IN1, IN2> extends CoBatchedDataStream<IN1, IN2> { - TimeStamp<IN1> timeStamp1; - TimeStamp<IN2> timeStamp2; + TimestampWrapper<IN1> timeStamp1; + TimestampWrapper<IN2> timeStamp2; protected CoWindowDataStream(DataStream<IN1> dataStream1, DataStream<IN2> dataStream2, long windowSize1, long windowSize2, long slideInterval1, long slideInterval2, - TimeStamp<IN1> timeStamp1, TimeStamp<IN2> timeStamp2) { + TimestampWrapper<IN1> timeStamp1, TimestampWrapper<IN2> timeStamp2) { super(dataStream1, dataStream2, windowSize1, windowSize2, slideInterval1, slideInterval2); this.timeStamp1 = timeStamp1; this.timeStamp2 = timeStamp2; } protected CoWindowDataStream(ConnectedDataStream<IN1, IN2> coDataStream, long windowSize1, - long windowSize2, long slideInterval1, long slideInterval2, TimeStamp<IN1> timeStamp1, - TimeStamp<IN2> timeStamp2) { + long windowSize2, long slideInterval1, long slideInterval2, + TimestampWrapper<IN1> timeStamp1, TimestampWrapper<IN2> timeStamp2) { super(coDataStream, windowSize1, windowSize2, slideInterval1, slideInterval2); this.timeStamp1 = timeStamp1; this.timeStamp2 = timeStamp2; @@ -96,9 +96,9 @@ public class CoWindowDataStream<IN1, IN2> extends CoBatchedDataStream<IN1, IN2> CoReduceFunction<IN1, IN2, OUT> coReducer) { CoWindowReduceInvokable<IN1, IN2, OUT> invokable; if (isGrouped) { - invokable = new CoGroupedWindowReduceInvokable<IN1, IN2, OUT>(clean(coReducer), batchSize1, - batchSize2, slideSize1, slideSize2, keySelector1, keySelector2, timeStamp1, - timeStamp2); + invokable = new CoGroupedWindowReduceInvokable<IN1, IN2, OUT>(clean(coReducer), + batchSize1, batchSize2, slideSize1, slideSize2, keySelector1, keySelector2, + timeStamp1, timeStamp2); } else { invokable = new CoWindowReduceInvokable<IN1, IN2, OUT>(clean(coReducer), batchSize1, batchSize2, slideSize1, slideSize2, timeStamp1, timeStamp2); http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/fac77348/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java index 39b6460..efd9531 100755 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java @@ -36,8 +36,8 @@ import org.apache.flink.streaming.api.invokable.operator.co.CoInvokable; import org.apache.flink.streaming.api.invokable.operator.co.CoMapInvokable; import org.apache.flink.streaming.api.invokable.operator.co.CoReduceInvokable; import org.apache.flink.streaming.api.invokable.operator.co.CoWindowInvokable; -import org.apache.flink.streaming.api.invokable.util.DefaultTimeStamp; -import org.apache.flink.streaming.api.invokable.util.TimeStamp; +import org.apache.flink.streaming.api.windowing.helper.SystemTimestamp; +import org.apache.flink.streaming.api.windowing.helper.TimestampWrapper; /** * The ConnectedDataStream represents a stream for two different data types. It @@ -305,8 +305,8 @@ public class ConnectedDataStream<IN1, IN2> { * @return The transformed {@link ConnectedDataStream} */ public CoWindowDataStream<IN1, IN2> window(long windowSize1, long windowSize2, - long slideInterval1, long slideInterval2, TimeStamp<IN1> timeStamp1, - TimeStamp<IN2> timeStamp2) { + long slideInterval1, long slideInterval2, TimestampWrapper<IN1> timeStamp1, + TimestampWrapper<IN2> timeStamp2) { if (windowSize1 < 1 || windowSize2 < 1) { throw new IllegalArgumentException("Window size must be positive"); } @@ -338,10 +338,12 @@ public class ConnectedDataStream<IN1, IN2> { * second input data stream are slid by after each transformation * @return The transformed {@link ConnectedDataStream} */ + @SuppressWarnings("unchecked") public CoWindowDataStream<IN1, IN2> window(long windowSize1, long windowSize2, long slideInterval1, long slideInterval2) { return window(windowSize1, windowSize2, slideInterval1, slideInterval2, - new DefaultTimeStamp<IN1>(), new DefaultTimeStamp<IN2>()); + (TimestampWrapper<IN1>) SystemTimestamp.getWrapper(), + (TimestampWrapper<IN2>) SystemTimestamp.getWrapper()); } /** @@ -365,7 +367,7 @@ public class ConnectedDataStream<IN1, IN2> { * @return The transformed {@link ConnectedDataStream} */ public CoWindowDataStream<IN1, IN2> window(long windowSize1, long windowSize2, - TimeStamp<IN1> timeStamp1, TimeStamp<IN2> timeStamp2) { + TimestampWrapper<IN1> timeStamp1, TimestampWrapper<IN2> timeStamp2) { return window(windowSize1, windowSize2, windowSize1, windowSize2, timeStamp1, timeStamp2); } @@ -384,9 +386,11 @@ public class ConnectedDataStream<IN1, IN2> { * milliseconds * @return The transformed {@link ConnectedDataStream} */ + @SuppressWarnings("unchecked") public CoWindowDataStream<IN1, IN2> window(long windowSize1, long windowSize2) { return window(windowSize1, windowSize2, windowSize1, windowSize2, - new DefaultTimeStamp<IN1>(), new DefaultTimeStamp<IN2>()); + (TimestampWrapper<IN1>) SystemTimestamp.getWrapper(), + (TimestampWrapper<IN2>) SystemTimestamp.getWrapper()); } /** @@ -479,10 +483,12 @@ public class ConnectedDataStream<IN1, IN2> { * * @return The transformed {@link DataStream}. */ + @SuppressWarnings("unchecked") public <OUT> SingleOutputStreamOperator<OUT, ?> windowReduce( CoWindowFunction<IN1, IN2, OUT> coWindowFunction, long windowSize, long slideInterval) { return windowReduce(coWindowFunction, windowSize, slideInterval, - new DefaultTimeStamp<IN1>(), new DefaultTimeStamp<IN2>()); + (TimestampWrapper<IN1>) SystemTimestamp.getWrapper(), + (TimestampWrapper<IN2>) SystemTimestamp.getWrapper()); } /** @@ -510,7 +516,7 @@ public class ConnectedDataStream<IN1, IN2> { */ public <OUT> SingleOutputStreamOperator<OUT, ?> windowReduce( CoWindowFunction<IN1, IN2, OUT> coWindowFunction, long windowSize, long slideInterval, - TimeStamp<IN1> timestamp1, TimeStamp<IN2> timestamp2) { + TimestampWrapper<IN1> timestamp1, TimestampWrapper<IN2> timestamp2) { if (windowSize < 1) { throw new IllegalArgumentException("Window size must be positive"); @@ -541,8 +547,8 @@ public class ConnectedDataStream<IN1, IN2> { public <OUT> SingleOutputStreamOperator<OUT, ?> addGeneralWindowCombine( CoWindowFunction<IN1, IN2, OUT> coWindowFunction, TypeInformation<OUT> outTypeInfo, - long windowSize, long slideInterval, TimeStamp<IN1> timestamp1, - TimeStamp<IN2> timestamp2) { + long windowSize, long slideInterval, TimestampWrapper<IN1> timestamp1, + TimestampWrapper<IN2> timestamp2) { if (windowSize < 1) { throw new IllegalArgumentException("Window size must be positive"); @@ -550,7 +556,7 @@ public class ConnectedDataStream<IN1, IN2> { if (slideInterval < 1) { throw new IllegalArgumentException("Slide interval must be positive"); } - + return addCoFunction("coWindowReduce", outTypeInfo, new CoWindowInvokable<IN1, IN2, OUT>( clean(coWindowFunction), windowSize, slideInterval, timestamp1, timestamp2)); http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/fac77348/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/TemporalOperator.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/TemporalOperator.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/TemporalOperator.java index cd8aabd..e5385f0 100644 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/TemporalOperator.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/TemporalOperator.java @@ -18,8 +18,8 @@ package org.apache.flink.streaming.api.datastream; -import org.apache.flink.streaming.api.invokable.util.DefaultTimeStamp; -import org.apache.flink.streaming.api.invokable.util.TimeStamp; +import org.apache.flink.streaming.api.windowing.helper.SystemTimestamp; +import org.apache.flink.streaming.api.windowing.helper.TimestampWrapper; public abstract class TemporalOperator<I1, I2, OP> { @@ -29,8 +29,8 @@ public abstract class TemporalOperator<I1, I2, OP> { public long windowSize; public long slideInterval; - public TimeStamp<I1> timeStamp1; - public TimeStamp<I2> timeStamp2; + public TimestampWrapper<I1> timeStamp1; + public TimestampWrapper<I2> timeStamp2; public TemporalOperator(DataStream<I1> input1, DataStream<I2> input2) { if (input1 == null || input2 == null) { @@ -41,37 +41,37 @@ public abstract class TemporalOperator<I1, I2, OP> { } /** - * Continues a temporal Join transformation.<br/> - * Defines the window size on which the two DataStreams will be joined. + * Continues a temporal transformation.<br/> + * Defines the window size on which the two DataStreams will be transformed. * * @param windowSize * The size of the window in milliseconds. - * @return An incomplete Join transformation. Call {@link JoinWindow#where} - * to continue the Join. + * @return An incomplete temporal transformation. */ public OP onWindow(long windowSize) { return onWindow(windowSize, windowSize); } /** - * Continues a temporal Join transformation.<br/> - * Defines the window size on which the two DataStreams will be joined. + * Continues a temporal transformation.<br/> + * Defines the window size on which the two DataStreams will be transformed. * * @param windowSize * The size of the window in milliseconds. * @param slideInterval * The slide size of the window. - * @return An incomplete Join transformation. Call {@link JoinWindow#where} - * to continue the Join. + * @return An incomplete temporal transformation. */ + @SuppressWarnings("unchecked") public OP onWindow(long windowSize, long slideInterval) { - return onWindow(windowSize, slideInterval, new DefaultTimeStamp<I1>(), - new DefaultTimeStamp<I2>()); + return onWindow(windowSize, slideInterval, + (TimestampWrapper<I1>) SystemTimestamp.getWrapper(), + (TimestampWrapper<I2>) SystemTimestamp.getWrapper()); } /** - * Continues a temporal Join transformation.<br/> - * Defines the window size on which the two DataStreams will be joined. + * Continues a temporal transformation.<br/> + * Defines the window size on which the two DataStreams will be transformed. * * @param windowSize * The size of the window in milliseconds. @@ -83,11 +83,10 @@ public abstract class TemporalOperator<I1, I2, OP> { * @param timeStamp2 * The timestamp used to extract time from the elements of the * second data stream. - * @return An incomplete Join transformation. Call {@link JoinWindow#where} - * to continue the Join. + * @return An incomplete temporal transformation. */ - public OP onWindow(long windowSize, long slideInterval, TimeStamp<I1> timeStamp1, - TimeStamp<I2> timeStamp2) { + public OP onWindow(long windowSize, long slideInterval, TimestampWrapper<I1> timeStamp1, + TimestampWrapper<I2> timeStamp2) { this.windowSize = windowSize; this.slideInterval = slideInterval; http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/fac77348/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoGroupedWindowReduceInvokable.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoGroupedWindowReduceInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoGroupedWindowReduceInvokable.java index 4905566..736239f 100644 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoGroupedWindowReduceInvokable.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoGroupedWindowReduceInvokable.java @@ -22,8 +22,8 @@ import java.util.Map; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.streaming.api.function.co.CoReduceFunction; -import org.apache.flink.streaming.api.invokable.util.TimeStamp; import org.apache.flink.streaming.api.streamrecord.StreamRecord; +import org.apache.flink.streaming.api.windowing.helper.TimestampWrapper; public class CoGroupedWindowReduceInvokable<IN1, IN2, OUT> extends CoWindowReduceInvokable<IN1, IN2, OUT> { @@ -38,7 +38,7 @@ public class CoGroupedWindowReduceInvokable<IN1, IN2, OUT> extends public CoGroupedWindowReduceInvokable(CoReduceFunction<IN1, IN2, OUT> coReducer, long windowSize1, long windowSize2, long slideInterval1, long slideInterval2, KeySelector<IN1, ?> keySelector1, KeySelector<IN2, ?> keySelector2, - TimeStamp<IN1> timestamp1, TimeStamp<IN2> timestamp2) { + TimestampWrapper<IN1> timestamp1, TimestampWrapper<IN2> timestamp2) { super(coReducer, windowSize1, windowSize2, slideInterval1, slideInterval2, timestamp1, timestamp2); this.keySelector1 = keySelector1; http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/fac77348/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoWindowInvokable.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoWindowInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoWindowInvokable.java index 7df5668..59552f4 100644 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoWindowInvokable.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoWindowInvokable.java @@ -23,8 +23,8 @@ import java.util.List; import org.apache.commons.math.util.MathUtils; import org.apache.flink.streaming.api.function.co.CoWindowFunction; -import org.apache.flink.streaming.api.invokable.util.TimeStamp; import org.apache.flink.streaming.api.streamrecord.StreamRecord; +import org.apache.flink.streaming.api.windowing.helper.TimestampWrapper; import org.apache.flink.streaming.state.CircularFifoList; public class CoWindowInvokable<IN1, IN2, OUT> extends CoInvokable<IN1, IN2, OUT> { @@ -35,8 +35,8 @@ public class CoWindowInvokable<IN1, IN2, OUT> extends CoInvokable<IN1, IN2, OUT> protected long slideSize; protected CircularFifoList<StreamRecord<IN1>> circularList1; protected CircularFifoList<StreamRecord<IN2>> circularList2; - protected TimeStamp<IN1> timeStamp1; - protected TimeStamp<IN2> timeStamp2; + protected TimestampWrapper<IN1> timeStamp1; + protected TimestampWrapper<IN2> timeStamp2; protected StreamWindow window; @@ -44,7 +44,7 @@ public class CoWindowInvokable<IN1, IN2, OUT> extends CoInvokable<IN1, IN2, OUT> protected long nextRecordTime; public CoWindowInvokable(CoWindowFunction<IN1, IN2, OUT> coWindowFunction, long windowSize, - long slideInterval, TimeStamp<IN1> timeStamp1, TimeStamp<IN2> timeStamp2) { + long slideInterval, TimestampWrapper<IN1> timeStamp1, TimestampWrapper<IN2> timeStamp2) { super(coWindowFunction); this.coWindowFunction = coWindowFunction; this.windowSize = windowSize; http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/fac77348/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoWindowReduceInvokable.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoWindowReduceInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoWindowReduceInvokable.java index fa47761..0c8598f 100644 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoWindowReduceInvokable.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoWindowReduceInvokable.java @@ -19,8 +19,7 @@ package org.apache.flink.streaming.api.invokable.operator.co; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.function.co.CoReduceFunction; -import org.apache.flink.streaming.api.invokable.util.DefaultTimeStamp; -import org.apache.flink.streaming.api.invokable.util.TimeStamp; +import org.apache.flink.streaming.api.windowing.helper.TimestampWrapper; public class CoWindowReduceInvokable<IN1, IN2, OUT> extends CoBatchReduceInvokable<IN1, IN2, OUT> { private static final long serialVersionUID = 1L; @@ -28,14 +27,14 @@ public class CoWindowReduceInvokable<IN1, IN2, OUT> extends CoBatchReduceInvokab protected long startTime2; protected long nextRecordTime1; protected long nextRecordTime2; - protected TimeStamp<IN1> timestamp1; - protected TimeStamp<IN2> timestamp2; + protected TimestampWrapper<IN1> timestamp1; + protected TimestampWrapper<IN2> timestamp2; protected StreamWindow<IN1> window1; protected StreamWindow<IN2> window2; public CoWindowReduceInvokable(CoReduceFunction<IN1, IN2, OUT> coReducer, long windowSize1, - long windowSize2, long slideInterval1, long slideInterval2, TimeStamp<IN1> timestamp1, - TimeStamp<IN2> timestamp2) { + long windowSize2, long slideInterval1, long slideInterval2, + TimestampWrapper<IN1> timestamp1, TimestampWrapper<IN2> timestamp2) { super(coReducer, windowSize1, windowSize2, slideInterval1, slideInterval2); this.timestamp1 = timestamp1; this.timestamp2 = timestamp2; @@ -51,10 +50,10 @@ public class CoWindowReduceInvokable<IN1, IN2, OUT> extends CoBatchReduceInvokab this.window2 = new StreamWindow<IN2>(batchSize2, slideSize2); this.batch1 = this.window1; this.batch2 = this.window2; - if (timestamp1 instanceof DefaultTimeStamp) { + if (timestamp1.isDefaultTimestamp()) { (new TimeCheck1()).start(); } - if (timestamp2 instanceof DefaultTimeStamp) { + if (timestamp2.isDefaultTimestamp()) { (new TimeCheck2()).start(); } } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/fac77348/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/util/DefaultTimeStamp.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/util/DefaultTimeStamp.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/util/DefaultTimeStamp.java deleted file mode 100644 index 2f22e8e..0000000 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/util/DefaultTimeStamp.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.api.invokable.util; - -/** - * Default timestamp function that uses the Java System.currentTimeMillis() - * method to retrieve a timestamp. - * - * @param <T> - * Type of the inputs of the reducing function. - */ -public class DefaultTimeStamp<T> implements TimeStamp<T> { - private static final long serialVersionUID = 1L; - - @Override - public long getTimestamp(T value) { - return System.currentTimeMillis(); - } - - @Override - public long getStartTime() { - return System.currentTimeMillis(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/fac77348/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/util/TimeStamp.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/util/TimeStamp.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/util/TimeStamp.java deleted file mode 100644 index 86fa101..0000000 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/util/TimeStamp.java +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.api.invokable.util; - -import java.io.Serializable; - -/** - * Interface for getting a timestamp from a custom value. Used in window - * reduces. In order to work properly, the timestamps must be non-decreasing. - * - * @param <T> - * Type of the value to create the timestamp from. - */ -public interface TimeStamp<T> extends Serializable { - - /** - * Values - * - * @param value - * The value to create the timestamp from - * @return The timestamp - */ - public long getTimestamp(T value); - - /** - * Function to define the starting time for reference - * - * @return The starting timestamp - */ - public long getStartTime(); -} http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/fac77348/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Delta.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Delta.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Delta.java index 9c8c5ca..5434a4e 100644 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Delta.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Delta.java @@ -84,8 +84,8 @@ public class Delta<DATA> implements WindowingHelper<DATA> { * The threshold used by the delta function. * @return Helper representing a delta trigger or eviction policy */ - public static <DATA> Delta<DATA> of(DeltaFunction<DATA> deltaFunction, DATA initVal, - double threshold) { + public static <DATA> Delta<DATA> of(double threshold, DeltaFunction<DATA> deltaFunction, + DATA initVal) { return new Delta<DATA>(deltaFunction, initVal, threshold); } } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/fac77348/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/SystemTimestamp.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/SystemTimestamp.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/SystemTimestamp.java new file mode 100644 index 0000000..8581ac5 --- /dev/null +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/SystemTimestamp.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.windowing.helper; + +/** + * {@link Timestamp} implementation to be used when system time is needed to + * determine windows + */ +public class SystemTimestamp<T> implements Timestamp<T> { + + private static final long serialVersionUID = 1L; + + @Override + public long getTimestamp(T value) { + return System.currentTimeMillis(); + } + + public static <R> TimestampWrapper<R> getWrapper() { + return new TimestampWrapper<R>(new SystemTimestamp<R>(), System.currentTimeMillis()); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/fac77348/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Time.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Time.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Time.java index d987e32..9dc1c8c 100644 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Time.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Time.java @@ -19,8 +19,6 @@ package org.apache.flink.streaming.api.windowing.helper; import java.util.concurrent.TimeUnit; -import org.apache.flink.streaming.api.invokable.util.DefaultTimeStamp; -import org.apache.flink.streaming.api.invokable.util.TimeStamp; import org.apache.flink.streaming.api.windowing.policy.EvictionPolicy; import org.apache.flink.streaming.api.windowing.policy.TimeEvictionPolicy; import org.apache.flink.streaming.api.windowing.policy.TimeTriggerPolicy; @@ -39,7 +37,7 @@ public class Time<DATA> implements WindowingHelper<DATA> { private long length; private TimeUnit granularity; - private TimeStamp<DATA> timeStamp; + private TimestampWrapper<DATA> timestampWrapper; private long delay; /** @@ -53,31 +51,52 @@ public class Time<DATA> implements WindowingHelper<DATA> { * the smallest possible granularity is milliseconds. Any smaller * time unit might cause an error at runtime due to conversion * problems. - * @param timeStamp + * @param timestamp * The user defined timestamp that will be used to extract time * information from the incoming elements + * @param startTime + * The startTime of the stream for computing the first window */ - private Time(long length, TimeUnit timeUnit, TimeStamp<DATA> timeStamp) { + private Time(long length, TimeUnit timeUnit, Timestamp<DATA> timestamp, long startTime) { + this(length, timeUnit, new TimestampWrapper<DATA>(timestamp, startTime)); + } + + /** + * Creates an helper representing a trigger which triggers every given + * length or an eviction which evicts all elements older than length. + * + * @param length + * The number of time units + * @param timeUnit + * The unit of time such as minute oder millisecond. Note that + * the smallest possible granularity is milliseconds. Any smaller + * time unit might cause an error at runtime due to conversion + * problems. + * @param timestampWrapper + * The user defined {@link TimestampWrapper} that will be used to + * extract time information from the incoming elements + */ + private Time(long length, TimeUnit timeUnit, TimestampWrapper<DATA> timestampWrapper) { this.length = length; this.granularity = timeUnit; - this.timeStamp = timeStamp; + this.timestampWrapper = timestampWrapper; this.delay = 0; } @Override public EvictionPolicy<DATA> toEvict() { - return new TimeEvictionPolicy<DATA>(granularityInMillis(), timeStamp); + return new TimeEvictionPolicy<DATA>(granularityInMillis(), timestampWrapper); } @Override public TriggerPolicy<DATA> toTrigger() { - return new TimeTriggerPolicy<DATA>(granularityInMillis(), timeStamp, delay); + return new TimeTriggerPolicy<DATA>(granularityInMillis(), timestampWrapper, delay); } /** * Creates a helper representing a time trigger which triggers every given * length (slide size) or a time eviction which evicts all elements older - * than length (window size). + * than length (window size) using System time. * * @param length * The number of time units @@ -88,8 +107,10 @@ public class Time<DATA> implements WindowingHelper<DATA> { * problems. * @return Helper representing the time based trigger and eviction policy */ + @SuppressWarnings("unchecked") public static <DATA> Time<DATA> of(long length, TimeUnit timeUnit) { - return new Time<DATA>(length, timeUnit, new DefaultTimeStamp<DATA>()); + return new Time<DATA>(length, timeUnit, + (TimestampWrapper<DATA>) SystemTimestamp.getWrapper()); } /** @@ -99,13 +120,32 @@ public class Time<DATA> implements WindowingHelper<DATA> { * * @param length * The number of time units - * @param timeStamp + * @param timestamp + * The user defined timestamp that will be used to extract time + * information from the incoming elements + * @param startTime + * The startTime used to compute the first window + * @return Helper representing the time based trigger and eviction policy + */ + public static <DATA> Time<DATA> of(long length, Timestamp<DATA> timestamp, long startTime) { + return new Time<DATA>(length, null, timestamp, startTime); + } + + /** + * Creates a helper representing a time trigger which triggers every given + * length (slide size) or a time eviction which evicts all elements older + * than length (window size) using a user defined timestamp extractor. By + * default the start time is set to 0. + * + * @param length + * The number of time units + * @param timestamp * The user defined timestamp that will be used to extract time * information from the incoming elements * @return Helper representing the time based trigger and eviction policy */ - public static <DATA> Time<DATA> of(long length, TimeStamp<DATA> timeStamp) { - return new Time<DATA>(length, TimeUnit.MILLISECONDS, timeStamp); + public static <DATA> Time<DATA> of(long length, Timestamp<DATA> timestamp) { + return of(length, timestamp, 0); } /** @@ -121,6 +161,10 @@ public class Time<DATA> implements WindowingHelper<DATA> { } private long granularityInMillis() { - return this.granularity.toMillis(this.length); + if (granularity != null) { + return this.granularity.toMillis(this.length); + } else { + return this.length; + } } } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/fac77348/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Timestamp.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Timestamp.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Timestamp.java new file mode 100644 index 0000000..fea6020 --- /dev/null +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Timestamp.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.windowing.helper; + +import java.io.Serializable; + +/** + * Interface for getting a timestamp from a custom value. Used in window + * reduces. In order to work properly, the timestamps must be non-decreasing. + * + * @param <T> + * Type of the value to create the timestamp from. + */ +public interface Timestamp<T> extends Serializable { + + /** + * Values + * + * @param value + * The value to create the timestamp from + * @return The timestamp + */ + public long getTimestamp(T value); +} http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/fac77348/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/TimestampWrapper.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/TimestampWrapper.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/TimestampWrapper.java new file mode 100644 index 0000000..8c3a09d --- /dev/null +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/TimestampWrapper.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.windowing.helper; + +import java.io.Serializable; + +public class TimestampWrapper<T> implements Serializable { + + private static final long serialVersionUID = 1L; + private long startTime; + private Timestamp<T> timestamp; + + public TimestampWrapper(Timestamp<T> timeStamp, long startTime) { + this.timestamp = timeStamp; + this.startTime = startTime; + } + + public long getTimestamp(T in) { + return timestamp.getTimestamp(in); + } + + public long getStartTime() { + return startTime; + } + + public boolean isDefaultTimestamp() { + return timestamp instanceof SystemTimestamp; + } +} http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/fac77348/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/ActiveTriggerPolicy.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/ActiveTriggerPolicy.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/ActiveTriggerPolicy.java index a8a704d..414250c 100644 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/ActiveTriggerPolicy.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/ActiveTriggerPolicy.java @@ -17,7 +17,7 @@ package org.apache.flink.streaming.api.windowing.policy; -import org.apache.flink.streaming.api.invokable.util.TimeStamp; +import org.apache.flink.streaming.api.windowing.helper.Timestamp; /** * This interface extends the {@link TriggerPolicy} interface with functionality @@ -28,7 +28,7 @@ import org.apache.flink.streaming.api.invokable.util.TimeStamp; * first. It can return zero ore more fake data points which will be added * before the the currently arrived real element gets processed. This allows to * handle empty windows in time based windowing with an user defined - * {@link TimeStamp}. Triggers are not called on fake datapoint. A fake + * {@link Timestamp}. Triggers are not called on fake datapoint. A fake * datapoint is always considered as triggered. * * 2) An active trigger has a factory method for a runnable. This factory method @@ -49,7 +49,7 @@ public interface ActiveTriggerPolicy<DATA> extends TriggerPolicy<DATA> { * first. It can return zero ore more fake data points which will be added * before the the currently arrived real element gets processed. This allows * to handle empty windows in time based windowing with an user defined - * {@link TimeStamp}. Triggers are not called on fake datapoints. A fake + * {@link Timestamp}. Triggers are not called on fake datapoints. A fake * datapoint is always considered as triggered. * * @param datapoint http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/fac77348/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TimeEvictionPolicy.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TimeEvictionPolicy.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TimeEvictionPolicy.java index aca1dee..16c30fc 100644 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TimeEvictionPolicy.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TimeEvictionPolicy.java @@ -19,14 +19,15 @@ package org.apache.flink.streaming.api.windowing.policy; import java.util.LinkedList; -import org.apache.flink.streaming.api.invokable.util.TimeStamp; +import org.apache.flink.streaming.api.windowing.helper.Timestamp; +import org.apache.flink.streaming.api.windowing.helper.TimestampWrapper; /** * This eviction policy evicts all elements which are older then a specified - * time. The time is measured using a given {@link TimeStamp} implementation. A + * time. The time is measured using a given {@link Timestamp} implementation. A * point in time is always represented as long. Therefore, the granularity can * be set as long value as well. - * + * * @param <DATA> * The type of the incoming data points which are processed by this * policy. @@ -40,12 +41,12 @@ public class TimeEvictionPolicy<DATA> implements ActiveEvictionPolicy<DATA>, private static final long serialVersionUID = -1457476766124518220L; private long granularity; - private TimeStamp<DATA> timestamp; + private TimestampWrapper<DATA> timestampWrapper; private LinkedList<Long> buffer = new LinkedList<Long>(); /** * This eviction policy evicts all elements which are older than a specified - * time. The time is measured using a given {@link TimeStamp} + * time. The time is measured using a given {@link Timestamp} * implementation. A point in time is always represented as long. Therefore, * the granularity can be set as long value as well. If this value is set to * 2 the policy will evict all elements which are older as 2. @@ -60,12 +61,12 @@ public class TimeEvictionPolicy<DATA> implements ActiveEvictionPolicy<DATA>, * The granularity of the eviction. If this value is set to 2 the * policy will evict all elements which are older as 2(if * (time(X)<current time-granularity) evict X). - * @param timestamp - * The {@link TimeStamp} to measure the time with. This can be - * either user defined of provided by the API. + * @param timestampWrapper + * The {@link TimestampWrapper} to measure the time with. This + * can be either user defined of provided by the API. */ - public TimeEvictionPolicy(long granularity, TimeStamp<DATA> timestamp) { - this.timestamp = timestamp; + public TimeEvictionPolicy(long granularity, TimestampWrapper<DATA> timestampWrapper) { + this.timestampWrapper = timestampWrapper; this.granularity = granularity; } @@ -78,7 +79,7 @@ public class TimeEvictionPolicy<DATA> implements ActiveEvictionPolicy<DATA>, try { threshold = (Long) datapoint - granularity; } catch (ClassCastException e) { - threshold = timestamp.getTimestamp((DATA) datapoint) - granularity; + threshold = timestampWrapper.getTimestamp((DATA) datapoint) - granularity; } // return result @@ -91,9 +92,9 @@ public class TimeEvictionPolicy<DATA> implements ActiveEvictionPolicy<DATA>, checkForDeleted(bufferSize); - //remember timestamp - long time=timestamp.getTimestamp(datapoint); - + // remember timestamp + long time = timestampWrapper.getTimestamp(datapoint); + // delete and count expired tuples long threshold = time - granularity; int counter = deleteAndCountExpired(threshold); @@ -130,7 +131,7 @@ public class TimeEvictionPolicy<DATA> implements ActiveEvictionPolicy<DATA>, @Override public TimeEvictionPolicy<DATA> clone() { - return new TimeEvictionPolicy<DATA>(granularity, timestamp); + return new TimeEvictionPolicy<DATA>(granularity, timestampWrapper); } } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/fac77348/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TimeTriggerPolicy.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TimeTriggerPolicy.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TimeTriggerPolicy.java index 57bccf2..1e91b8e 100644 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TimeTriggerPolicy.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TimeTriggerPolicy.java @@ -19,12 +19,12 @@ package org.apache.flink.streaming.api.windowing.policy; import java.util.LinkedList; -import org.apache.flink.streaming.api.invokable.util.DefaultTimeStamp; -import org.apache.flink.streaming.api.invokable.util.TimeStamp; +import org.apache.flink.streaming.api.windowing.helper.Timestamp; +import org.apache.flink.streaming.api.windowing.helper.TimestampWrapper; /** * This trigger policy triggers with regard to the time. The is measured using a - * given {@link TimeStamp} implementation. A point in time is always represented + * given {@link Timestamp} implementation. A point in time is always represented * as long. Therefore, parameters such as granularity and delay can be set as * long value as well. * @@ -42,12 +42,12 @@ public class TimeTriggerPolicy<DATA> implements ActiveTriggerPolicy<DATA>, protected long startTime; protected long granularity; - protected TimeStamp<DATA> timestamp; + protected TimestampWrapper<DATA> timestampWrapper; protected long delay; /** * This trigger policy triggers with regard to the time. The is measured - * using a given {@link TimeStamp} implementation. A point in time is always + * using a given {@link Timestamp} implementation. A point in time is always * represented as long. Therefore, parameters such as granularity can be set * as long value as well. If this value for the granularity is set to 2 for * example, the policy will trigger at every second point in time. @@ -55,22 +55,22 @@ public class TimeTriggerPolicy<DATA> implements ActiveTriggerPolicy<DATA>, * @param granularity * The granularity of the trigger. If this value is set to x the * policy will trigger at every x-th time point - * @param timestamp - * The {@link TimeStamp} to measure the time with. This can be - * either user defined of provided by the API. + * @param timestampWrapper + * The {@link TimestampWrapper} to measure the time with. This + * can be either user defined of provided by the API. * @param timeWrapper * This policy creates fake elements to not miss windows in case * no element arrived within the duration of the window. This * extractor should wrap a long into such an element of type * DATA. */ - public TimeTriggerPolicy(long granularity, TimeStamp<DATA> timestamp) { - this(granularity, timestamp, 0); + public TimeTriggerPolicy(long granularity, TimestampWrapper<DATA> timestampWrapper) { + this(granularity, timestampWrapper, 0); } /** * This is mostly the same as - * {@link TimeTriggerPolicy#TimeTriggerPolicy(long, TimeStamp)}. In addition + * {@link TimeTriggerPolicy#TimeTriggerPolicy(long, Timestamp)}. In addition * to granularity and timestamp a delay can be specified for the first * trigger. If the start time given by the timestamp is x, the delay is y, * and the granularity is z, the first trigger will happen at x+y+z. @@ -78,9 +78,9 @@ public class TimeTriggerPolicy<DATA> implements ActiveTriggerPolicy<DATA>, * @param granularity * The granularity of the trigger. If this value is set to 2 the * policy will trigger at every second time point - * @param timestamp - * The {@link TimeStamp} to measure the time with. This can be - * either user defined of provided by the API. + * @param timestampWrapper + * The {@link TimestampWrapper} to measure the time with. This + * can be either user defined of provided by the API. * @param delay * A delay for the first trigger. If the start time given by the * timestamp is x, the delay is y, and the granularity is z, the @@ -91,9 +91,9 @@ public class TimeTriggerPolicy<DATA> implements ActiveTriggerPolicy<DATA>, * extractor should wrap a long into such an element of type * DATA. */ - public TimeTriggerPolicy(long granularity, TimeStamp<DATA> timestamp, long delay) { - this.startTime = timestamp.getStartTime() + delay; - this.timestamp = timestamp; + public TimeTriggerPolicy(long granularity, TimestampWrapper<DATA> timestampWrapper, long delay) { + this.startTime = timestampWrapper.getStartTime() + delay; + this.timestampWrapper = timestampWrapper; this.granularity = granularity; this.delay = delay; } @@ -107,7 +107,7 @@ public class TimeTriggerPolicy<DATA> implements ActiveTriggerPolicy<DATA>, LinkedList<Object> fakeElements = new LinkedList<Object>(); // check if there is more then one window border missed // use > here. In case >= would fit, the regular call will do the job. - while (timestamp.getTimestamp(datapoint) >= startTime + granularity) { + while (timestampWrapper.getTimestamp(datapoint) >= startTime + granularity) { startTime += granularity; fakeElements.add(startTime - 1); } @@ -127,7 +127,7 @@ public class TimeTriggerPolicy<DATA> implements ActiveTriggerPolicy<DATA>, */ @Override public Runnable createActiveTriggerRunnable(ActiveTriggerCallback callback) { - if (this.timestamp instanceof DefaultTimeStamp) { + if (this.timestampWrapper.isDefaultTimestamp()) { return new TimeCheck(callback); } else { return null; @@ -177,7 +177,7 @@ public class TimeTriggerPolicy<DATA> implements ActiveTriggerPolicy<DATA>, @Override public synchronized boolean notifyTrigger(DATA datapoint) { - long recordTime = timestamp.getTimestamp(datapoint); + long recordTime = timestampWrapper.getTimestamp(datapoint); if (recordTime >= startTime + granularity) { if (granularity != 0) { startTime = recordTime - ((recordTime - startTime) % granularity); @@ -190,7 +190,7 @@ public class TimeTriggerPolicy<DATA> implements ActiveTriggerPolicy<DATA>, @Override public TimeTriggerPolicy<DATA> clone() { - return new TimeTriggerPolicy<DATA>(granularity, timestamp, delay); + return new TimeTriggerPolicy<DATA>(granularity, timestampWrapper, delay); } } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/fac77348/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WindowCrossJoinTest.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WindowCrossJoinTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WindowCrossJoinTest.java index cd6232a..e856f07 100644 --- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WindowCrossJoinTest.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WindowCrossJoinTest.java @@ -28,7 +28,7 @@ import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.function.sink.SinkFunction; -import org.apache.flink.streaming.api.invokable.util.TimeStamp; +import org.apache.flink.streaming.api.windowing.helper.TimestampWrapper; import org.apache.flink.streaming.util.TestStreamEnvironment; import org.junit.Test; @@ -96,12 +96,16 @@ public class WindowCrossJoinTest implements Serializable { DataStream<Tuple2<Integer, String>> inStream1 = env.fromCollection(in1); DataStream<Tuple1<Integer>> inStream2 = env.fromCollection(in2); - inStream1.join(inStream2).onWindow(1000, 1000, new MyTimestamp1(), new MyTimestamp2()) - .where(0).equalTo(0).addSink(new JoinResultSink()); + inStream1 + .join(inStream2) + .onWindow(1000, 1000, new MyTimestamp<Tuple2<Integer, String>>(), + new MyTimestamp<Tuple1<Integer>>()).where(0).equalTo(0) + .addSink(new JoinResultSink()); inStream1 .cross(inStream2) - .onWindow(1000, 1000, new MyTimestamp1(), new MyTimestamp2()) + .onWindow(1000, 1000, new MyTimestamp<Tuple2<Integer, String>>(), + new MyTimestamp<Tuple1<Integer>>()) .with(new CrossFunction<Tuple2<Integer, String>, Tuple1<Integer>, Tuple2<Tuple2<Integer, String>, Tuple1<Integer>>>() { private static final long serialVersionUID = 1L; @@ -119,25 +123,15 @@ public class WindowCrossJoinTest implements Serializable { assertEquals(crossExpectedResults, crossResults); } - private static class MyTimestamp1 implements TimeStamp<Tuple2<Integer, String>> { - private static final long serialVersionUID = 1L; - - @Override - public long getTimestamp(Tuple2<Integer, String> value) { - return 101L; + private static class MyTimestamp<T> extends TimestampWrapper<T> { + public MyTimestamp() { + super(null, 0); } - @Override - public long getStartTime() { - return 100L; - } - } - - private static class MyTimestamp2 implements TimeStamp<Tuple1<Integer>> { private static final long serialVersionUID = 1L; @Override - public long getTimestamp(Tuple1<Integer> value) { + public long getTimestamp(T value) { return 101L; } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/fac77348/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoGroupedWindowReduceTest.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoGroupedWindowReduceTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoGroupedWindowReduceTest.java index e3f2a1b..508366c 100644 --- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoGroupedWindowReduceTest.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoGroupedWindowReduceTest.java @@ -29,7 +29,7 @@ import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.function.co.CoReduceFunction; import org.apache.flink.streaming.api.invokable.operator.co.CoGroupedWindowReduceInvokable; -import org.apache.flink.streaming.api.invokable.util.TimeStamp; +import org.apache.flink.streaming.api.windowing.helper.TimestampWrapper; import org.apache.flink.streaming.util.MockCoContext; import org.junit.Test; @@ -82,13 +82,14 @@ public class CoGroupedWindowReduceTest { } } - public static final class MyTimeStamp<T> implements TimeStamp<T> { + public static final class MyTimeStamp<T> extends TimestampWrapper<T> { private static final long serialVersionUID = 1L; private Iterator<Long> timestamps; private long start; public MyTimeStamp(List<Long> timestamps) { + super(null, 0); this.timestamps = timestamps.iterator(); this.start = timestamps.get(0); } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/fac77348/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoWindowReduceTest.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoWindowReduceTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoWindowReduceTest.java index 90ad483..035a021 100644 --- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoWindowReduceTest.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoWindowReduceTest.java @@ -27,7 +27,7 @@ import java.util.List; import org.apache.flink.streaming.api.function.co.CoReduceFunction; import org.apache.flink.streaming.api.invokable.operator.co.CoWindowReduceInvokable; -import org.apache.flink.streaming.api.invokable.util.TimeStamp; +import org.apache.flink.streaming.api.windowing.helper.TimestampWrapper; import org.apache.flink.streaming.util.MockCoContext; import org.junit.Test; @@ -57,13 +57,14 @@ public class CoWindowReduceTest { } } - public static final class MyTimeStamp<T> implements TimeStamp<T> { + public static final class MyTimeStamp<T> extends TimestampWrapper<T> { private static final long serialVersionUID = 1L; private Iterator<Long> timestamps; private long start; public MyTimeStamp(List<Long> timestamps) { + super(null, 0); this.timestamps = timestamps.iterator(); this.start = timestamps.get(0); } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/fac77348/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoWindowTest.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoWindowTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoWindowTest.java index c6d446a..4ab3492 100644 --- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoWindowTest.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoWindowTest.java @@ -27,7 +27,8 @@ import java.util.Set; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.function.co.CoWindowFunction; import org.apache.flink.streaming.api.invokable.operator.co.CoWindowInvokable; -import org.apache.flink.streaming.api.invokable.util.TimeStamp; +import org.apache.flink.streaming.api.windowing.helper.Timestamp; +import org.apache.flink.streaming.api.windowing.helper.TimestampWrapper; import org.apache.flink.streaming.util.MockCoContext; import org.apache.flink.util.Collector; import org.junit.Test; @@ -80,7 +81,7 @@ public class CoWindowTest { } - private static final class MyTS1 implements TimeStamp<Integer> { + private static final class MyTS1 implements Timestamp<Integer> { private static final long serialVersionUID = 1L; @@ -89,14 +90,9 @@ public class CoWindowTest { return value; } - @Override - public long getStartTime() { - return 1; - } - } - private static final class MyTS2 implements TimeStamp<Tuple2<Integer, Integer>> { + private static final class MyTS2 implements Timestamp<Tuple2<Integer, Integer>> { private static final long serialVersionUID = 1L; @@ -105,18 +101,14 @@ public class CoWindowTest { return value.f0; } - @Override - public long getStartTime() { - return 1; - } - } @Test public void coWindowGroupReduceTest2() throws Exception { CoWindowInvokable<Integer, Integer, Integer> invokable1 = new CoWindowInvokable<Integer, Integer, Integer>( - new MyCoGroup1(), 2, 1, new MyTS1(), new MyTS1()); + new MyCoGroup1(), 2, 1, new TimestampWrapper<Integer>(new MyTS1(), 1), + new TimestampWrapper<Integer>(new MyTS1(), 1)); // Windowsize 2, slide 1 // 1,2|2,3|3,4|4,5 @@ -152,7 +144,8 @@ public class CoWindowTest { assertEquals(expected1, actual1); CoWindowInvokable<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>, Integer> invokable2 = new CoWindowInvokable<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>, Integer>( - new MyCoGroup2(), 2, 3, new MyTS2(), new MyTS2()); + new MyCoGroup2(), 2, 3, new TimestampWrapper<Tuple2<Integer, Integer>>(new MyTS2(), + 1), new TimestampWrapper<Tuple2<Integer, Integer>>(new MyTS2(), 1)); // WindowSize 2, slide 3 // 1,2|4,5|7,8| http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/fac77348/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowInvokableTest.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowInvokableTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowInvokableTest.java index d97cadc..f38d5c1 100644 --- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowInvokableTest.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowInvokableTest.java @@ -28,7 +28,8 @@ import java.util.List; import org.apache.flink.api.common.functions.ReduceFunction; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.streaming.api.invokable.util.TimeStamp; +import org.apache.flink.streaming.api.windowing.helper.Timestamp; +import org.apache.flink.streaming.api.windowing.helper.TimestampWrapper; import org.apache.flink.streaming.api.windowing.policy.ActiveCloneableEvictionPolicyWrapper; import org.apache.flink.streaming.api.windowing.policy.CloneableEvictionPolicy; import org.apache.flink.streaming.api.windowing.policy.CloneableTriggerPolicy; @@ -360,20 +361,18 @@ public class GroupedWindowInvokableTest { expected.add(new Tuple2<Integer, String>(32, "b")); expected.add(new Tuple2<Integer, String>(32, "c")); - TimeStamp<Tuple2<Integer, String>> myTimeStamp = new TimeStamp<Tuple2<Integer, String>>() { + Timestamp<Tuple2<Integer, String>> myTimeStamp = new Timestamp<Tuple2<Integer, String>>() { private static final long serialVersionUID = 1L; @Override public long getTimestamp(Tuple2<Integer, String> value) { return value.f0; } - - @Override - public long getStartTime() { - return 1; - } }; + TimestampWrapper<Tuple2<Integer, String>> myTimeStampWrapper = new TimestampWrapper<Tuple2<Integer, String>>( + myTimeStamp, 1); + ReduceFunction<Tuple2<Integer, String>> myReduceFunction = new ReduceFunction<Tuple2<Integer, String>>() { private static final long serialVersionUID = 1L; @@ -387,11 +386,11 @@ public class GroupedWindowInvokableTest { LinkedList<TriggerPolicy<Tuple2<Integer, String>>> triggers = new LinkedList<TriggerPolicy<Tuple2<Integer, String>>>(); // Trigger every 2 time units but delay the first trigger by 2 (First // trigger after 4, then every 2) - triggers.add(new TimeTriggerPolicy<Tuple2<Integer, String>>(2L, myTimeStamp, 2L)); + triggers.add(new TimeTriggerPolicy<Tuple2<Integer, String>>(2L, myTimeStampWrapper, 2L)); LinkedList<CloneableEvictionPolicy<Tuple2<Integer, String>>> evictions = new LinkedList<CloneableEvictionPolicy<Tuple2<Integer, String>>>(); // Always delete all elements older then 4 - evictions.add(new TimeEvictionPolicy<Tuple2<Integer, String>>(4L, myTimeStamp)); + evictions.add(new TimeEvictionPolicy<Tuple2<Integer, String>>(4L, myTimeStampWrapper)); LinkedList<CloneableTriggerPolicy<Tuple2<Integer, String>>> distributedTriggers = new LinkedList<CloneableTriggerPolicy<Tuple2<Integer, String>>>(); @@ -409,10 +408,10 @@ public class GroupedWindowInvokableTest { // repeat the test with central eviction. The result should be the same. triggers.clear(); - triggers.add(new TimeTriggerPolicy<Tuple2<Integer, String>>(2L, myTimeStamp, 2L)); + triggers.add(new TimeTriggerPolicy<Tuple2<Integer, String>>(2L, myTimeStampWrapper, 2L)); evictions.clear(); LinkedList<EvictionPolicy<Tuple2<Integer, String>>> centralEvictions = new LinkedList<EvictionPolicy<Tuple2<Integer, String>>>(); - centralEvictions.add(new TimeEvictionPolicy<Tuple2<Integer, String>>(4L, myTimeStamp)); + centralEvictions.add(new TimeEvictionPolicy<Tuple2<Integer, String>>(4L, myTimeStampWrapper)); invokable = new GroupedWindowInvokable<Tuple2<Integer, String>, Tuple2<Integer, String>>( myReduceFunction, keySelector, distributedTriggers, evictions, triggers, http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/fac77348/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/WindowInvokableTest.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/WindowInvokableTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/WindowInvokableTest.java index 421a999..83b4596 100644 --- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/WindowInvokableTest.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/WindowInvokableTest.java @@ -24,7 +24,8 @@ import java.util.LinkedList; import java.util.List; import org.apache.flink.api.common.functions.ReduceFunction; -import org.apache.flink.streaming.api.invokable.util.TimeStamp; +import org.apache.flink.streaming.api.windowing.helper.Timestamp; +import org.apache.flink.streaming.api.windowing.helper.TimestampWrapper; import org.apache.flink.streaming.api.windowing.policy.CountEvictionPolicy; import org.apache.flink.streaming.api.windowing.policy.CountTriggerPolicy; import org.apache.flink.streaming.api.windowing.policy.EvictionPolicy; @@ -62,18 +63,13 @@ public class WindowInvokableTest { expected.add(10); expected.add(32); - TimeStamp<Integer> myTimeStamp = new TimeStamp<Integer>() { + Timestamp<Integer> myTimeStamp = new Timestamp<Integer>() { private static final long serialVersionUID = 1L; @Override public long getTimestamp(Integer value) { return value; } - - @Override - public long getStartTime() { - return 1; - } }; ReduceFunction<Integer> myReduceFunction = new ReduceFunction<Integer>() { @@ -88,11 +84,13 @@ public class WindowInvokableTest { LinkedList<TriggerPolicy<Integer>> triggers = new LinkedList<TriggerPolicy<Integer>>(); // Trigger every 2 time units but delay the first trigger by 2 (First // trigger after 4, then every 2) - triggers.add(new TimeTriggerPolicy<Integer>(2L, myTimeStamp, 2L)); + triggers.add(new TimeTriggerPolicy<Integer>(2L, new TimestampWrapper<Integer>(myTimeStamp, + 1), 2L)); LinkedList<EvictionPolicy<Integer>> evictions = new LinkedList<EvictionPolicy<Integer>>(); // Always delete all elements older then 4 - evictions.add(new TimeEvictionPolicy<Integer>(4L, myTimeStamp)); + evictions.add(new TimeEvictionPolicy<Integer>(4L, new TimestampWrapper<Integer>( + myTimeStamp, 1))); WindowInvokable<Integer, Integer> invokable = new WindowReduceInvokable<Integer>( myReduceFunction, triggers, evictions); http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/fac77348/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/TimeEvictionPolicyTest.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/TimeEvictionPolicyTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/TimeEvictionPolicyTest.java index 82c8841..b5d502b 100644 --- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/TimeEvictionPolicyTest.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/TimeEvictionPolicyTest.java @@ -19,7 +19,8 @@ package org.apache.flink.streaming.api.windowing.policy; import java.util.LinkedList; -import org.apache.flink.streaming.api.invokable.util.TimeStamp; +import org.apache.flink.streaming.api.windowing.helper.Timestamp; +import org.apache.flink.streaming.api.windowing.helper.TimestampWrapper; import org.junit.Test; import static org.junit.Assert.*; @@ -35,25 +36,20 @@ public class TimeEvictionPolicyTest { // create a timestamp @SuppressWarnings("serial") - TimeStamp<Integer> timeStamp = new TimeStamp<Integer>() { + Timestamp<Integer> timeStamp = new Timestamp<Integer>() { @Override public long getTimestamp(Integer value) { return value; } - @Override - public long getStartTime() { - return 0; - } - }; // test different granularity for (long granularity = 0; granularity < 40; granularity++) { // create policy TimeEvictionPolicy<Integer> policy = new TimeEvictionPolicy<Integer>(granularity, - timeStamp); + new TimestampWrapper<Integer>(timeStamp, 0)); // The trigger status should not effect the policy. Therefore, it's // value is changed after each usage. http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/fac77348/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/TimeTriggerPolicyTest.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/TimeTriggerPolicyTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/TimeTriggerPolicyTest.java index 9c77a55..2bdbd96 100644 --- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/TimeTriggerPolicyTest.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/TimeTriggerPolicyTest.java @@ -21,7 +21,8 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; -import org.apache.flink.streaming.api.invokable.util.TimeStamp; +import org.apache.flink.streaming.api.windowing.helper.Timestamp; +import org.apache.flink.streaming.api.windowing.helper.TimestampWrapper; import org.junit.Test; public class TimeTriggerPolicyTest { @@ -33,29 +34,24 @@ public class TimeTriggerPolicyTest { // create a timestamp @SuppressWarnings("serial") - TimeStamp<Integer> timeStamp = new TimeStamp<Integer>() { + Timestamp<Integer> timeStamp = new Timestamp<Integer>() { @Override public long getTimestamp(Integer value) { return value; } - @Override - public long getStartTime() { - return 0; - } - }; // test different granularity for (long granularity = 0; granularity < 31; granularity++) { // create policy - TriggerPolicy<Integer> policy = new TimeTriggerPolicy<Integer>(granularity, timeStamp); + + TriggerPolicy<Integer> policy = new TimeTriggerPolicy<Integer>(granularity, + new TimestampWrapper<Integer>(timeStamp, 0)); // remember window border - // Remark: This might NOT work in case the timeStamp uses - // System.getCurrentTimeMillis to determine the start time. - long currentTime = timeStamp.getStartTime(); + long currentTime = 0; // test by adding values for (int i = 0; i < times.length; i++) { @@ -85,22 +81,18 @@ public class TimeTriggerPolicyTest { // create a timestamp @SuppressWarnings("serial") - TimeStamp<Integer> timeStamp = new TimeStamp<Integer>() { + Timestamp<Integer> timeStamp = new Timestamp<Integer>() { @Override public long getTimestamp(Integer value) { return value; } - @Override - public long getStartTime() { - return 0; - } - }; // create policy - TimeTriggerPolicy<Integer> policy = new TimeTriggerPolicy<Integer>(5, timeStamp); + TimeTriggerPolicy<Integer> policy = new TimeTriggerPolicy<Integer>(5, + new TimestampWrapper<Integer>(timeStamp, 0)); // expected result Long[][] result = { {}, {}, { 4L, 9L, 14L, 19L }, { 24L } }; http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/fac77348/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/DeltaExtractExample.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/DeltaExtractExample.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/DeltaExtractExample.java index 1013e6f..d6a9ac0 100644 --- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/DeltaExtractExample.java +++ b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/DeltaExtractExample.java @@ -49,10 +49,8 @@ public class DeltaExtractExample { @SuppressWarnings({ "unchecked", "rawtypes" }) DataStream dstream = env .addSource(new CountingSource()) - .window(Delta.of(new EuclideanDistance(new FieldsFromTuple(0, 1)), new Tuple3(0d, - 0d, "foo"), 1.2)) - .every(Count.of(2)) - .reduce(new ConcatStrings()); + .window(Delta.of(1.2, new EuclideanDistance(new FieldsFromTuple(0, 1)), new Tuple3( + 0d, 0d, "foo"))).every(Count.of(2)).reduce(new ConcatStrings()); // emit result if (fileOutput) { http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/fac77348/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/DataStream.scala ---------------------------------------------------------------------- diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/DataStream.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/DataStream.scala index 38ad384..dc1e5b3 100644 --- a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/DataStream.scala +++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/DataStream.scala @@ -18,6 +18,7 @@ package org.apache.flink.api.scala.streaming import org.apache.flink.api.scala._ +import org.apache.flink.api.scala.streaming.StreamExecutionEnvironment.clean import org.apache.flink.streaming.api.datastream.{ DataStream => JavaStream } import org.apache.flink.api.common.typeinfo.TypeInformation import scala.reflect.ClassTag @@ -53,24 +54,6 @@ import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo class DataStream[T](javaStream: JavaStream[T]) { - /* This code is originally from the Apache Spark project. */ - /** - * Clean a closure to make it ready to serialized and send to tasks - * (removes unreferenced variables in $outer's, updates REPL variables) - * If <tt>checkSerializable</tt> is set, <tt>clean</tt> will also proactively - * check to see if <tt>f</tt> is serializable and throw a <tt>SparkException</tt> - * if not. - * - * @param f the closure to clean - * @param checkSerializable whether or not to immediately check <tt>f</tt> for serializability - * @throws <tt>SparkException<tt> if <tt>checkSerializable</tt> is set but <tt>f</tt> is not - * serializable - */ - private[flink] def clean[F <: AnyRef](f: F, checkSerializable: Boolean = true): F = { - ClosureCleaner.clean(f, checkSerializable) - f - } - /** * Gets the underlying java DataStream object. */ http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/fac77348/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamCrossOperator.scala ---------------------------------------------------------------------- diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamCrossOperator.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamCrossOperator.scala index 72052b9..e9010c8 100644 --- a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamCrossOperator.scala +++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamCrossOperator.scala @@ -32,7 +32,7 @@ import org.apache.flink.streaming.api.function.co.CrossWindowFunction import org.apache.flink.api.common.functions.CrossFunction import org.apache.flink.api.scala.typeutils.CaseClassSerializer import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo - +import org.apache.flink.api.scala.streaming.StreamExecutionEnvironment.clean class StreamCrossOperator[I1, I2](i1: JavaStream[I1], i2: JavaStream[I2]) extends TemporalOperator[I1, I2, StreamCrossOperator.CrossWindow[I1, I2]](i1, i2) { http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/fac77348/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamExecutionEnvironment.scala ---------------------------------------------------------------------- diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamExecutionEnvironment.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamExecutionEnvironment.scala index 2489a64..a7a471f 100644 --- a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamExecutionEnvironment.scala +++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamExecutionEnvironment.scala @@ -161,9 +161,9 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) { */ def addSource[T: ClassTag: TypeInformation](function: SourceFunction[T]): DataStream[T] = { Validate.notNull(function, "Function must not be null.") - ClosureCleaner.clean(function, true) + val cleanFun = StreamExecutionEnvironment.clean(function) val typeInfo = implicitly[TypeInformation[T]] - new DataStream[T](javaEnv.addSource(function, typeInfo)) + new DataStream[T](javaEnv.addSource(cleanFun, typeInfo)) } /** @@ -174,8 +174,9 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) { def addSource[T: ClassTag: TypeInformation](function: Collector[T] => Unit): DataStream[T] = { Validate.notNull(function, "Function must not be null.") val sourceFunction = new SourceFunction[T] { + val cleanFun = StreamExecutionEnvironment.clean(function) override def invoke(out: Collector[T]) { - function(out) + cleanFun(out) } } addSource(sourceFunction) @@ -205,6 +206,11 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) { } object StreamExecutionEnvironment { + + private[flink] def clean[F <: AnyRef](f: F, checkSerializable: Boolean = true): F = { + ClosureCleaner.clean(f, checkSerializable) + f + } /** * Creates an execution environment that represents the context in which the program is http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/fac77348/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamJoinOperator.scala ---------------------------------------------------------------------- diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamJoinOperator.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamJoinOperator.scala index f47d79e..4ed5082 100644 --- a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamJoinOperator.scala +++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamJoinOperator.scala @@ -33,6 +33,7 @@ import org.apache.commons.lang.Validate import org.apache.flink.streaming.api.invokable.operator.co.CoWindowInvokable import org.apache.flink.streaming.util.keys.KeySelectorUtil import org.apache.flink.api.java.operators.Keys +import org.apache.flink.api.scala.streaming.StreamExecutionEnvironment.clean class StreamJoinOperator[I1, I2](i1: JavaStream[I1], i2: JavaStream[I2]) extends TemporalOperator[I1, I2, StreamJoinOperator.JoinWindow[I1, I2]](i1, i2) { http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/fac77348/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/WindowedDataStream.scala ---------------------------------------------------------------------- diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/WindowedDataStream.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/WindowedDataStream.scala index 5346c4c..e33368c 100644 --- a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/WindowedDataStream.scala +++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/WindowedDataStream.scala @@ -40,14 +40,10 @@ import org.apache.flink.streaming.api.function.aggregation.AggregationFunction.A import org.apache.flink.api.java.typeutils.TupleTypeInfoBase import org.apache.flink.streaming.api.function.aggregation.SumFunction import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo +import org.apache.flink.api.scala.streaming.StreamExecutionEnvironment.clean class WindowedDataStream[T](javaStream: JavaWStream[T]) { - private[flink] def clean[F <: AnyRef](f: F, checkSerializable: Boolean = true): F = { - ClosureCleaner.clean(f, checkSerializable) - f - } - /** * Defines the slide size (trigger frequency) for the windowed data stream. * This controls how often the user defined function will be triggered on
