[FLINK-3209] Remove Unused ProcessingTime, EventTime and AbstractTime. Only keep Time for specifying time durations/intervals.
This closes #1512 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/cf75f424 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/cf75f424 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/cf75f424 Branch: refs/heads/master Commit: cf75f424a2cc788cedbc0137f73d12e758a6c084 Parents: 4280e1f Author: Aljoscha Krettek <aljoscha.kret...@gmail.com> Authored: Tue Dec 8 11:17:08 2015 +0100 Committer: Fabian Hueske <fhue...@apache.org> Committed: Mon Jan 25 18:28:11 2016 +0100 ---------------------------------------------------------------------- .../streaming/api/datastream/DataStream.java | 6 +- .../streaming/api/datastream/KeyedStream.java | 6 +- .../windowing/assigners/SlidingTimeWindows.java | 4 +- .../assigners/TumblingTimeWindows.java | 4 +- .../api/windowing/evictors/TimeEvictor.java | 4 +- .../api/windowing/time/AbstractTime.java | 98 -------------------- .../streaming/api/windowing/time/EventTime.java | 62 ------------- .../api/windowing/time/ProcessingTime.java | 63 ------------- .../streaming/api/windowing/time/Time.java | 53 ++++++++--- .../triggers/ContinuousEventTimeTrigger.java | 4 +- .../ContinuousProcessingTimeTrigger.java | 4 +- .../flink/streaming/api/scala/DataStream.scala | 6 +- .../flink/streaming/api/scala/KeyedStream.scala | 6 +- 13 files changed, 60 insertions(+), 260 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/cf75f424/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java index 7ab3e2b..254af19 100644 --- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java @@ -64,7 +64,7 @@ import org.apache.flink.streaming.api.windowing.assigners.SlidingTimeWindows; import org.apache.flink.streaming.api.windowing.assigners.TumblingTimeWindows; import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner; import org.apache.flink.streaming.api.windowing.evictors.CountEvictor; -import org.apache.flink.streaming.api.windowing.time.AbstractTime; +import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.api.windowing.triggers.CountTrigger; import org.apache.flink.streaming.api.windowing.triggers.PurgingTrigger; import org.apache.flink.streaming.api.windowing.windows.GlobalWindow; @@ -640,7 +640,7 @@ public class DataStream<T> { * * @param size The size of the window. */ - public AllWindowedStream<T, TimeWindow> timeWindowAll(AbstractTime size) { + public AllWindowedStream<T, TimeWindow> timeWindowAll(Time size) { return windowAll(TumblingTimeWindows.of(size)); } @@ -660,7 +660,7 @@ public class DataStream<T> { * * @param size The size of the window. 
*/ - public AllWindowedStream<T, TimeWindow> timeWindowAll(AbstractTime size, AbstractTime slide) { + public AllWindowedStream<T, TimeWindow> timeWindowAll(Time size, Time slide) { return windowAll(SlidingTimeWindows.of(size, slide)); } http://git-wip-us.apache.org/repos/asf/flink/blob/cf75f424/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java index d4a3a77..9b567f8 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java @@ -37,7 +37,7 @@ import org.apache.flink.streaming.api.windowing.assigners.SlidingTimeWindows; import org.apache.flink.streaming.api.windowing.assigners.TumblingTimeWindows; import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner; import org.apache.flink.streaming.api.windowing.evictors.CountEvictor; -import org.apache.flink.streaming.api.windowing.time.AbstractTime; +import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.api.windowing.triggers.CountTrigger; import org.apache.flink.streaming.api.windowing.triggers.PurgingTrigger; import org.apache.flink.streaming.api.windowing.windows.GlobalWindow; @@ -162,7 +162,7 @@ public class KeyedStream<T, KEY> extends DataStream<T> { * * @param size The size of the window. */ - public WindowedStream<T, KEY, TimeWindow> timeWindow(AbstractTime size) { + public WindowedStream<T, KEY, TimeWindow> timeWindow(Time size) { return window(TumblingTimeWindows.of(size)); } @@ -177,7 +177,7 @@ public class KeyedStream<T, KEY> extends DataStream<T> { * * @param size The size of the window. 
*/ - public WindowedStream<T, KEY, TimeWindow> timeWindow(AbstractTime size, AbstractTime slide) { + public WindowedStream<T, KEY, TimeWindow> timeWindow(Time size, Time slide) { return window(SlidingTimeWindows.of(size, slide)); } http://git-wip-us.apache.org/repos/asf/flink/blob/cf75f424/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingTimeWindows.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingTimeWindows.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingTimeWindows.java index 4077452..d517f6a 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingTimeWindows.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingTimeWindows.java @@ -21,7 +21,7 @@ import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.api.windowing.time.AbstractTime; +import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger; import org.apache.flink.streaming.api.windowing.triggers.Trigger; import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger; @@ -98,7 +98,7 @@ public class SlidingTimeWindows extends WindowAssigner<Object, TimeWindow> { * @param slide The slide interval of the generated windows. * @return The time policy. 
*/ - public static SlidingTimeWindows of(AbstractTime size, AbstractTime slide) { + public static SlidingTimeWindows of(Time size, Time slide) { return new SlidingTimeWindows(size.toMilliseconds(), slide.toMilliseconds()); } http://git-wip-us.apache.org/repos/asf/flink/blob/cf75f424/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingTimeWindows.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingTimeWindows.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingTimeWindows.java index 7e5a11f..0efc940 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingTimeWindows.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingTimeWindows.java @@ -21,7 +21,7 @@ import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.api.windowing.time.AbstractTime; +import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger; import org.apache.flink.streaming.api.windowing.triggers.Trigger; import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger; @@ -83,7 +83,7 @@ public class TumblingTimeWindows extends WindowAssigner<Object, TimeWindow> { * @param size The size of the generated windows. * @return The time policy. 
*/ - public static TumblingTimeWindows of(AbstractTime size) { + public static TumblingTimeWindows of(Time size) { return new TumblingTimeWindows(size.toMilliseconds()); } http://git-wip-us.apache.org/repos/asf/flink/blob/cf75f424/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/evictors/TimeEvictor.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/evictors/TimeEvictor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/evictors/TimeEvictor.java index 5776d8d..49d7786 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/evictors/TimeEvictor.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/evictors/TimeEvictor.java @@ -19,7 +19,7 @@ package org.apache.flink.streaming.api.windowing.evictors; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Iterables; -import org.apache.flink.streaming.api.windowing.time.AbstractTime; +import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.api.windowing.windows.Window; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; @@ -67,7 +67,7 @@ public class TimeEvictor<W extends Window> implements Evictor<Object, W> { * * @param windowSize The amount of time for which to keep elements. 
*/ - public static <W extends Window> TimeEvictor<W> of(AbstractTime windowSize) { + public static <W extends Window> TimeEvictor<W> of(Time windowSize) { return new TimeEvictor<>(windowSize.toMilliseconds()); } } http://git-wip-us.apache.org/repos/asf/flink/blob/cf75f424/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/time/AbstractTime.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/time/AbstractTime.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/time/AbstractTime.java deleted file mode 100644 index 3f8fb60..0000000 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/time/AbstractTime.java +++ /dev/null @@ -1,98 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.api.windowing.time; - -import org.apache.flink.streaming.api.TimeCharacteristic; - -import java.util.concurrent.TimeUnit; - -import static com.google.common.base.Preconditions.checkNotNull; - -/** - * Base class for {@link Time} implementations. 
- */ -public abstract class AbstractTime { - - /** The time unit for this policy's time interval */ - private final TimeUnit unit; - - /** The size of the windows generated by this policy */ - private final long size; - - - protected AbstractTime(long size, TimeUnit unit) { - this.unit = checkNotNull(unit, "time unit may not be null"); - this.size = size; - } - - // ------------------------------------------------------------------------ - // Properties - // ------------------------------------------------------------------------ - - /** - * Gets the time unit for this policy's time interval. - * @return The time unit for this policy's time interval. - */ - public TimeUnit getUnit() { - return unit; - } - - /** - * Gets the length of this policy's time interval. - * @return The length of this policy's time interval. - */ - public long getSize() { - return size; - } - - /** - * Converts the time interval to milliseconds. - * @return The time interval in milliseconds. - */ - public long toMilliseconds() { - return unit.toMillis(size); - } - - // ------------------------------------------------------------------------ - // Utilities - // ------------------------------------------------------------------------ - - public abstract AbstractTime makeSpecificBasedOnTimeCharacteristic(TimeCharacteristic characteristic); - - @Override - public int hashCode() { - return 31 * (int) (size ^ (size >>> 32)) + unit.hashCode(); - } - - @Override - public boolean equals(Object obj) { - if (obj != null && obj.getClass() == getClass()) { - AbstractTime that = (AbstractTime) obj; - return this.size == that.size && this.unit.equals(that.unit); - } - else { - return false; - } - } - - @Override - public String toString() { - return getClass().getSimpleName() + " (" + size + ' ' + unit.name() + ')'; - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/cf75f424/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/time/EventTime.java 
---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/time/EventTime.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/time/EventTime.java deleted file mode 100644 index 6a4349c..0000000 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/time/EventTime.java +++ /dev/null @@ -1,62 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.api.windowing.time; - -import org.apache.flink.api.common.InvalidProgramException; -import org.apache.flink.streaming.api.TimeCharacteristic; - -import java.util.concurrent.TimeUnit; - -/** - * The definition of an event time interval for windowing. See - * {@link org.apache.flink.streaming.api.TimeCharacteristic#EventTime} for a definition - * of event time. 
- */ -public final class EventTime extends AbstractTime { - - /** Instantiation only via factory method */ - private EventTime(long size, TimeUnit unit) { - super(size, unit); - } - - @Override - public EventTime makeSpecificBasedOnTimeCharacteristic(TimeCharacteristic characteristic) { - if (characteristic == TimeCharacteristic.EventTime || characteristic == TimeCharacteristic.IngestionTime) { - return this; - } - else { - throw new InvalidProgramException( - "Cannot use EventTime policy in a dataflow that runs on " + characteristic); - } - } - // ------------------------------------------------------------------------ - // Factory - // ------------------------------------------------------------------------ - - /** - * Creates an event time policy describing an event time interval. - * - * @param size The size of the generated windows. - * @param unit The init (seconds, milliseconds) of the time interval. - * @return The event time policy. - */ - public static EventTime of(long size, TimeUnit unit) { - return new EventTime(size, unit); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/cf75f424/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/time/ProcessingTime.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/time/ProcessingTime.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/time/ProcessingTime.java deleted file mode 100644 index 4be6ed0..0000000 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/time/ProcessingTime.java +++ /dev/null @@ -1,63 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.api.windowing.time; - -import org.apache.flink.api.common.InvalidProgramException; -import org.apache.flink.streaming.api.TimeCharacteristic; - -import java.util.concurrent.TimeUnit; - -/** - * The definition of a processing time interval for windowing. See - * {@link org.apache.flink.streaming.api.TimeCharacteristic#ProcessingTime} for a definition - * of processing time. - */ -public final class ProcessingTime extends AbstractTime { - - /** Instantiation only via factory method */ - private ProcessingTime(long size, TimeUnit unit) { - super(size, unit); - } - - @Override - public ProcessingTime makeSpecificBasedOnTimeCharacteristic(TimeCharacteristic characteristic) { - if (characteristic == TimeCharacteristic.ProcessingTime) { - return this; - } - else { - throw new InvalidProgramException( - "Cannot use ProcessingTime policy in a dataflow that runs on " + characteristic); - } - } - - // ------------------------------------------------------------------------ - // Factory - // ------------------------------------------------------------------------ - - /** - * Creates a processing time policy describing a processing time interval. - * - * @param size The size of the generated windows. - * @param unit The init (seconds, milliseconds) of the time interval. - * @return The processing time policy. 
- */ - public static ProcessingTime of(long size, TimeUnit unit) { - return new ProcessingTime(size, unit); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/cf75f424/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/time/Time.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/time/Time.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/time/Time.java index e0e9202..c30fdf4 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/time/Time.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/time/Time.java @@ -18,32 +18,55 @@ package org.apache.flink.streaming.api.windowing.time; -import org.apache.flink.streaming.api.TimeCharacteristic; - import java.util.concurrent.TimeUnit; +import static com.google.common.base.Preconditions.checkNotNull; + /** * The definition of a time interval for windowing. The time characteristic referred * to is the default time characteristic set on the execution environment. */ -public final class Time extends AbstractTime { +public final class Time { + + /** The time unit for this policy's time interval */ + private final TimeUnit unit; + + /** The size of the windows generated by this policy */ + private final long size; /** Instantiation only via factory method */ private Time(long size, TimeUnit unit) { - super(size, unit); + this.unit = checkNotNull(unit, "time unit may not be null"); + this.size = size; + + } + + // ------------------------------------------------------------------------ + // Properties + // ------------------------------------------------------------------------ + + /** + * Gets the time unit for this policy's time interval. + * @return The time unit for this policy's time interval. 
+ */ + public TimeUnit getUnit() { + return unit; } - @Override - public AbstractTime makeSpecificBasedOnTimeCharacteristic(TimeCharacteristic timeCharacteristic) { - switch (timeCharacteristic) { - case ProcessingTime: - return ProcessingTime.of(getSize(), getUnit()); - case IngestionTime: - case EventTime: - return EventTime.of(getSize(), getUnit()); - default: - throw new IllegalArgumentException("Unknown time characteristic"); - } + /** + * Gets the length of this policy's time interval. + * @return The length of this policy's time interval. + */ + public long getSize() { + return size; + } + + /** + * Converts the time interval to milliseconds. + * @return The time interval in milliseconds. + */ + public long toMilliseconds() { + return unit.toMillis(size); } // ------------------------------------------------------------------------ http://git-wip-us.apache.org/repos/asf/flink/blob/cf75f424/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java index 4b6af8f..0454e85 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java @@ -19,7 +19,7 @@ package org.apache.flink.streaming.api.windowing.triggers; import com.google.common.annotations.VisibleForTesting; import org.apache.flink.api.common.state.OperatorState; -import org.apache.flink.streaming.api.windowing.time.AbstractTime; +import org.apache.flink.streaming.api.windowing.time.Time; import 
org.apache.flink.streaming.api.windowing.windows.Window; /** @@ -83,7 +83,7 @@ public class ContinuousEventTimeTrigger<W extends Window> implements Trigger<Obj * @param interval The time interval at which to fire. * @param <W> The type of {@link Window Windows} on which this trigger can operate. */ - public static <W extends Window> ContinuousEventTimeTrigger<W> of(AbstractTime interval) { + public static <W extends Window> ContinuousEventTimeTrigger<W> of(Time interval) { return new ContinuousEventTimeTrigger<>(interval.toMilliseconds()); } } http://git-wip-us.apache.org/repos/asf/flink/blob/cf75f424/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java index 66f9bda..3576394 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java @@ -19,7 +19,7 @@ package org.apache.flink.streaming.api.windowing.triggers; import com.google.common.annotations.VisibleForTesting; import org.apache.flink.api.common.state.OperatorState; -import org.apache.flink.streaming.api.windowing.time.AbstractTime; +import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.api.windowing.windows.Window; /** @@ -99,7 +99,7 @@ public class ContinuousProcessingTimeTrigger<W extends Window> implements Trigge * @param interval The time interval at which to fire. * @param <W> The type of {@link Window Windows} on which this trigger can operate. 
*/ - public static <W extends Window> ContinuousProcessingTimeTrigger<W> of(AbstractTime interval) { + public static <W extends Window> ContinuousProcessingTimeTrigger<W> of(Time interval) { return new ContinuousProcessingTimeTrigger<>(interval.toMilliseconds()); } } http://git-wip-us.apache.org/repos/asf/flink/blob/cf75f424/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala ---------------------------------------------------------------------- diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala index fcfbfe8..28edc2d 100644 --- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala +++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala @@ -31,7 +31,7 @@ import org.apache.flink.streaming.api.datastream.{AllWindowedStream => JavaAllWi import org.apache.flink.streaming.api.functions.sink.SinkFunction import org.apache.flink.streaming.api.functions.{AscendingTimestampExtractor, TimestampExtractor} import org.apache.flink.streaming.api.windowing.assigners._ -import org.apache.flink.streaming.api.windowing.time.AbstractTime +import org.apache.flink.streaming.api.windowing.time.Time import org.apache.flink.streaming.api.windowing.windows.{GlobalWindow, TimeWindow, Window} import org.apache.flink.streaming.util.serialization.SerializationSchema import org.apache.flink.util.Collector @@ -532,7 +532,7 @@ class DataStream[T](javaStream: JavaStream[T]) { * * @param size The size of the window. 
*/ - def timeWindowAll(size: AbstractTime): AllWindowedStream[T, TimeWindow] = { + def timeWindowAll(size: Time): AllWindowedStream[T, TimeWindow] = { val assigner = TumblingTimeWindows.of(size).asInstanceOf[WindowAssigner[T, TimeWindow]] windowAll(assigner) } @@ -551,7 +551,7 @@ class DataStream[T](javaStream: JavaStream[T]) { * * @param size The size of the window. */ - def timeWindowAll(size: AbstractTime, slide: AbstractTime): AllWindowedStream[T, TimeWindow] = { + def timeWindowAll(size: Time, slide: Time): AllWindowedStream[T, TimeWindow] = { val assigner = SlidingTimeWindows.of(size, slide).asInstanceOf[WindowAssigner[T, TimeWindow]] windowAll(assigner) } http://git-wip-us.apache.org/repos/asf/flink/blob/cf75f424/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala ---------------------------------------------------------------------- diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala index 9f5c069..59c5693 100644 --- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala +++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala @@ -26,7 +26,7 @@ import org.apache.flink.streaming.api.functions.aggregation.{ComparableAggregato import org.apache.flink.streaming.api.operators.StreamGroupedReduce import org.apache.flink.streaming.api.scala.function.StatefulFunction import org.apache.flink.streaming.api.windowing.assigners._ -import org.apache.flink.streaming.api.windowing.time.AbstractTime +import org.apache.flink.streaming.api.windowing.time.Time import org.apache.flink.streaming.api.windowing.windows.{GlobalWindow, TimeWindow, Window} import org.apache.flink.util.Collector @@ -58,7 +58,7 @@ class KeyedStream[T, K](javaStream: KeyedJavaStream[T, K]) extends DataStream[T] * * @param size The size of 
the window. */ - def timeWindow(size: AbstractTime): WindowedStream[T, K, TimeWindow] = { + def timeWindow(size: Time): WindowedStream[T, K, TimeWindow] = { val assigner = TumblingTimeWindows.of(size).asInstanceOf[WindowAssigner[T, TimeWindow]] window(assigner) } @@ -92,7 +92,7 @@ class KeyedStream[T, K](javaStream: KeyedJavaStream[T, K]) extends DataStream[T] * * @param size The size of the window. */ - def timeWindow(size: AbstractTime, slide: AbstractTime): WindowedStream[T, K, TimeWindow] = { + def timeWindow(size: Time, slide: Time): WindowedStream[T, K, TimeWindow] = { val assigner = SlidingTimeWindows.of(size, slide).asInstanceOf[WindowAssigner[T, TimeWindow]] window(assigner) }