[FLINK-3209] Remove Unused ProcessingTime, EventTime and AbstractTime

Only keep Time for specifying time durations/intervals.

This closes #1512


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/cf75f424
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/cf75f424
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/cf75f424

Branch: refs/heads/master
Commit: cf75f424a2cc788cedbc0137f73d12e758a6c084
Parents: 4280e1f
Author: Aljoscha Krettek <aljoscha.kret...@gmail.com>
Authored: Tue Dec 8 11:17:08 2015 +0100
Committer: Fabian Hueske <fhue...@apache.org>
Committed: Mon Jan 25 18:28:11 2016 +0100

----------------------------------------------------------------------
 .../streaming/api/datastream/DataStream.java    |  6 +-
 .../streaming/api/datastream/KeyedStream.java   |  6 +-
 .../windowing/assigners/SlidingTimeWindows.java |  4 +-
 .../assigners/TumblingTimeWindows.java          |  4 +-
 .../api/windowing/evictors/TimeEvictor.java     |  4 +-
 .../api/windowing/time/AbstractTime.java        | 98 --------------------
 .../streaming/api/windowing/time/EventTime.java | 62 -------------
 .../api/windowing/time/ProcessingTime.java      | 63 -------------
 .../streaming/api/windowing/time/Time.java      | 53 ++++++++---
 .../triggers/ContinuousEventTimeTrigger.java    |  4 +-
 .../ContinuousProcessingTimeTrigger.java        |  4 +-
 .../flink/streaming/api/scala/DataStream.scala  |  6 +-
 .../flink/streaming/api/scala/KeyedStream.scala |  6 +-
 13 files changed, 60 insertions(+), 260 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/cf75f424/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
index 7ab3e2b..254af19 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
@@ -64,7 +64,7 @@ import 
org.apache.flink.streaming.api.windowing.assigners.SlidingTimeWindows;
 import org.apache.flink.streaming.api.windowing.assigners.TumblingTimeWindows;
 import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
 import org.apache.flink.streaming.api.windowing.evictors.CountEvictor;
-import org.apache.flink.streaming.api.windowing.time.AbstractTime;
+import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
 import org.apache.flink.streaming.api.windowing.triggers.PurgingTrigger;
 import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
@@ -640,7 +640,7 @@ public class DataStream<T> {
         *
         * @param size The size of the window.
         */
-       public AllWindowedStream<T, TimeWindow> timeWindowAll(AbstractTime 
size) {
+       public AllWindowedStream<T, TimeWindow> timeWindowAll(Time size) {
                return windowAll(TumblingTimeWindows.of(size));
        }
 
@@ -660,7 +660,7 @@ public class DataStream<T> {
         *
         * @param size The size of the window.
         */
-       public AllWindowedStream<T, TimeWindow> timeWindowAll(AbstractTime 
size, AbstractTime slide) {
+       public AllWindowedStream<T, TimeWindow> timeWindowAll(Time size, Time 
slide) {
                return windowAll(SlidingTimeWindows.of(size, slide));
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/cf75f424/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
index d4a3a77..9b567f8 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
@@ -37,7 +37,7 @@ import 
org.apache.flink.streaming.api.windowing.assigners.SlidingTimeWindows;
 import org.apache.flink.streaming.api.windowing.assigners.TumblingTimeWindows;
 import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
 import org.apache.flink.streaming.api.windowing.evictors.CountEvictor;
-import org.apache.flink.streaming.api.windowing.time.AbstractTime;
+import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
 import org.apache.flink.streaming.api.windowing.triggers.PurgingTrigger;
 import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
@@ -162,7 +162,7 @@ public class KeyedStream<T, KEY> extends DataStream<T> {
         *
         * @param size The size of the window.
         */
-       public WindowedStream<T, KEY, TimeWindow> timeWindow(AbstractTime size) 
{
+       public WindowedStream<T, KEY, TimeWindow> timeWindow(Time size) {
                return window(TumblingTimeWindows.of(size));
        }
 
@@ -177,7 +177,7 @@ public class KeyedStream<T, KEY> extends DataStream<T> {
         *
         * @param size The size of the window.
         */
-       public WindowedStream<T, KEY, TimeWindow> timeWindow(AbstractTime size, 
AbstractTime slide) {
+       public WindowedStream<T, KEY, TimeWindow> timeWindow(Time size, Time 
slide) {
                return window(SlidingTimeWindows.of(size, slide));
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/cf75f424/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingTimeWindows.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingTimeWindows.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingTimeWindows.java
index 4077452..d517f6a 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingTimeWindows.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingTimeWindows.java
@@ -21,7 +21,7 @@ import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.windowing.time.AbstractTime;
+import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger;
 import org.apache.flink.streaming.api.windowing.triggers.Trigger;
 import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger;
@@ -98,7 +98,7 @@ public class SlidingTimeWindows extends 
WindowAssigner<Object, TimeWindow> {
         * @param slide The slide interval of the generated windows.
         * @return The time policy.
         */
-       public static SlidingTimeWindows of(AbstractTime size, AbstractTime 
slide) {
+       public static SlidingTimeWindows of(Time size, Time slide) {
                return new SlidingTimeWindows(size.toMilliseconds(), 
slide.toMilliseconds());
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/cf75f424/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingTimeWindows.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingTimeWindows.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingTimeWindows.java
index 7e5a11f..0efc940 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingTimeWindows.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingTimeWindows.java
@@ -21,7 +21,7 @@ import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.windowing.time.AbstractTime;
+import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger;
 import org.apache.flink.streaming.api.windowing.triggers.Trigger;
 import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger;
@@ -83,7 +83,7 @@ public class TumblingTimeWindows extends 
WindowAssigner<Object, TimeWindow> {
         * @param size The size of the generated windows.
         * @return The time policy.
         */
-       public static TumblingTimeWindows of(AbstractTime size) {
+       public static TumblingTimeWindows of(Time size) {
                return new TumblingTimeWindows(size.toMilliseconds());
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/cf75f424/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/evictors/TimeEvictor.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/evictors/TimeEvictor.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/evictors/TimeEvictor.java
index 5776d8d..49d7786 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/evictors/TimeEvictor.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/evictors/TimeEvictor.java
@@ -19,7 +19,7 @@ package org.apache.flink.streaming.api.windowing.evictors;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Iterables;
-import org.apache.flink.streaming.api.windowing.time.AbstractTime;
+import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.streaming.api.windowing.windows.Window;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 
@@ -67,7 +67,7 @@ public class TimeEvictor<W extends Window> implements 
Evictor<Object, W> {
         *
         * @param windowSize The amount of time for which to keep elements.
         */
-       public static <W extends Window> TimeEvictor<W> of(AbstractTime 
windowSize) {
+       public static <W extends Window> TimeEvictor<W> of(Time windowSize) {
                return new TimeEvictor<>(windowSize.toMilliseconds());
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/cf75f424/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/time/AbstractTime.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/time/AbstractTime.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/time/AbstractTime.java
deleted file mode 100644
index 3f8fb60..0000000
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/time/AbstractTime.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.windowing.time;
-
-import org.apache.flink.streaming.api.TimeCharacteristic;
-
-import java.util.concurrent.TimeUnit;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-/**
- * Base class for {@link Time} implementations.
- */
-public abstract class AbstractTime {
-
-       /** The time unit for this policy's time interval */
-       private final TimeUnit unit;
-       
-       /** The size of the windows generated by this policy */
-       private final long size;
-
-
-       protected AbstractTime(long size, TimeUnit unit) {
-               this.unit = checkNotNull(unit, "time unit may not be null");
-               this.size = size;
-       }
-
-       // 
------------------------------------------------------------------------
-       //  Properties
-       // 
------------------------------------------------------------------------
-
-       /**
-        * Gets the time unit for this policy's time interval.
-        * @return The time unit for this policy's time interval.
-        */
-       public TimeUnit getUnit() {
-               return unit;
-       }
-
-       /**
-        * Gets the length of this policy's time interval.
-        * @return The length of this policy's time interval.
-        */
-       public long getSize() {
-               return size;
-       }
-
-       /**
-        * Converts the time interval to milliseconds.
-        * @return The time interval in milliseconds.
-        */
-       public long toMilliseconds() {
-               return unit.toMillis(size);
-       }
-       
-       // 
------------------------------------------------------------------------
-       //  Utilities
-       // 
------------------------------------------------------------------------
-
-       public abstract AbstractTime 
makeSpecificBasedOnTimeCharacteristic(TimeCharacteristic characteristic);
-
-       @Override
-       public int hashCode() {
-               return 31 * (int) (size ^ (size >>> 32)) + unit.hashCode();
-       }
-
-       @Override
-       public boolean equals(Object obj) {
-               if (obj != null && obj.getClass() == getClass()) {
-                       AbstractTime that = (AbstractTime) obj;
-                       return this.size == that.size && 
this.unit.equals(that.unit);
-               }
-               else {
-                       return false;
-               }
-       }
-
-       @Override
-       public String toString() {
-               return getClass().getSimpleName() + " (" + size + ' ' + 
unit.name() + ')';
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/cf75f424/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/time/EventTime.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/time/EventTime.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/time/EventTime.java
deleted file mode 100644
index 6a4349c..0000000
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/time/EventTime.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.windowing.time;
-
-import org.apache.flink.api.common.InvalidProgramException;
-import org.apache.flink.streaming.api.TimeCharacteristic;
-
-import java.util.concurrent.TimeUnit;
-
-/**
- * The definition of an event time interval for windowing. See
- * {@link org.apache.flink.streaming.api.TimeCharacteristic#EventTime} for a 
definition
- * of event time.
- */
-public final class EventTime extends AbstractTime {
-
-       /** Instantiation only via factory method */
-       private EventTime(long size, TimeUnit unit) {
-               super(size, unit);
-       }
-
-       @Override
-       public EventTime 
makeSpecificBasedOnTimeCharacteristic(TimeCharacteristic characteristic) {
-               if (characteristic == TimeCharacteristic.EventTime || 
characteristic == TimeCharacteristic.IngestionTime) {
-                       return this;
-               }
-               else {
-                       throw new InvalidProgramException(
-                                       "Cannot use EventTime policy in a 
dataflow that runs on " + characteristic);
-               }
-       }
-       // 
------------------------------------------------------------------------
-       //  Factory
-       // 
------------------------------------------------------------------------
-
-       /**
-        * Creates an event time policy describing an event time interval.
-        *
-        * @param size The size of the generated windows.
-        * @param unit The init (seconds, milliseconds) of the time interval.
-        * @return The event time policy.
-        */
-       public static EventTime of(long size, TimeUnit unit) {
-               return new EventTime(size, unit);
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/cf75f424/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/time/ProcessingTime.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/time/ProcessingTime.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/time/ProcessingTime.java
deleted file mode 100644
index 4be6ed0..0000000
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/time/ProcessingTime.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.windowing.time;
-
-import org.apache.flink.api.common.InvalidProgramException;
-import org.apache.flink.streaming.api.TimeCharacteristic;
-
-import java.util.concurrent.TimeUnit;
-
-/**
- * The definition of a processing time interval for windowing. See
- * {@link org.apache.flink.streaming.api.TimeCharacteristic#ProcessingTime} 
for a definition
- * of processing time.
- */
-public final class ProcessingTime extends AbstractTime {
-
-       /** Instantiation only via factory method */
-       private ProcessingTime(long size, TimeUnit unit) {
-               super(size, unit);
-       }
-
-       @Override
-       public ProcessingTime 
makeSpecificBasedOnTimeCharacteristic(TimeCharacteristic characteristic) {
-               if (characteristic == TimeCharacteristic.ProcessingTime) {
-                       return this;
-               }
-               else {
-                       throw new InvalidProgramException(
-                                       "Cannot use ProcessingTime policy in a 
dataflow that runs on " + characteristic);
-               }
-       }
-
-       // 
------------------------------------------------------------------------
-       //  Factory
-       // 
------------------------------------------------------------------------
-
-       /**
-        * Creates a processing time policy describing a processing time 
interval.
-        *
-        * @param size The size of the generated windows.
-        * @param unit The init (seconds, milliseconds) of the time interval.
-        * @return The processing time policy.
-        */
-       public static ProcessingTime of(long size, TimeUnit unit) {
-               return new ProcessingTime(size, unit);
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/cf75f424/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/time/Time.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/time/Time.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/time/Time.java
index e0e9202..c30fdf4 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/time/Time.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/time/Time.java
@@ -18,32 +18,55 @@
 
 package org.apache.flink.streaming.api.windowing.time;
 
-import org.apache.flink.streaming.api.TimeCharacteristic;
-
 import java.util.concurrent.TimeUnit;
 
+import static com.google.common.base.Preconditions.checkNotNull;
+
 /**
  * The definition of a time interval for windowing. The time characteristic 
referred
  * to is the default time characteristic set on the execution environment.
  */
-public final class Time extends AbstractTime {
+public final class Time {
+
+       /** The time unit for this policy's time interval */
+       private final TimeUnit unit;
+
+       /** The size of the windows generated by this policy */
+       private final long size;
 
        /** Instantiation only via factory method */
        private Time(long size, TimeUnit unit) {
-               super(size, unit);
+               this.unit = checkNotNull(unit, "time unit may not be null");
+               this.size = size;
+
+       }
+
+       // 
------------------------------------------------------------------------
+       //  Properties
+       // 
------------------------------------------------------------------------
+
+       /**
+        * Gets the time unit for this policy's time interval.
+        * @return The time unit for this policy's time interval.
+        */
+       public TimeUnit getUnit() {
+               return unit;
        }
 
-       @Override
-       public AbstractTime 
makeSpecificBasedOnTimeCharacteristic(TimeCharacteristic timeCharacteristic) {
-               switch (timeCharacteristic) {
-                       case ProcessingTime:
-                               return ProcessingTime.of(getSize(), getUnit());
-                       case IngestionTime:
-                       case EventTime:
-                               return EventTime.of(getSize(), getUnit());
-                       default:
-                               throw new IllegalArgumentException("Unknown 
time characteristic");
-               }
+       /**
+        * Gets the length of this policy's time interval.
+        * @return The length of this policy's time interval.
+        */
+       public long getSize() {
+               return size;
+       }
+
+       /**
+        * Converts the time interval to milliseconds.
+        * @return The time interval in milliseconds.
+        */
+       public long toMilliseconds() {
+               return unit.toMillis(size);
        }
 
        // 
------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/cf75f424/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java
index 4b6af8f..0454e85 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java
@@ -19,7 +19,7 @@ package org.apache.flink.streaming.api.windowing.triggers;
 
 import com.google.common.annotations.VisibleForTesting;
 import org.apache.flink.api.common.state.OperatorState;
-import org.apache.flink.streaming.api.windowing.time.AbstractTime;
+import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.streaming.api.windowing.windows.Window;
 
 /**
@@ -83,7 +83,7 @@ public class ContinuousEventTimeTrigger<W extends Window> 
implements Trigger<Obj
         * @param interval The time interval at which to fire.
         * @param <W> The type of {@link Window Windows} on which this trigger 
can operate.
         */
-       public static <W extends Window> ContinuousEventTimeTrigger<W> 
of(AbstractTime interval) {
+       public static <W extends Window> ContinuousEventTimeTrigger<W> of(Time 
interval) {
                return new 
ContinuousEventTimeTrigger<>(interval.toMilliseconds());
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/cf75f424/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java
index 66f9bda..3576394 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java
@@ -19,7 +19,7 @@ package org.apache.flink.streaming.api.windowing.triggers;
 
 import com.google.common.annotations.VisibleForTesting;
 import org.apache.flink.api.common.state.OperatorState;
-import org.apache.flink.streaming.api.windowing.time.AbstractTime;
+import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.streaming.api.windowing.windows.Window;
 
 /**
@@ -99,7 +99,7 @@ public class ContinuousProcessingTimeTrigger<W extends 
Window> implements Trigge
         * @param interval The time interval at which to fire.
         * @param <W> The type of {@link Window Windows} on which this trigger 
can operate.
         */
-       public static <W extends Window> ContinuousProcessingTimeTrigger<W> 
of(AbstractTime interval) {
+       public static <W extends Window> ContinuousProcessingTimeTrigger<W> 
of(Time interval) {
                return new 
ContinuousProcessingTimeTrigger<>(interval.toMilliseconds());
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/cf75f424/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
----------------------------------------------------------------------
diff --git 
a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
 
b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
index fcfbfe8..28edc2d 100644
--- 
a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
+++ 
b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
@@ -31,7 +31,7 @@ import 
org.apache.flink.streaming.api.datastream.{AllWindowedStream => JavaAllWi
 import org.apache.flink.streaming.api.functions.sink.SinkFunction
 import org.apache.flink.streaming.api.functions.{AscendingTimestampExtractor, 
TimestampExtractor}
 import org.apache.flink.streaming.api.windowing.assigners._
-import org.apache.flink.streaming.api.windowing.time.AbstractTime
+import org.apache.flink.streaming.api.windowing.time.Time
 import org.apache.flink.streaming.api.windowing.windows.{GlobalWindow, 
TimeWindow, Window}
 import org.apache.flink.streaming.util.serialization.SerializationSchema
 import org.apache.flink.util.Collector
@@ -532,7 +532,7 @@ class DataStream[T](javaStream: JavaStream[T]) {
    *
    * @param size The size of the window.
    */
-  def timeWindowAll(size: AbstractTime): AllWindowedStream[T, TimeWindow] = {
+  def timeWindowAll(size: Time): AllWindowedStream[T, TimeWindow] = {
     val assigner = TumblingTimeWindows.of(size).asInstanceOf[WindowAssigner[T, 
TimeWindow]]
     windowAll(assigner)
   }
@@ -551,7 +551,7 @@ class DataStream[T](javaStream: JavaStream[T]) {
    *
    * @param size The size of the window.
    */
-  def timeWindowAll(size: AbstractTime, slide: AbstractTime): 
AllWindowedStream[T, TimeWindow] = {
+  def timeWindowAll(size: Time, slide: Time): AllWindowedStream[T, TimeWindow] 
= {
     val assigner = SlidingTimeWindows.of(size, 
slide).asInstanceOf[WindowAssigner[T, TimeWindow]]
     windowAll(assigner)
   }

http://git-wip-us.apache.org/repos/asf/flink/blob/cf75f424/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala
----------------------------------------------------------------------
diff --git 
a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala
 
b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala
index 9f5c069..59c5693 100644
--- 
a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala
+++ 
b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala
@@ -26,7 +26,7 @@ import 
org.apache.flink.streaming.api.functions.aggregation.{ComparableAggregato
 import org.apache.flink.streaming.api.operators.StreamGroupedReduce
 import org.apache.flink.streaming.api.scala.function.StatefulFunction
 import org.apache.flink.streaming.api.windowing.assigners._
-import org.apache.flink.streaming.api.windowing.time.AbstractTime
+import org.apache.flink.streaming.api.windowing.time.Time
 import org.apache.flink.streaming.api.windowing.windows.{GlobalWindow, 
TimeWindow, Window}
 import org.apache.flink.util.Collector
 
@@ -58,7 +58,7 @@ class KeyedStream[T, K](javaStream: KeyedJavaStream[T, K]) 
extends DataStream[T]
    *
    * @param size The size of the window.
    */
-  def timeWindow(size: AbstractTime): WindowedStream[T, K, TimeWindow] = {
+  def timeWindow(size: Time): WindowedStream[T, K, TimeWindow] = {
     val assigner = TumblingTimeWindows.of(size).asInstanceOf[WindowAssigner[T, 
TimeWindow]]
     window(assigner)
   }
@@ -92,7 +92,7 @@ class KeyedStream[T, K](javaStream: KeyedJavaStream[T, K]) 
extends DataStream[T]
    *
    * @param size The size of the window.
    */
-  def timeWindow(size: AbstractTime, slide: AbstractTime): WindowedStream[T, 
K, TimeWindow] = {
+  def timeWindow(size: Time, slide: Time): WindowedStream[T, K, TimeWindow] = {
     val assigner = SlidingTimeWindows.of(size, 
slide).asInstanceOf[WindowAssigner[T, TimeWindow]]
     window(assigner)
   }

Reply via email to