[scala] [streaming] Added scala window helpers + timestamp rework for lambda 
support


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/fac77348
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/fac77348
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/fac77348

Branch: refs/heads/master
Commit: fac773488a77c0c70ed74a2dca09224f275ec77a
Parents: 8bf9416
Author: Gyula Fora <[email protected]>
Authored: Fri Jan 2 20:04:51 2015 +0100
Committer: Gyula Fora <[email protected]>
Committed: Fri Jan 2 20:04:51 2015 +0100

----------------------------------------------------------------------
 .../api/datastream/CoWindowDataStream.java      | 20 +++---
 .../api/datastream/ConnectedDataStream.java     | 30 ++++----
 .../api/datastream/TemporalOperator.java        | 39 ++++++-----
 .../co/CoGroupedWindowReduceInvokable.java      |  4 +-
 .../operator/co/CoWindowInvokable.java          |  8 +--
 .../operator/co/CoWindowReduceInvokable.java    | 15 ++--
 .../api/invokable/util/DefaultTimeStamp.java    | 39 -----------
 .../streaming/api/invokable/util/TimeStamp.java | 46 -------------
 .../streaming/api/windowing/helper/Delta.java   |  4 +-
 .../api/windowing/helper/SystemTimestamp.java   | 37 ++++++++++
 .../streaming/api/windowing/helper/Time.java    | 72 ++++++++++++++++----
 .../api/windowing/helper/Timestamp.java         | 39 +++++++++++
 .../api/windowing/helper/TimestampWrapper.java  | 44 ++++++++++++
 .../windowing/policy/ActiveTriggerPolicy.java   |  6 +-
 .../windowing/policy/TimeEvictionPolicy.java    | 31 +++++----
 .../api/windowing/policy/TimeTriggerPolicy.java | 42 ++++++------
 .../streaming/api/WindowCrossJoinTest.java      | 30 ++++----
 .../operator/CoGroupedWindowReduceTest.java     |  5 +-
 .../invokable/operator/CoWindowReduceTest.java  |  5 +-
 .../api/invokable/operator/CoWindowTest.java    | 23 +++----
 .../operator/GroupedWindowInvokableTest.java    | 21 +++---
 .../invokable/operator/WindowInvokableTest.java | 16 ++---
 .../policy/TimeEvictionPolicyTest.java          | 12 ++--
 .../windowing/policy/TimeTriggerPolicyTest.java | 28 +++-----
 .../examples/windowing/DeltaExtractExample.java |  6 +-
 .../flink/api/scala/streaming/DataStream.scala  | 19 +-----
 .../scala/streaming/StreamCrossOperator.scala   |  2 +-
 .../streaming/StreamExecutionEnvironment.scala  | 12 +++-
 .../scala/streaming/StreamJoinOperator.scala    |  1 +
 .../scala/streaming/WindowedDataStream.scala    |  6 +-
 .../api/scala/streaming/windowing/Delta.scala   | 47 +++++++++++++
 .../api/scala/streaming/windowing/Time.scala    | 55 +++++++++++++++
 32 files changed, 454 insertions(+), 310 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/fac77348/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/CoWindowDataStream.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/CoWindowDataStream.java
 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/CoWindowDataStream.java
index c8c634a..9129f9e 100644
--- 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/CoWindowDataStream.java
+++ 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/CoWindowDataStream.java
@@ -22,33 +22,33 @@ import 
org.apache.flink.streaming.api.function.co.CoReduceFunction;
 import 
org.apache.flink.streaming.api.invokable.operator.co.CoGroupedWindowReduceInvokable;
 import org.apache.flink.streaming.api.invokable.operator.co.CoInvokable;
 import 
org.apache.flink.streaming.api.invokable.operator.co.CoWindowReduceInvokable;
-import org.apache.flink.streaming.api.invokable.util.TimeStamp;
+import org.apache.flink.streaming.api.windowing.helper.TimestampWrapper;
 
 /**
  * A {@link CoWindowDataStream} represents two data streams whose elements are
  * batched together into sliding windows. Operation
  * {@link #reduce(CoReduceFunction)} can be applied for each window.
- *
+ * 
  * @param <IN1>
  *            The type of the first input data stream
  * @param <IN2>
  *            The type of the second input data stream
  */
 public class CoWindowDataStream<IN1, IN2> extends CoBatchedDataStream<IN1, 
IN2> {
-       TimeStamp<IN1> timeStamp1;
-       TimeStamp<IN2> timeStamp2;
+       TimestampWrapper<IN1> timeStamp1;
+       TimestampWrapper<IN2> timeStamp2;
 
        protected CoWindowDataStream(DataStream<IN1> dataStream1, 
DataStream<IN2> dataStream2,
                        long windowSize1, long windowSize2, long 
slideInterval1, long slideInterval2,
-                       TimeStamp<IN1> timeStamp1, TimeStamp<IN2> timeStamp2) {
+                       TimestampWrapper<IN1> timeStamp1, TimestampWrapper<IN2> 
timeStamp2) {
                super(dataStream1, dataStream2, windowSize1, windowSize2, 
slideInterval1, slideInterval2);
                this.timeStamp1 = timeStamp1;
                this.timeStamp2 = timeStamp2;
        }
 
        protected CoWindowDataStream(ConnectedDataStream<IN1, IN2> 
coDataStream, long windowSize1,
-                       long windowSize2, long slideInterval1, long 
slideInterval2, TimeStamp<IN1> timeStamp1,
-                       TimeStamp<IN2> timeStamp2) {
+                       long windowSize2, long slideInterval1, long 
slideInterval2,
+                       TimestampWrapper<IN1> timeStamp1, TimestampWrapper<IN2> 
timeStamp2) {
                super(coDataStream, windowSize1, windowSize2, slideInterval1, 
slideInterval2);
                this.timeStamp1 = timeStamp1;
                this.timeStamp2 = timeStamp2;
@@ -96,9 +96,9 @@ public class CoWindowDataStream<IN1, IN2> extends 
CoBatchedDataStream<IN1, IN2>
                        CoReduceFunction<IN1, IN2, OUT> coReducer) {
                CoWindowReduceInvokable<IN1, IN2, OUT> invokable;
                if (isGrouped) {
-                       invokable = new CoGroupedWindowReduceInvokable<IN1, 
IN2, OUT>(clean(coReducer), batchSize1,
-                                       batchSize2, slideSize1, slideSize2, 
keySelector1, keySelector2, timeStamp1,
-                                       timeStamp2);
+                       invokable = new CoGroupedWindowReduceInvokable<IN1, 
IN2, OUT>(clean(coReducer),
+                                       batchSize1, batchSize2, slideSize1, 
slideSize2, keySelector1, keySelector2,
+                                       timeStamp1, timeStamp2);
                } else {
                        invokable = new CoWindowReduceInvokable<IN1, IN2, 
OUT>(clean(coReducer), batchSize1,
                                        batchSize2, slideSize1, slideSize2, 
timeStamp1, timeStamp2);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/fac77348/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
index 39b6460..efd9531 100755
--- 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
+++ 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
@@ -36,8 +36,8 @@ import 
org.apache.flink.streaming.api.invokable.operator.co.CoInvokable;
 import org.apache.flink.streaming.api.invokable.operator.co.CoMapInvokable;
 import org.apache.flink.streaming.api.invokable.operator.co.CoReduceInvokable;
 import org.apache.flink.streaming.api.invokable.operator.co.CoWindowInvokable;
-import org.apache.flink.streaming.api.invokable.util.DefaultTimeStamp;
-import org.apache.flink.streaming.api.invokable.util.TimeStamp;
+import org.apache.flink.streaming.api.windowing.helper.SystemTimestamp;
+import org.apache.flink.streaming.api.windowing.helper.TimestampWrapper;
 
 /**
  * The ConnectedDataStream represents a stream for two different data types. It
@@ -305,8 +305,8 @@ public class ConnectedDataStream<IN1, IN2> {
         * @return The transformed {@link ConnectedDataStream}
         */
        public CoWindowDataStream<IN1, IN2> window(long windowSize1, long 
windowSize2,
-                       long slideInterval1, long slideInterval2, 
TimeStamp<IN1> timeStamp1,
-                       TimeStamp<IN2> timeStamp2) {
+                       long slideInterval1, long slideInterval2, 
TimestampWrapper<IN1> timeStamp1,
+                       TimestampWrapper<IN2> timeStamp2) {
                if (windowSize1 < 1 || windowSize2 < 1) {
                        throw new IllegalArgumentException("Window size must be 
positive");
                }
@@ -338,10 +338,12 @@ public class ConnectedDataStream<IN1, IN2> {
         *            second input data stream are slid by after each 
transformation
         * @return The transformed {@link ConnectedDataStream}
         */
+       @SuppressWarnings("unchecked")
        public CoWindowDataStream<IN1, IN2> window(long windowSize1, long 
windowSize2,
                        long slideInterval1, long slideInterval2) {
                return window(windowSize1, windowSize2, slideInterval1, 
slideInterval2,
-                               new DefaultTimeStamp<IN1>(), new 
DefaultTimeStamp<IN2>());
+                               (TimestampWrapper<IN1>) 
SystemTimestamp.getWrapper(),
+                               (TimestampWrapper<IN2>) 
SystemTimestamp.getWrapper());
        }
 
        /**
@@ -365,7 +367,7 @@ public class ConnectedDataStream<IN1, IN2> {
         * @return The transformed {@link ConnectedDataStream}
         */
        public CoWindowDataStream<IN1, IN2> window(long windowSize1, long 
windowSize2,
-                       TimeStamp<IN1> timeStamp1, TimeStamp<IN2> timeStamp2) {
+                       TimestampWrapper<IN1> timeStamp1, TimestampWrapper<IN2> 
timeStamp2) {
                return window(windowSize1, windowSize2, windowSize1, 
windowSize2, timeStamp1, timeStamp2);
        }
 
@@ -384,9 +386,11 @@ public class ConnectedDataStream<IN1, IN2> {
         *            milliseconds
         * @return The transformed {@link ConnectedDataStream}
         */
+       @SuppressWarnings("unchecked")
        public CoWindowDataStream<IN1, IN2> window(long windowSize1, long 
windowSize2) {
                return window(windowSize1, windowSize2, windowSize1, 
windowSize2,
-                               new DefaultTimeStamp<IN1>(), new 
DefaultTimeStamp<IN2>());
+                               (TimestampWrapper<IN1>) 
SystemTimestamp.getWrapper(),
+                               (TimestampWrapper<IN2>) 
SystemTimestamp.getWrapper());
        }
 
        /**
@@ -479,10 +483,12 @@ public class ConnectedDataStream<IN1, IN2> {
         * 
         * @return The transformed {@link DataStream}.
         */
+       @SuppressWarnings("unchecked")
        public <OUT> SingleOutputStreamOperator<OUT, ?> windowReduce(
                        CoWindowFunction<IN1, IN2, OUT> coWindowFunction, long 
windowSize, long slideInterval) {
                return windowReduce(coWindowFunction, windowSize, slideInterval,
-                               new DefaultTimeStamp<IN1>(), new 
DefaultTimeStamp<IN2>());
+                               (TimestampWrapper<IN1>) 
SystemTimestamp.getWrapper(),
+                               (TimestampWrapper<IN2>) 
SystemTimestamp.getWrapper());
        }
 
        /**
@@ -510,7 +516,7 @@ public class ConnectedDataStream<IN1, IN2> {
         */
        public <OUT> SingleOutputStreamOperator<OUT, ?> windowReduce(
                        CoWindowFunction<IN1, IN2, OUT> coWindowFunction, long 
windowSize, long slideInterval,
-                       TimeStamp<IN1> timestamp1, TimeStamp<IN2> timestamp2) {
+                       TimestampWrapper<IN1> timestamp1, TimestampWrapper<IN2> 
timestamp2) {
 
                if (windowSize < 1) {
                        throw new IllegalArgumentException("Window size must be 
positive");
@@ -541,8 +547,8 @@ public class ConnectedDataStream<IN1, IN2> {
 
        public <OUT> SingleOutputStreamOperator<OUT, ?> addGeneralWindowCombine(
                        CoWindowFunction<IN1, IN2, OUT> coWindowFunction, 
TypeInformation<OUT> outTypeInfo,
-                       long windowSize, long slideInterval, TimeStamp<IN1> 
timestamp1,
-                       TimeStamp<IN2> timestamp2) {
+                       long windowSize, long slideInterval, 
TimestampWrapper<IN1> timestamp1,
+                       TimestampWrapper<IN2> timestamp2) {
 
                if (windowSize < 1) {
                        throw new IllegalArgumentException("Window size must be 
positive");
@@ -550,7 +556,7 @@ public class ConnectedDataStream<IN1, IN2> {
                if (slideInterval < 1) {
                        throw new IllegalArgumentException("Slide interval must 
be positive");
                }
-               
+
                return addCoFunction("coWindowReduce", outTypeInfo, new 
CoWindowInvokable<IN1, IN2, OUT>(
                                clean(coWindowFunction), windowSize, 
slideInterval, timestamp1, timestamp2));
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/fac77348/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/TemporalOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/TemporalOperator.java
 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/TemporalOperator.java
index cd8aabd..e5385f0 100644
--- 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/TemporalOperator.java
+++ 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/TemporalOperator.java
@@ -18,8 +18,8 @@
 
 package org.apache.flink.streaming.api.datastream;
 
-import org.apache.flink.streaming.api.invokable.util.DefaultTimeStamp;
-import org.apache.flink.streaming.api.invokable.util.TimeStamp;
+import org.apache.flink.streaming.api.windowing.helper.SystemTimestamp;
+import org.apache.flink.streaming.api.windowing.helper.TimestampWrapper;
 
 public abstract class TemporalOperator<I1, I2, OP> {
 
@@ -29,8 +29,8 @@ public abstract class TemporalOperator<I1, I2, OP> {
        public long windowSize;
        public long slideInterval;
 
-       public TimeStamp<I1> timeStamp1;
-       public TimeStamp<I2> timeStamp2;
+       public TimestampWrapper<I1> timeStamp1;
+       public TimestampWrapper<I2> timeStamp2;
 
        public TemporalOperator(DataStream<I1> input1, DataStream<I2> input2) {
                if (input1 == null || input2 == null) {
@@ -41,37 +41,37 @@ public abstract class TemporalOperator<I1, I2, OP> {
        }
 
        /**
-        * Continues a temporal Join transformation.<br/>
-        * Defines the window size on which the two DataStreams will be joined.
+        * Continues a temporal transformation.<br/>
+        * Defines the window size on which the two DataStreams will be 
transformed.
         * 
         * @param windowSize
         *            The size of the window in milliseconds.
-        * @return An incomplete Join transformation. Call {@link 
JoinWindow#where}
-        *         to continue the Join.
+        * @return An incomplete temporal transformation.
         */
        public OP onWindow(long windowSize) {
                return onWindow(windowSize, windowSize);
        }
 
        /**
-        * Continues a temporal Join transformation.<br/>
-        * Defines the window size on which the two DataStreams will be joined.
+        * Continues a temporal transformation.<br/>
+        * Defines the window size on which the two DataStreams will be 
transformed.
         * 
         * @param windowSize
         *            The size of the window in milliseconds.
         * @param slideInterval
         *            The slide size of the window.
-        * @return An incomplete Join transformation. Call {@link 
JoinWindow#where}
-        *         to continue the Join.
+        * @return An incomplete temporal transformation.
         */
+       @SuppressWarnings("unchecked")
        public OP onWindow(long windowSize, long slideInterval) {
-               return onWindow(windowSize, slideInterval, new 
DefaultTimeStamp<I1>(),
-                               new DefaultTimeStamp<I2>());
+               return onWindow(windowSize, slideInterval,
+                               (TimestampWrapper<I1>) 
SystemTimestamp.getWrapper(),
+                               (TimestampWrapper<I2>) 
SystemTimestamp.getWrapper());
        }
 
        /**
-        * Continues a temporal Join transformation.<br/>
-        * Defines the window size on which the two DataStreams will be joined.
+        * Continues a temporal transformation.<br/>
+        * Defines the window size on which the two DataStreams will be 
transformed.
         * 
         * @param windowSize
         *            The size of the window in milliseconds.
@@ -83,11 +83,10 @@ public abstract class TemporalOperator<I1, I2, OP> {
         * @param timeStamp2
         *            The timestamp used to extract time from the elements of 
the
         *            second data stream.
-        * @return An incomplete Join transformation. Call {@link 
JoinWindow#where}
-        *         to continue the Join.
+        * @return An incomplete temporal transformation.
         */
-       public OP onWindow(long windowSize, long slideInterval, TimeStamp<I1> 
timeStamp1,
-                       TimeStamp<I2> timeStamp2) {
+       public OP onWindow(long windowSize, long slideInterval, 
TimestampWrapper<I1> timeStamp1,
+                       TimestampWrapper<I2> timeStamp2) {
 
                this.windowSize = windowSize;
                this.slideInterval = slideInterval;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/fac77348/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoGroupedWindowReduceInvokable.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoGroupedWindowReduceInvokable.java
 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoGroupedWindowReduceInvokable.java
index 4905566..736239f 100644
--- 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoGroupedWindowReduceInvokable.java
+++ 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoGroupedWindowReduceInvokable.java
@@ -22,8 +22,8 @@ import java.util.Map;
 
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.streaming.api.function.co.CoReduceFunction;
-import org.apache.flink.streaming.api.invokable.util.TimeStamp;
 import org.apache.flink.streaming.api.streamrecord.StreamRecord;
+import org.apache.flink.streaming.api.windowing.helper.TimestampWrapper;
 
 public class CoGroupedWindowReduceInvokable<IN1, IN2, OUT> extends
                CoWindowReduceInvokable<IN1, IN2, OUT> {
@@ -38,7 +38,7 @@ public class CoGroupedWindowReduceInvokable<IN1, IN2, OUT> 
extends
        public CoGroupedWindowReduceInvokable(CoReduceFunction<IN1, IN2, OUT> 
coReducer,
                        long windowSize1, long windowSize2, long 
slideInterval1, long slideInterval2,
                        KeySelector<IN1, ?> keySelector1, KeySelector<IN2, ?> 
keySelector2,
-                       TimeStamp<IN1> timestamp1, TimeStamp<IN2> timestamp2) {
+                       TimestampWrapper<IN1> timestamp1, TimestampWrapper<IN2> 
timestamp2) {
                super(coReducer, windowSize1, windowSize2, slideInterval1, 
slideInterval2, timestamp1,
                                timestamp2);
                this.keySelector1 = keySelector1;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/fac77348/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoWindowInvokable.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoWindowInvokable.java
 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoWindowInvokable.java
index 7df5668..59552f4 100644
--- 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoWindowInvokable.java
+++ 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoWindowInvokable.java
@@ -23,8 +23,8 @@ import java.util.List;
 
 import org.apache.commons.math.util.MathUtils;
 import org.apache.flink.streaming.api.function.co.CoWindowFunction;
-import org.apache.flink.streaming.api.invokable.util.TimeStamp;
 import org.apache.flink.streaming.api.streamrecord.StreamRecord;
+import org.apache.flink.streaming.api.windowing.helper.TimestampWrapper;
 import org.apache.flink.streaming.state.CircularFifoList;
 
 public class CoWindowInvokable<IN1, IN2, OUT> extends CoInvokable<IN1, IN2, 
OUT> {
@@ -35,8 +35,8 @@ public class CoWindowInvokable<IN1, IN2, OUT> extends 
CoInvokable<IN1, IN2, OUT>
        protected long slideSize;
        protected CircularFifoList<StreamRecord<IN1>> circularList1;
        protected CircularFifoList<StreamRecord<IN2>> circularList2;
-       protected TimeStamp<IN1> timeStamp1;
-       protected TimeStamp<IN2> timeStamp2;
+       protected TimestampWrapper<IN1> timeStamp1;
+       protected TimestampWrapper<IN2> timeStamp2;
 
        protected StreamWindow window;
 
@@ -44,7 +44,7 @@ public class CoWindowInvokable<IN1, IN2, OUT> extends 
CoInvokable<IN1, IN2, OUT>
        protected long nextRecordTime;
 
        public CoWindowInvokable(CoWindowFunction<IN1, IN2, OUT> 
coWindowFunction, long windowSize,
-                       long slideInterval, TimeStamp<IN1> timeStamp1, 
TimeStamp<IN2> timeStamp2) {
+                       long slideInterval, TimestampWrapper<IN1> timeStamp1, 
TimestampWrapper<IN2> timeStamp2) {
                super(coWindowFunction);
                this.coWindowFunction = coWindowFunction;
                this.windowSize = windowSize;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/fac77348/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoWindowReduceInvokable.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoWindowReduceInvokable.java
 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoWindowReduceInvokable.java
index fa47761..0c8598f 100644
--- 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoWindowReduceInvokable.java
+++ 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoWindowReduceInvokable.java
@@ -19,8 +19,7 @@ package org.apache.flink.streaming.api.invokable.operator.co;
 
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.function.co.CoReduceFunction;
-import org.apache.flink.streaming.api.invokable.util.DefaultTimeStamp;
-import org.apache.flink.streaming.api.invokable.util.TimeStamp;
+import org.apache.flink.streaming.api.windowing.helper.TimestampWrapper;
 
 public class CoWindowReduceInvokable<IN1, IN2, OUT> extends 
CoBatchReduceInvokable<IN1, IN2, OUT> {
        private static final long serialVersionUID = 1L;
@@ -28,14 +27,14 @@ public class CoWindowReduceInvokable<IN1, IN2, OUT> extends 
CoBatchReduceInvokab
        protected long startTime2;
        protected long nextRecordTime1;
        protected long nextRecordTime2;
-       protected TimeStamp<IN1> timestamp1;
-       protected TimeStamp<IN2> timestamp2;
+       protected TimestampWrapper<IN1> timestamp1;
+       protected TimestampWrapper<IN2> timestamp2;
        protected StreamWindow<IN1> window1;
        protected StreamWindow<IN2> window2;
 
        public CoWindowReduceInvokable(CoReduceFunction<IN1, IN2, OUT> 
coReducer, long windowSize1,
-                       long windowSize2, long slideInterval1, long 
slideInterval2, TimeStamp<IN1> timestamp1,
-                       TimeStamp<IN2> timestamp2) {
+                       long windowSize2, long slideInterval1, long 
slideInterval2,
+                       TimestampWrapper<IN1> timestamp1, TimestampWrapper<IN2> 
timestamp2) {
                super(coReducer, windowSize1, windowSize2, slideInterval1, 
slideInterval2);
                this.timestamp1 = timestamp1;
                this.timestamp2 = timestamp2;
@@ -51,10 +50,10 @@ public class CoWindowReduceInvokable<IN1, IN2, OUT> extends 
CoBatchReduceInvokab
                this.window2 = new StreamWindow<IN2>(batchSize2, slideSize2);
                this.batch1 = this.window1;
                this.batch2 = this.window2;
-               if (timestamp1 instanceof DefaultTimeStamp) {
+               if (timestamp1.isDefaultTimestamp()) {
                        (new TimeCheck1()).start();
                }
-               if (timestamp2 instanceof DefaultTimeStamp) {
+               if (timestamp2.isDefaultTimestamp()) {
                        (new TimeCheck2()).start();
                }
        }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/fac77348/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/util/DefaultTimeStamp.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/util/DefaultTimeStamp.java
 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/util/DefaultTimeStamp.java
deleted file mode 100644
index 2f22e8e..0000000
--- 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/util/DefaultTimeStamp.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.invokable.util;
-
-/**
- * Default timestamp function that uses the Java System.currentTimeMillis()
- * method to retrieve a timestamp.
- *
- * @param <T>
- *            Type of the inputs of the reducing function.
- */
-public class DefaultTimeStamp<T> implements TimeStamp<T> {
-       private static final long serialVersionUID = 1L;
-
-       @Override
-       public long getTimestamp(T value) {
-               return System.currentTimeMillis();
-       }
-
-       @Override
-       public long getStartTime() {
-               return System.currentTimeMillis();
-       }
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/fac77348/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/util/TimeStamp.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/util/TimeStamp.java
 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/util/TimeStamp.java
deleted file mode 100644
index 86fa101..0000000
--- 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/util/TimeStamp.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.invokable.util;
-
-import java.io.Serializable;
-
-/**
- * Interface for getting a timestamp from a custom value. Used in window
- * reduces. In order to work properly, the timestamps must be non-decreasing.
- *
- * @param <T>
- *            Type of the value to create the timestamp from.
- */
-public interface TimeStamp<T> extends Serializable {
-
-       /**
-        * Values
-        * 
-        * @param value
-        *            The value to create the timestamp from
-        * @return The timestamp
-        */
-       public long getTimestamp(T value);
-
-       /**
-        * Function to define the starting time for reference
-        * 
-        * @return The starting timestamp
-        */
-       public long getStartTime();
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/fac77348/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Delta.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Delta.java
 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Delta.java
index 9c8c5ca..5434a4e 100644
--- 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Delta.java
+++ 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Delta.java
@@ -84,8 +84,8 @@ public class Delta<DATA> implements WindowingHelper<DATA> {
         *            The threshold used by the delta function.
         * @return Helper representing a delta trigger or eviction policy
         */
-       public static <DATA> Delta<DATA> of(DeltaFunction<DATA> deltaFunction, 
DATA initVal,
-                       double threshold) {
+       public static <DATA> Delta<DATA> of(double threshold, 
DeltaFunction<DATA> deltaFunction,
+                       DATA initVal) {
                return new Delta<DATA>(deltaFunction, initVal, threshold);
        }
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/fac77348/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/SystemTimestamp.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/SystemTimestamp.java
 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/SystemTimestamp.java
new file mode 100644
index 0000000..8581ac5
--- /dev/null
+++ 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/SystemTimestamp.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.windowing.helper;
+
+/**
+ * {@link Timestamp} implementation to be used when system time is needed to
+ * determine windows
+ */
+public class SystemTimestamp<T> implements Timestamp<T> {
+
+       private static final long serialVersionUID = 1L;
+
+       @Override
+       public long getTimestamp(T value) {
+               return System.currentTimeMillis();
+       }
+
+       public static <R> TimestampWrapper<R> getWrapper() {
+               return new TimestampWrapper<R>(new SystemTimestamp<R>(), 
System.currentTimeMillis());
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/fac77348/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Time.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Time.java
 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Time.java
index d987e32..9dc1c8c 100644
--- 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Time.java
+++ 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Time.java
@@ -19,8 +19,6 @@ package org.apache.flink.streaming.api.windowing.helper;
 
 import java.util.concurrent.TimeUnit;
 
-import org.apache.flink.streaming.api.invokable.util.DefaultTimeStamp;
-import org.apache.flink.streaming.api.invokable.util.TimeStamp;
 import org.apache.flink.streaming.api.windowing.policy.EvictionPolicy;
 import org.apache.flink.streaming.api.windowing.policy.TimeEvictionPolicy;
 import org.apache.flink.streaming.api.windowing.policy.TimeTriggerPolicy;
@@ -39,7 +37,7 @@ public class Time<DATA> implements WindowingHelper<DATA> {
 
        private long length;
        private TimeUnit granularity;
-       private TimeStamp<DATA> timeStamp;
+       private TimestampWrapper<DATA> timestampWrapper;
        private long delay;
 
        /**
@@ -53,31 +51,52 @@ public class Time<DATA> implements WindowingHelper<DATA> {
         *            the smallest possible granularity is milliseconds. Any 
smaller
         *            time unit might cause an error at runtime due to 
conversion
         *            problems.
-        * @param timeStamp
+        * @param timestamp
         *            The user defined timestamp that will be used to extract 
time
         *            information from the incoming elements
+        * @param startTime
+        *            The startTime of the stream for computing the first window
         */
-       private Time(long length, TimeUnit timeUnit, TimeStamp<DATA> timeStamp) 
{
+       private Time(long length, TimeUnit timeUnit, Timestamp<DATA> timestamp, 
long startTime) {
+               this(length, timeUnit, new TimestampWrapper<DATA>(timestamp, 
startTime));
+       }
+
+       /**
+        * Creates a helper representing a trigger which triggers every given
+        * length or an eviction which evicts all elements older than length.
+        * 
+        * @param length
+        *            The number of time units
+        * @param timeUnit
+        *            The unit of time such as minute or millisecond. Note 
that
+        *            the smallest possible granularity is milliseconds. Any 
smaller
+        *            time unit might cause an error at runtime due to 
conversion
+        *            problems.
+        * @param timestampWrapper
+        *            The user defined {@link TimestampWrapper} that will be 
used to
+        *            extract time information from the incoming elements
+        */
+       private Time(long length, TimeUnit timeUnit, TimestampWrapper<DATA> 
timestampWrapper) {
                this.length = length;
                this.granularity = timeUnit;
-               this.timeStamp = timeStamp;
+               this.timestampWrapper = timestampWrapper;
                this.delay = 0;
        }
 
        @Override
        public EvictionPolicy<DATA> toEvict() {
-               return new TimeEvictionPolicy<DATA>(granularityInMillis(), 
timeStamp);
+               return new TimeEvictionPolicy<DATA>(granularityInMillis(), 
timestampWrapper);
        }
 
        @Override
        public TriggerPolicy<DATA> toTrigger() {
-               return new TimeTriggerPolicy<DATA>(granularityInMillis(), 
timeStamp, delay);
+               return new TimeTriggerPolicy<DATA>(granularityInMillis(), 
timestampWrapper, delay);
        }
 
        /**
         * Creates a helper representing a time trigger which triggers every 
given
         * length (slide size) or a time eviction which evicts all elements 
older
-        * than length (window size).
+        * than length (window size) using System time.
         * 
         * @param length
         *            The number of time units
@@ -88,8 +107,10 @@ public class Time<DATA> implements WindowingHelper<DATA> {
         *            problems.
         * @return Helper representing the time based trigger and eviction 
policy
         */
+       @SuppressWarnings("unchecked")
        public static <DATA> Time<DATA> of(long length, TimeUnit timeUnit) {
-               return new Time<DATA>(length, timeUnit, new 
DefaultTimeStamp<DATA>());
+               return new Time<DATA>(length, timeUnit,
+                               (TimestampWrapper<DATA>) 
SystemTimestamp.getWrapper());
        }
 
        /**
@@ -99,13 +120,32 @@ public class Time<DATA> implements WindowingHelper<DATA> {
         * 
         * @param length
         *            The number of time units
-        * @param timeStamp
+        * @param timestamp
+        *            The user defined timestamp that will be used to extract 
time
+        *            information from the incoming elements
+        * @param startTime
+        *            The startTime used to compute the first window
+        * @return Helper representing the time based trigger and eviction 
policy
+        */
+       public static <DATA> Time<DATA> of(long length, Timestamp<DATA> 
timestamp, long startTime) {
+               return new Time<DATA>(length, null, timestamp, startTime);
+       }
+
+       /**
+        * Creates a helper representing a time trigger which triggers every 
given
+        * length (slide size) or a time eviction which evicts all elements 
older
+        * than length (window size) using a user defined timestamp extractor. 
By
+        * default the start time is set to 0.
+        * 
+        * @param length
+        *            The number of time units
+        * @param timestamp
         *            The user defined timestamp that will be used to extract 
time
         *            information from the incoming elements
         * @return Helper representing the time based trigger and eviction 
policy
         */
-       public static <DATA> Time<DATA> of(long length, TimeStamp<DATA> 
timeStamp) {
-               return new Time<DATA>(length, TimeUnit.MILLISECONDS, timeStamp);
+       public static <DATA> Time<DATA> of(long length, Timestamp<DATA> 
timestamp) {
+               return of(length, timestamp, 0);
        }
 
        /**
@@ -121,6 +161,10 @@ public class Time<DATA> implements WindowingHelper<DATA> {
        }
 
        private long granularityInMillis() {
-               return this.granularity.toMillis(this.length);
+               if (granularity != null) {
+                       return this.granularity.toMillis(this.length);
+               } else {
+                       return this.length;
+               }
        }
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/fac77348/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Timestamp.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Timestamp.java
 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Timestamp.java
new file mode 100644
index 0000000..fea6020
--- /dev/null
+++ 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Timestamp.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.windowing.helper;
+
+import java.io.Serializable;
+
+/**
+ * Interface for getting a timestamp from a custom value. Used in window
+ * reduces. In order to work properly, the timestamps must be non-decreasing.
+ *
+ * @param <T>
+ *            Type of the value to create the timestamp from.
+ */
+public interface Timestamp<T> extends Serializable {
+
+       /**
+        * Extracts the timestamp from the given value.
+        * 
+        * @param value
+        *            The value to create the timestamp from
+        * @return The timestamp
+        */
+       public long getTimestamp(T value);
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/fac77348/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/TimestampWrapper.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/TimestampWrapper.java
 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/TimestampWrapper.java
new file mode 100644
index 0000000..8c3a09d
--- /dev/null
+++ 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/TimestampWrapper.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.windowing.helper;
+
+import java.io.Serializable;
+
+public class TimestampWrapper<T> implements Serializable {
+
+       private static final long serialVersionUID = 1L;
+       private long startTime;
+       private Timestamp<T> timestamp;
+
+       public TimestampWrapper(Timestamp<T> timeStamp, long startTime) {
+               this.timestamp = timeStamp;
+               this.startTime = startTime;
+       }
+
+       public long getTimestamp(T in) {
+               return timestamp.getTimestamp(in);
+       }
+
+       public long getStartTime() {
+               return startTime;
+       }
+
+       public boolean isDefaultTimestamp() {
+               return timestamp instanceof SystemTimestamp;
+       }
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/fac77348/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/ActiveTriggerPolicy.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/ActiveTriggerPolicy.java
 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/ActiveTriggerPolicy.java
index a8a704d..414250c 100644
--- 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/ActiveTriggerPolicy.java
+++ 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/ActiveTriggerPolicy.java
@@ -17,7 +17,7 @@
 
 package org.apache.flink.streaming.api.windowing.policy;
 
-import org.apache.flink.streaming.api.invokable.util.TimeStamp;
+import org.apache.flink.streaming.api.windowing.helper.Timestamp;
 
 /**
  * This interface extends the {@link TriggerPolicy} interface with 
functionality
@@ -28,7 +28,7 @@ import 
org.apache.flink.streaming.api.invokable.util.TimeStamp;
  * first. It can return zero ore more fake data points which will be added
  * before the the currently arrived real element gets processed. This allows to
  * handle empty windows in time based windowing with an user defined
- * {@link TimeStamp}. Triggers are not called on fake datapoint. A fake
+ * {@link Timestamp}. Triggers are not called on fake datapoints. A fake
  * datapoint is always considered as triggered.
  * 
  * 2) An active trigger has a factory method for a runnable. This factory 
method
@@ -49,7 +49,7 @@ public interface ActiveTriggerPolicy<DATA> extends 
TriggerPolicy<DATA> {
         * first. It can return zero ore more fake data points which will be 
added
         * before the the currently arrived real element gets processed. This 
allows
         * to handle empty windows in time based windowing with an user defined
-        * {@link TimeStamp}. Triggers are not called on fake datapoints. A fake
+        * {@link Timestamp}. Triggers are not called on fake datapoints. A fake
         * datapoint is always considered as triggered.
         * 
         * @param datapoint

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/fac77348/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TimeEvictionPolicy.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TimeEvictionPolicy.java
 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TimeEvictionPolicy.java
index aca1dee..16c30fc 100644
--- 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TimeEvictionPolicy.java
+++ 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TimeEvictionPolicy.java
@@ -19,14 +19,15 @@ package org.apache.flink.streaming.api.windowing.policy;
 
 import java.util.LinkedList;
 
-import org.apache.flink.streaming.api.invokable.util.TimeStamp;
+import org.apache.flink.streaming.api.windowing.helper.Timestamp;
+import org.apache.flink.streaming.api.windowing.helper.TimestampWrapper;
 
 /**
  * This eviction policy evicts all elements which are older then a specified
- * time. The time is measured using a given {@link TimeStamp} implementation. A
+ * time. The time is measured using a given {@link Timestamp} implementation. A
  * point in time is always represented as long. Therefore, the granularity can
  * be set as long value as well.
- *
+ * 
  * @param <DATA>
  *            The type of the incoming data points which are processed by this
  *            policy.
@@ -40,12 +41,12 @@ public class TimeEvictionPolicy<DATA> implements 
ActiveEvictionPolicy<DATA>,
        private static final long serialVersionUID = -1457476766124518220L;
 
        private long granularity;
-       private TimeStamp<DATA> timestamp;
+       private TimestampWrapper<DATA> timestampWrapper;
        private LinkedList<Long> buffer = new LinkedList<Long>();
 
        /**
         * This eviction policy evicts all elements which are older than a 
specified
-        * time. The time is measured using a given {@link TimeStamp}
+        * time. The time is measured using a given {@link Timestamp}
         * implementation. A point in time is always represented as long. 
Therefore,
         * the granularity can be set as long value as well. If this value is 
set to
         * 2 the policy will evict all elements which are older as 2.
@@ -60,12 +61,12 @@ public class TimeEvictionPolicy<DATA> implements 
ActiveEvictionPolicy<DATA>,
         *            The granularity of the eviction. If this value is set to 
2 the
         *            policy will evict all elements which are older as 2(if
         *            (time(X)<current time-granularity) evict X).
-        * @param timestamp
-        *            The {@link TimeStamp} to measure the time with. This can 
be
-        *            either user defined of provided by the API.
+        * @param timestampWrapper
+        *            The {@link TimestampWrapper} to measure the time with. 
This
+        *            can be either user defined or provided by the API.
         */
-       public TimeEvictionPolicy(long granularity, TimeStamp<DATA> timestamp) {
-               this.timestamp = timestamp;
+       public TimeEvictionPolicy(long granularity, TimestampWrapper<DATA> 
timestampWrapper) {
+               this.timestampWrapper = timestampWrapper;
                this.granularity = granularity;
        }
 
@@ -78,7 +79,7 @@ public class TimeEvictionPolicy<DATA> implements 
ActiveEvictionPolicy<DATA>,
                try {
                        threshold = (Long) datapoint - granularity;
                } catch (ClassCastException e) {
-                       threshold = timestamp.getTimestamp((DATA) datapoint) - 
granularity;
+                       threshold = timestampWrapper.getTimestamp((DATA) 
datapoint) - granularity;
                }
 
                // return result
@@ -91,9 +92,9 @@ public class TimeEvictionPolicy<DATA> implements 
ActiveEvictionPolicy<DATA>,
 
                checkForDeleted(bufferSize);
 
-               //remember timestamp
-               long time=timestamp.getTimestamp(datapoint);
-               
+               // remember timestamp
+               long time = timestampWrapper.getTimestamp(datapoint);
+
                // delete and count expired tuples
                long threshold = time - granularity;
                int counter = deleteAndCountExpired(threshold);
@@ -130,7 +131,7 @@ public class TimeEvictionPolicy<DATA> implements 
ActiveEvictionPolicy<DATA>,
 
        @Override
        public TimeEvictionPolicy<DATA> clone() {
-               return new TimeEvictionPolicy<DATA>(granularity, timestamp);
+               return new TimeEvictionPolicy<DATA>(granularity, 
timestampWrapper);
        }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/fac77348/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TimeTriggerPolicy.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TimeTriggerPolicy.java
 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TimeTriggerPolicy.java
index 57bccf2..1e91b8e 100644
--- 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TimeTriggerPolicy.java
+++ 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TimeTriggerPolicy.java
@@ -19,12 +19,12 @@ package org.apache.flink.streaming.api.windowing.policy;
 
 import java.util.LinkedList;
 
-import org.apache.flink.streaming.api.invokable.util.DefaultTimeStamp;
-import org.apache.flink.streaming.api.invokable.util.TimeStamp;
+import org.apache.flink.streaming.api.windowing.helper.Timestamp;
+import org.apache.flink.streaming.api.windowing.helper.TimestampWrapper;
 
 /**
  * This trigger policy triggers with regard to the time. The is measured using 
a
- * given {@link TimeStamp} implementation. A point in time is always 
represented
+ * given {@link Timestamp} implementation. A point in time is always 
represented
  * as long. Therefore, parameters such as granularity and delay can be set as
  * long value as well.
  * 
@@ -42,12 +42,12 @@ public class TimeTriggerPolicy<DATA> implements 
ActiveTriggerPolicy<DATA>,
 
        protected long startTime;
        protected long granularity;
-       protected TimeStamp<DATA> timestamp;
+       protected TimestampWrapper<DATA> timestampWrapper;
        protected long delay;
 
        /**
         * This trigger policy triggers with regard to the time. The is measured
-        * using a given {@link TimeStamp} implementation. A point in time is 
always
+        * using a given {@link Timestamp} implementation. A point in time is 
always
         * represented as long. Therefore, parameters such as granularity can 
be set
         * as long value as well. If this value for the granularity is set to 2 
for
         * example, the policy will trigger at every second point in time.
@@ -55,22 +55,22 @@ public class TimeTriggerPolicy<DATA> implements 
ActiveTriggerPolicy<DATA>,
         * @param granularity
         *            The granularity of the trigger. If this value is set to x 
the
         *            policy will trigger at every x-th time point
-        * @param timestamp
-        *            The {@link TimeStamp} to measure the time with. This can 
be
-        *            either user defined of provided by the API.
+        * @param timestampWrapper
+        *            The {@link TimestampWrapper} to measure the time with. 
This
+        *            can be either user defined or provided by the API.
         * @param timeWrapper
         *            This policy creates fake elements to not miss windows in 
case
         *            no element arrived within the duration of the window. This
         *            extractor should wrap a long into such an element of type
         *            DATA.
         */
-       public TimeTriggerPolicy(long granularity, TimeStamp<DATA> timestamp) {
-               this(granularity, timestamp, 0);
+       public TimeTriggerPolicy(long granularity, TimestampWrapper<DATA> 
timestampWrapper) {
+               this(granularity, timestampWrapper, 0);
        }
 
        /**
         * This is mostly the same as
-        * {@link TimeTriggerPolicy#TimeTriggerPolicy(long, TimeStamp)}. In 
addition
+        * {@link TimeTriggerPolicy#TimeTriggerPolicy(long, TimestampWrapper)}. In 
addition
         * to granularity and timestamp a delay can be specified for the first
         * trigger. If the start time given by the timestamp is x, the delay is 
y,
         * and the granularity is z, the first trigger will happen at x+y+z.
@@ -78,9 +78,9 @@ public class TimeTriggerPolicy<DATA> implements 
ActiveTriggerPolicy<DATA>,
         * @param granularity
         *            The granularity of the trigger. If this value is set to 2 
the
         *            policy will trigger at every second time point
-        * @param timestamp
-        *            The {@link TimeStamp} to measure the time with. This can 
be
-        *            either user defined of provided by the API.
+        * @param timestampWrapper
+        *            The {@link TimestampWrapper} to measure the time with. 
This
+        *            can be either user defined or provided by the API.
         * @param delay
         *            A delay for the first trigger. If the start time given by 
the
         *            timestamp is x, the delay is y, and the granularity is z, 
the
@@ -91,9 +91,9 @@ public class TimeTriggerPolicy<DATA> implements 
ActiveTriggerPolicy<DATA>,
         *            extractor should wrap a long into such an element of type
         *            DATA.
         */
-       public TimeTriggerPolicy(long granularity, TimeStamp<DATA> timestamp, 
long delay) {
-               this.startTime = timestamp.getStartTime() + delay;
-               this.timestamp = timestamp;
+       public TimeTriggerPolicy(long granularity, TimestampWrapper<DATA> 
timestampWrapper, long delay) {
+               this.startTime = timestampWrapper.getStartTime() + delay;
+               this.timestampWrapper = timestampWrapper;
                this.granularity = granularity;
                this.delay = delay;
        }
@@ -107,7 +107,7 @@ public class TimeTriggerPolicy<DATA> implements 
ActiveTriggerPolicy<DATA>,
                LinkedList<Object> fakeElements = new LinkedList<Object>();
                // check if there is more then one window border missed
                // use > here. In case >= would fit, the regular call will do 
the job.
-               while (timestamp.getTimestamp(datapoint) >= startTime + 
granularity) {
+               while (timestampWrapper.getTimestamp(datapoint) >= startTime + 
granularity) {
                        startTime += granularity;
                        fakeElements.add(startTime - 1);
                }
@@ -127,7 +127,7 @@ public class TimeTriggerPolicy<DATA> implements 
ActiveTriggerPolicy<DATA>,
         */
        @Override
        public Runnable createActiveTriggerRunnable(ActiveTriggerCallback 
callback) {
-               if (this.timestamp instanceof DefaultTimeStamp) {
+               if (this.timestampWrapper.isDefaultTimestamp()) {
                        return new TimeCheck(callback);
                } else {
                        return null;
@@ -177,7 +177,7 @@ public class TimeTriggerPolicy<DATA> implements 
ActiveTriggerPolicy<DATA>,
 
        @Override
        public synchronized boolean notifyTrigger(DATA datapoint) {
-               long recordTime = timestamp.getTimestamp(datapoint);
+               long recordTime = timestampWrapper.getTimestamp(datapoint);
                if (recordTime >= startTime + granularity) {
                        if (granularity != 0) {
                                startTime = recordTime - ((recordTime - 
startTime) % granularity);
@@ -190,7 +190,7 @@ public class TimeTriggerPolicy<DATA> implements 
ActiveTriggerPolicy<DATA>,
 
        @Override
        public TimeTriggerPolicy<DATA> clone() {
-               return new TimeTriggerPolicy<DATA>(granularity, timestamp, 
delay);
+               return new TimeTriggerPolicy<DATA>(granularity, 
timestampWrapper, delay);
        }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/fac77348/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WindowCrossJoinTest.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WindowCrossJoinTest.java
 
b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WindowCrossJoinTest.java
index cd6232a..e856f07 100644
--- 
a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WindowCrossJoinTest.java
+++ 
b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WindowCrossJoinTest.java
@@ -28,7 +28,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.function.sink.SinkFunction;
-import org.apache.flink.streaming.api.invokable.util.TimeStamp;
+import org.apache.flink.streaming.api.windowing.helper.TimestampWrapper;
 import org.apache.flink.streaming.util.TestStreamEnvironment;
 import org.junit.Test;
 
@@ -96,12 +96,16 @@ public class WindowCrossJoinTest implements Serializable {
                DataStream<Tuple2<Integer, String>> inStream1 = 
env.fromCollection(in1);
                DataStream<Tuple1<Integer>> inStream2 = env.fromCollection(in2);
 
-               inStream1.join(inStream2).onWindow(1000, 1000, new 
MyTimestamp1(), new MyTimestamp2())
-                               .where(0).equalTo(0).addSink(new 
JoinResultSink());
+               inStream1
+                               .join(inStream2)
+                               .onWindow(1000, 1000, new 
MyTimestamp<Tuple2<Integer, String>>(),
+                                               new 
MyTimestamp<Tuple1<Integer>>()).where(0).equalTo(0)
+                               .addSink(new JoinResultSink());
 
                inStream1
                                .cross(inStream2)
-                               .onWindow(1000, 1000, new MyTimestamp1(), new 
MyTimestamp2())
+                               .onWindow(1000, 1000, new 
MyTimestamp<Tuple2<Integer, String>>(),
+                                               new 
MyTimestamp<Tuple1<Integer>>())
                                .with(new CrossFunction<Tuple2<Integer, 
String>, Tuple1<Integer>, Tuple2<Tuple2<Integer, String>, Tuple1<Integer>>>() {
 
                                        private static final long 
serialVersionUID = 1L;
@@ -119,25 +123,15 @@ public class WindowCrossJoinTest implements Serializable {
                assertEquals(crossExpectedResults, crossResults);
        }
 
-       private static class MyTimestamp1 implements TimeStamp<Tuple2<Integer, 
String>> {
-               private static final long serialVersionUID = 1L;
-
-               @Override
-               public long getTimestamp(Tuple2<Integer, String> value) {
-                       return 101L;
+       private static class MyTimestamp<T> extends TimestampWrapper<T> {
+               public MyTimestamp() {
+                       super(null, 0);
                }
 
-               @Override
-               public long getStartTime() {
-                       return 100L;
-               }
-       }
-
-       private static class MyTimestamp2 implements TimeStamp<Tuple1<Integer>> 
{
                private static final long serialVersionUID = 1L;
 
                @Override
-               public long getTimestamp(Tuple1<Integer> value) {
+               public long getTimestamp(T value) {
                        return 101L;
                }
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/fac77348/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoGroupedWindowReduceTest.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoGroupedWindowReduceTest.java
 
b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoGroupedWindowReduceTest.java
index e3f2a1b..508366c 100644
--- 
a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoGroupedWindowReduceTest.java
+++ 
b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoGroupedWindowReduceTest.java
@@ -29,7 +29,7 @@ import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.streaming.api.function.co.CoReduceFunction;
 import 
org.apache.flink.streaming.api.invokable.operator.co.CoGroupedWindowReduceInvokable;
-import org.apache.flink.streaming.api.invokable.util.TimeStamp;
+import org.apache.flink.streaming.api.windowing.helper.TimestampWrapper;
 import org.apache.flink.streaming.util.MockCoContext;
 import org.junit.Test;
 
@@ -82,13 +82,14 @@ public class CoGroupedWindowReduceTest {
                }
        }
 
-       public static final class MyTimeStamp<T> implements TimeStamp<T> {
+       public static final class MyTimeStamp<T> extends TimestampWrapper<T> {
                private static final long serialVersionUID = 1L;
 
                private Iterator<Long> timestamps;
                private long start;
 
                public MyTimeStamp(List<Long> timestamps) {
+                       super(null, 0);
                        this.timestamps = timestamps.iterator();
                        this.start = timestamps.get(0);
                }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/fac77348/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoWindowReduceTest.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoWindowReduceTest.java
 
b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoWindowReduceTest.java
index 90ad483..035a021 100644
--- 
a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoWindowReduceTest.java
+++ 
b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoWindowReduceTest.java
@@ -27,7 +27,7 @@ import java.util.List;
 
 import org.apache.flink.streaming.api.function.co.CoReduceFunction;
 import 
org.apache.flink.streaming.api.invokable.operator.co.CoWindowReduceInvokable;
-import org.apache.flink.streaming.api.invokable.util.TimeStamp;
+import org.apache.flink.streaming.api.windowing.helper.TimestampWrapper;
 import org.apache.flink.streaming.util.MockCoContext;
 import org.junit.Test;
 
@@ -57,13 +57,14 @@ public class CoWindowReduceTest {
                }
        }
 
-       public static final class MyTimeStamp<T> implements TimeStamp<T> {
+       public static final class MyTimeStamp<T> extends TimestampWrapper<T> {
                private static final long serialVersionUID = 1L;
 
                private Iterator<Long> timestamps;
                private long start;
 
                public MyTimeStamp(List<Long> timestamps) {
+                       super(null, 0);
                        this.timestamps = timestamps.iterator();
                        this.start = timestamps.get(0);
                }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/fac77348/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoWindowTest.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoWindowTest.java
 
b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoWindowTest.java
index c6d446a..4ab3492 100644
--- 
a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoWindowTest.java
+++ 
b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoWindowTest.java
@@ -27,7 +27,8 @@ import java.util.Set;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.streaming.api.function.co.CoWindowFunction;
 import org.apache.flink.streaming.api.invokable.operator.co.CoWindowInvokable;
-import org.apache.flink.streaming.api.invokable.util.TimeStamp;
+import org.apache.flink.streaming.api.windowing.helper.Timestamp;
+import org.apache.flink.streaming.api.windowing.helper.TimestampWrapper;
 import org.apache.flink.streaming.util.MockCoContext;
 import org.apache.flink.util.Collector;
 import org.junit.Test;
@@ -80,7 +81,7 @@ public class CoWindowTest {
 
        }
 
-       private static final class MyTS1 implements TimeStamp<Integer> {
+       private static final class MyTS1 implements Timestamp<Integer> {
 
                private static final long serialVersionUID = 1L;
 
@@ -89,14 +90,9 @@ public class CoWindowTest {
                        return value;
                }
 
-               @Override
-               public long getStartTime() {
-                       return 1;
-               }
-
        }
 
-       private static final class MyTS2 implements TimeStamp<Tuple2<Integer, 
Integer>> {
+       private static final class MyTS2 implements Timestamp<Tuple2<Integer, 
Integer>> {
 
                private static final long serialVersionUID = 1L;
 
@@ -105,18 +101,14 @@ public class CoWindowTest {
                        return value.f0;
                }
 
-               @Override
-               public long getStartTime() {
-                       return 1;
-               }
-
        }
 
        @Test
        public void coWindowGroupReduceTest2() throws Exception {
 
                CoWindowInvokable<Integer, Integer, Integer> invokable1 = new 
CoWindowInvokable<Integer, Integer, Integer>(
-                               new MyCoGroup1(), 2, 1, new MyTS1(), new 
MyTS1());
+                               new MyCoGroup1(), 2, 1, new 
TimestampWrapper<Integer>(new MyTS1(), 1),
+                               new TimestampWrapper<Integer>(new MyTS1(), 1));
 
                // Windowsize 2, slide 1
                // 1,2|2,3|3,4|4,5
@@ -152,7 +144,8 @@ public class CoWindowTest {
                assertEquals(expected1, actual1);
 
                CoWindowInvokable<Tuple2<Integer, Integer>, Tuple2<Integer, 
Integer>, Integer> invokable2 = new CoWindowInvokable<Tuple2<Integer, Integer>, 
Tuple2<Integer, Integer>, Integer>(
-                               new MyCoGroup2(), 2, 3, new MyTS2(), new 
MyTS2());
+                               new MyCoGroup2(), 2, 3, new 
TimestampWrapper<Tuple2<Integer, Integer>>(new MyTS2(),
+                                               1), new 
TimestampWrapper<Tuple2<Integer, Integer>>(new MyTS2(), 1));
 
                // WindowSize 2, slide 3
                // 1,2|4,5|7,8|

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/fac77348/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowInvokableTest.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowInvokableTest.java
 
b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowInvokableTest.java
index d97cadc..f38d5c1 100644
--- 
a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowInvokableTest.java
+++ 
b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowInvokableTest.java
@@ -28,7 +28,8 @@ import java.util.List;
 import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.streaming.api.invokable.util.TimeStamp;
+import org.apache.flink.streaming.api.windowing.helper.Timestamp;
+import org.apache.flink.streaming.api.windowing.helper.TimestampWrapper;
 import 
org.apache.flink.streaming.api.windowing.policy.ActiveCloneableEvictionPolicyWrapper;
 import org.apache.flink.streaming.api.windowing.policy.CloneableEvictionPolicy;
 import org.apache.flink.streaming.api.windowing.policy.CloneableTriggerPolicy;
@@ -360,20 +361,18 @@ public class GroupedWindowInvokableTest {
                expected.add(new Tuple2<Integer, String>(32, "b"));
                expected.add(new Tuple2<Integer, String>(32, "c"));
 
-               TimeStamp<Tuple2<Integer, String>> myTimeStamp = new 
TimeStamp<Tuple2<Integer, String>>() {
+               Timestamp<Tuple2<Integer, String>> myTimeStamp = new 
Timestamp<Tuple2<Integer, String>>() {
                        private static final long serialVersionUID = 1L;
 
                        @Override
                        public long getTimestamp(Tuple2<Integer, String> value) 
{
                                return value.f0;
                        }
-
-                       @Override
-                       public long getStartTime() {
-                               return 1;
-                       }
                };
 
+               TimestampWrapper<Tuple2<Integer, String>> myTimeStampWrapper = 
new TimestampWrapper<Tuple2<Integer, String>>(
+                               myTimeStamp, 1);
+
                ReduceFunction<Tuple2<Integer, String>> myReduceFunction = new 
ReduceFunction<Tuple2<Integer, String>>() {
                        private static final long serialVersionUID = 1L;
 
@@ -387,11 +386,11 @@ public class GroupedWindowInvokableTest {
                LinkedList<TriggerPolicy<Tuple2<Integer, String>>> triggers = 
new LinkedList<TriggerPolicy<Tuple2<Integer, String>>>();
                // Trigger every 2 time units but delay the first trigger by 2 
(First
                // trigger after 4, then every 2)
-               triggers.add(new TimeTriggerPolicy<Tuple2<Integer, String>>(2L, 
myTimeStamp, 2L));
+               triggers.add(new TimeTriggerPolicy<Tuple2<Integer, String>>(2L, 
myTimeStampWrapper, 2L));
 
                LinkedList<CloneableEvictionPolicy<Tuple2<Integer, String>>> 
evictions = new LinkedList<CloneableEvictionPolicy<Tuple2<Integer, String>>>();
                // Always delete all elements older then 4
-               evictions.add(new TimeEvictionPolicy<Tuple2<Integer, 
String>>(4L, myTimeStamp));
+               evictions.add(new TimeEvictionPolicy<Tuple2<Integer, 
String>>(4L, myTimeStampWrapper));
 
                LinkedList<CloneableTriggerPolicy<Tuple2<Integer, String>>> 
distributedTriggers = new LinkedList<CloneableTriggerPolicy<Tuple2<Integer, 
String>>>();
 
@@ -409,10 +408,10 @@ public class GroupedWindowInvokableTest {
 
                // repeat the test with central eviction. The result should be 
the same.
                triggers.clear();
-               triggers.add(new TimeTriggerPolicy<Tuple2<Integer, String>>(2L, 
myTimeStamp, 2L));
+               triggers.add(new TimeTriggerPolicy<Tuple2<Integer, String>>(2L, 
myTimeStampWrapper, 2L));
                evictions.clear();
                LinkedList<EvictionPolicy<Tuple2<Integer, String>>> 
centralEvictions = new LinkedList<EvictionPolicy<Tuple2<Integer, String>>>();
-               centralEvictions.add(new TimeEvictionPolicy<Tuple2<Integer, 
String>>(4L, myTimeStamp));
+               centralEvictions.add(new TimeEvictionPolicy<Tuple2<Integer, 
String>>(4L, myTimeStampWrapper));
 
                invokable = new GroupedWindowInvokable<Tuple2<Integer, String>, 
Tuple2<Integer, String>>(
                                myReduceFunction, keySelector, 
distributedTriggers, evictions, triggers,

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/fac77348/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/WindowInvokableTest.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/WindowInvokableTest.java
 
b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/WindowInvokableTest.java
index 421a999..83b4596 100644
--- 
a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/WindowInvokableTest.java
+++ 
b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/WindowInvokableTest.java
@@ -24,7 +24,8 @@ import java.util.LinkedList;
 import java.util.List;
 
 import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.streaming.api.invokable.util.TimeStamp;
+import org.apache.flink.streaming.api.windowing.helper.Timestamp;
+import org.apache.flink.streaming.api.windowing.helper.TimestampWrapper;
 import org.apache.flink.streaming.api.windowing.policy.CountEvictionPolicy;
 import org.apache.flink.streaming.api.windowing.policy.CountTriggerPolicy;
 import org.apache.flink.streaming.api.windowing.policy.EvictionPolicy;
@@ -62,18 +63,13 @@ public class WindowInvokableTest {
                expected.add(10);
                expected.add(32);
 
-               TimeStamp<Integer> myTimeStamp = new TimeStamp<Integer>() {
+               Timestamp<Integer> myTimeStamp = new Timestamp<Integer>() {
                        private static final long serialVersionUID = 1L;
 
                        @Override
                        public long getTimestamp(Integer value) {
                                return value;
                        }
-
-                       @Override
-                       public long getStartTime() {
-                               return 1;
-                       }
                };
 
                ReduceFunction<Integer> myReduceFunction = new 
ReduceFunction<Integer>() {
@@ -88,11 +84,13 @@ public class WindowInvokableTest {
                LinkedList<TriggerPolicy<Integer>> triggers = new 
LinkedList<TriggerPolicy<Integer>>();
                // Trigger every 2 time units but delay the first trigger by 2 
(First
                // trigger after 4, then every 2)
-               triggers.add(new TimeTriggerPolicy<Integer>(2L, myTimeStamp, 
2L));
+               triggers.add(new TimeTriggerPolicy<Integer>(2L, new 
TimestampWrapper<Integer>(myTimeStamp,
+                               1), 2L));
 
                LinkedList<EvictionPolicy<Integer>> evictions = new 
LinkedList<EvictionPolicy<Integer>>();
                // Always delete all elements older then 4
-               evictions.add(new TimeEvictionPolicy<Integer>(4L, myTimeStamp));
+               evictions.add(new TimeEvictionPolicy<Integer>(4L, new 
TimestampWrapper<Integer>(
+                               myTimeStamp, 1)));
 
                WindowInvokable<Integer, Integer> invokable = new 
WindowReduceInvokable<Integer>(
                                myReduceFunction, triggers, evictions);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/fac77348/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/TimeEvictionPolicyTest.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/TimeEvictionPolicyTest.java
 
b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/TimeEvictionPolicyTest.java
index 82c8841..b5d502b 100644
--- 
a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/TimeEvictionPolicyTest.java
+++ 
b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/TimeEvictionPolicyTest.java
@@ -19,7 +19,8 @@ package org.apache.flink.streaming.api.windowing.policy;
 
 import java.util.LinkedList;
 
-import org.apache.flink.streaming.api.invokable.util.TimeStamp;
+import org.apache.flink.streaming.api.windowing.helper.Timestamp;
+import org.apache.flink.streaming.api.windowing.helper.TimestampWrapper;
 import org.junit.Test;
 
 import static org.junit.Assert.*;
@@ -35,25 +36,20 @@ public class TimeEvictionPolicyTest {
 
                // create a timestamp
                @SuppressWarnings("serial")
-               TimeStamp<Integer> timeStamp = new TimeStamp<Integer>() {
+               Timestamp<Integer> timeStamp = new Timestamp<Integer>() {
 
                        @Override
                        public long getTimestamp(Integer value) {
                                return value;
                        }
 
-                       @Override
-                       public long getStartTime() {
-                               return 0;
-                       }
-
                };
 
                // test different granularity
                for (long granularity = 0; granularity < 40; granularity++) {
                        // create policy
                        TimeEvictionPolicy<Integer> policy = new 
TimeEvictionPolicy<Integer>(granularity,
-                                       timeStamp);
+                                       new 
TimestampWrapper<Integer>(timeStamp, 0));
 
                        // The trigger status should not effect the policy. 
Therefore, it's
                        // value is changed after each usage.

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/fac77348/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/TimeTriggerPolicyTest.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/TimeTriggerPolicyTest.java
 
b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/TimeTriggerPolicyTest.java
index 9c77a55..2bdbd96 100644
--- 
a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/TimeTriggerPolicyTest.java
+++ 
b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/TimeTriggerPolicyTest.java
@@ -21,7 +21,8 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
-import org.apache.flink.streaming.api.invokable.util.TimeStamp;
+import org.apache.flink.streaming.api.windowing.helper.Timestamp;
+import org.apache.flink.streaming.api.windowing.helper.TimestampWrapper;
 import org.junit.Test;
 
 public class TimeTriggerPolicyTest {
@@ -33,29 +34,24 @@ public class TimeTriggerPolicyTest {
 
                // create a timestamp
                @SuppressWarnings("serial")
-               TimeStamp<Integer> timeStamp = new TimeStamp<Integer>() {
+               Timestamp<Integer> timeStamp = new Timestamp<Integer>() {
 
                        @Override
                        public long getTimestamp(Integer value) {
                                return value;
                        }
 
-                       @Override
-                       public long getStartTime() {
-                               return 0;
-                       }
-
                };
 
                // test different granularity
                for (long granularity = 0; granularity < 31; granularity++) {
                        // create policy
-                       TriggerPolicy<Integer> policy = new 
TimeTriggerPolicy<Integer>(granularity, timeStamp);
+
+                       TriggerPolicy<Integer> policy = new 
TimeTriggerPolicy<Integer>(granularity,
+                                       new 
TimestampWrapper<Integer>(timeStamp, 0));
 
                        // remember window border
-                       // Remark: This might NOT work in case the timeStamp 
uses
-                       // System.getCurrentTimeMillis to determine the start 
time.
-                       long currentTime = timeStamp.getStartTime();
+                       long currentTime = 0;
 
                        // test by adding values
                        for (int i = 0; i < times.length; i++) {
@@ -85,22 +81,18 @@ public class TimeTriggerPolicyTest {
 
                // create a timestamp
                @SuppressWarnings("serial")
-               TimeStamp<Integer> timeStamp = new TimeStamp<Integer>() {
+               Timestamp<Integer> timeStamp = new Timestamp<Integer>() {
 
                        @Override
                        public long getTimestamp(Integer value) {
                                return value;
                        }
 
-                       @Override
-                       public long getStartTime() {
-                               return 0;
-                       }
-
                };
 
                // create policy
-               TimeTriggerPolicy<Integer> policy = new 
TimeTriggerPolicy<Integer>(5, timeStamp);
+               TimeTriggerPolicy<Integer> policy = new 
TimeTriggerPolicy<Integer>(5,
+                               new TimestampWrapper<Integer>(timeStamp, 0));
 
                // expected result
                Long[][] result = { {}, {}, { 4L, 9L, 14L, 19L }, { 24L } };

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/fac77348/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/DeltaExtractExample.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/DeltaExtractExample.java
 
b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/DeltaExtractExample.java
index 1013e6f..d6a9ac0 100644
--- 
a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/DeltaExtractExample.java
+++ 
b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/DeltaExtractExample.java
@@ -49,10 +49,8 @@ public class DeltaExtractExample {
                @SuppressWarnings({ "unchecked", "rawtypes" })
                DataStream dstream = env
                                .addSource(new CountingSource())
-                               .window(Delta.of(new EuclideanDistance(new 
FieldsFromTuple(0, 1)), new Tuple3(0d,
-                                               0d, "foo"), 1.2))
-                               .every(Count.of(2))
-                               .reduce(new ConcatStrings());
+                               .window(Delta.of(1.2, new EuclideanDistance(new 
FieldsFromTuple(0, 1)), new Tuple3(
+                                               0d, 0d, 
"foo"))).every(Count.of(2)).reduce(new ConcatStrings());
 
                // emit result
                if (fileOutput) {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/fac77348/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/DataStream.scala
----------------------------------------------------------------------
diff --git 
a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/DataStream.scala
 
b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/DataStream.scala
index 38ad384..dc1e5b3 100644
--- 
a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/DataStream.scala
+++ 
b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/DataStream.scala
@@ -18,6 +18,7 @@
 
 package org.apache.flink.api.scala.streaming
 import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.streaming.StreamExecutionEnvironment.clean
 import org.apache.flink.streaming.api.datastream.{ DataStream => JavaStream }
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import scala.reflect.ClassTag
@@ -53,24 +54,6 @@ import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
 
 class DataStream[T](javaStream: JavaStream[T]) {
 
-  /* This code is originally from the Apache Spark project. */
-  /**
-   * Clean a closure to make it ready to serialized and send to tasks
-   * (removes unreferenced variables in $outer's, updates REPL variables)
-   * If <tt>checkSerializable</tt> is set, <tt>clean</tt> will also proactively
-   * check to see if <tt>f</tt> is serializable and throw a 
<tt>SparkException</tt>
-   * if not.
-   *
-   * @param f the closure to clean
-   * @param checkSerializable whether or not to immediately check <tt>f</tt> 
for serializability
-   * @throws <tt>SparkException<tt> if <tt>checkSerializable</tt> is set but 
<tt>f</tt> is not
-   *   serializable
-   */
-  private[flink] def clean[F <: AnyRef](f: F, checkSerializable: Boolean = 
true): F = {
-    ClosureCleaner.clean(f, checkSerializable)
-    f
-  }
-
   /**
    * Gets the underlying java DataStream object.
    */

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/fac77348/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamCrossOperator.scala
----------------------------------------------------------------------
diff --git 
a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamCrossOperator.scala
 
b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamCrossOperator.scala
index 72052b9..e9010c8 100644
--- 
a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamCrossOperator.scala
+++ 
b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamCrossOperator.scala
@@ -32,7 +32,7 @@ import 
org.apache.flink.streaming.api.function.co.CrossWindowFunction
 import org.apache.flink.api.common.functions.CrossFunction
 import org.apache.flink.api.scala.typeutils.CaseClassSerializer
 import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
-
+import org.apache.flink.api.scala.streaming.StreamExecutionEnvironment.clean
 
 class StreamCrossOperator[I1, I2](i1: JavaStream[I1], i2: JavaStream[I2]) 
extends
   TemporalOperator[I1, I2, StreamCrossOperator.CrossWindow[I1, I2]](i1, i2) {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/fac77348/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamExecutionEnvironment.scala
----------------------------------------------------------------------
diff --git 
a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamExecutionEnvironment.scala
 
b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamExecutionEnvironment.scala
index 2489a64..a7a471f 100644
--- 
a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamExecutionEnvironment.scala
+++ 
b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamExecutionEnvironment.scala
@@ -161,9 +161,9 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
    */
   def addSource[T: ClassTag: TypeInformation](function: SourceFunction[T]): 
DataStream[T] = {
     Validate.notNull(function, "Function must not be null.")
-    ClosureCleaner.clean(function, true)
+    val cleanFun = StreamExecutionEnvironment.clean(function)
     val typeInfo = implicitly[TypeInformation[T]]
-    new DataStream[T](javaEnv.addSource(function, typeInfo))
+    new DataStream[T](javaEnv.addSource(cleanFun, typeInfo))
   }
   
    /**
@@ -174,8 +174,9 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
   def addSource[T: ClassTag: TypeInformation](function: Collector[T] => Unit): 
DataStream[T] = {
     Validate.notNull(function, "Function must not be null.")
     val sourceFunction = new SourceFunction[T] {
+      val cleanFun = StreamExecutionEnvironment.clean(function)
       override def invoke(out: Collector[T]) {
-        function(out)
+        cleanFun(out)
       }
     }
     addSource(sourceFunction)
@@ -205,6 +206,11 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
 }
 
 object StreamExecutionEnvironment {
+  
+  private[flink] def clean[F <: AnyRef](f: F, checkSerializable: Boolean = 
true): F = {
+    ClosureCleaner.clean(f, checkSerializable)
+    f
+  }
 
   /**
    * Creates an execution environment that represents the context in which the 
program is

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/fac77348/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamJoinOperator.scala
----------------------------------------------------------------------
diff --git 
a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamJoinOperator.scala
 
b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamJoinOperator.scala
index f47d79e..4ed5082 100644
--- 
a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamJoinOperator.scala
+++ 
b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamJoinOperator.scala
@@ -33,6 +33,7 @@ import org.apache.commons.lang.Validate
 import org.apache.flink.streaming.api.invokable.operator.co.CoWindowInvokable
 import org.apache.flink.streaming.util.keys.KeySelectorUtil
 import org.apache.flink.api.java.operators.Keys
+import org.apache.flink.api.scala.streaming.StreamExecutionEnvironment.clean
 
 class StreamJoinOperator[I1, I2](i1: JavaStream[I1], i2: JavaStream[I2]) 
extends
 TemporalOperator[I1, I2, StreamJoinOperator.JoinWindow[I1, I2]](i1, i2) {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/fac77348/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/WindowedDataStream.scala
----------------------------------------------------------------------
diff --git 
a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/WindowedDataStream.scala
 
b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/WindowedDataStream.scala
index 5346c4c..e33368c 100644
--- 
a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/WindowedDataStream.scala
+++ 
b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/WindowedDataStream.scala
@@ -40,14 +40,10 @@ import 
org.apache.flink.streaming.api.function.aggregation.AggregationFunction.A
 import org.apache.flink.api.java.typeutils.TupleTypeInfoBase
 import org.apache.flink.streaming.api.function.aggregation.SumFunction
 import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
+import org.apache.flink.api.scala.streaming.StreamExecutionEnvironment.clean
 
 class WindowedDataStream[T](javaStream: JavaWStream[T]) {
 
-  private[flink] def clean[F <: AnyRef](f: F, checkSerializable: Boolean = 
true): F = {
-    ClosureCleaner.clean(f, checkSerializable)
-    f
-  }
-
   /**
    * Defines the slide size (trigger frequency) for the windowed data stream.
    * This controls how often the user defined function will be triggered on

Reply via email to