Fix some typos and naming inconsistencies in new Windowing Code
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a606c4a9 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a606c4a9 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a606c4a9 Branch: refs/heads/master Commit: a606c4a9e83e54397c88a70ea80a00e32ca61a93 Parents: 7e20299 Author: Aljoscha Krettek <[email protected]> Authored: Thu Sep 24 16:30:29 2015 +0200 Committer: Aljoscha Krettek <[email protected]> Committed: Mon Sep 28 17:04:16 2015 +0200 ---------------------------------------------------------------------- .../flink/streaming/api/TimeCharacteristic.java | 4 +-- .../api/datastream/KeyedWindowDataStream.java | 4 +-- .../environment/StreamExecutionEnvironment.java | 11 +++++---- .../windowpolicy/AbstractTimePolicy.java | 26 ++++++++++---------- .../api/windowing/windowpolicy/EventTime.java | 10 ++++---- .../windowing/windowpolicy/ProcessingTime.java | 12 ++++----- .../api/windowing/windowpolicy/Time.java | 14 +++++------ .../windowing/windowpolicy/WindowPolicy.java | 2 +- .../api/scala/StreamExecutionEnvironment.scala | 9 ++++--- 9 files changed, 47 insertions(+), 45 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/a606c4a9/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/TimeCharacteristic.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/TimeCharacteristic.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/TimeCharacteristic.java index 1ad3c99..125ca65 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/TimeCharacteristic.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/TimeCharacteristic.java @@ -42,7 +42,7 @@ public enum TimeCharacteristic { * elements based on that time, meaning that processing speed within the streaming dataflow * does not affect windowing, but only the speed at which sources receive elements. * <p> - * Ingestion time is often a good compromise between more processing time and event time. + * Ingestion time is often a good compromise between processing time and event time. * It does not need and special manual form of watermark generation, and events are typically * not too much out-or-order when they arrive at operators; in fact, out-of-orderness can * only be introduced by streaming shuffles or split/join/union operations. The fact that elements @@ -69,7 +69,7 @@ public enum TimeCharacteristic { * with processing time, and typically also introduces more latency. The amount of extra * cost depends mostly on how much out of order the elements arrive, i.e., how long the time span * between the arrival of early and late elements is. With respect to the "time watermarks", this - * means that teh cost typically depends on how early or late the watermarks for can be generated + * means that the cost typically depends on how early or late the watermarks can be generated * for their timestamp. * <p> * In relation to {@link #IngestionTime}, the event time is similar, but refers the the event's http://git-wip-us.apache.org/repos/asf/flink/blob/a606c4a9/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedWindowDataStream.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedWindowDataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedWindowDataStream.java index 2ec175a..dfb7032 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedWindowDataStream.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedWindowDataStream.java @@ -33,7 +33,7 @@ import org.apache.flink.streaming.runtime.operators.windows.PolicyToOperator; /** * A KeyedWindowDataStream represents a data stream where elements are grouped by key, and * for each key, the stream of elements is split into windows. The windows are conceptually - * evaluated for each key individually, meaning windows and trigger at different points + * evaluated for each key individually, meaning windows can trigger at different points * for each key. * <p> * In many cases, however, the windows are "aligned", meaning they trigger at the @@ -102,7 +102,7 @@ public class KeyedWindowDataStream<Type, Key> { * as a regular non-windowed stream. * <p> * Not that this function requires that all data in the windows is buffered until the window - * is evaluated, as the function provides no means od pre-aggregation. + * is evaluated, as the function provides no means of pre-aggregation. * * @param function The window function. * @return The data stream that is the result of applying the window function to the window. http://git-wip-us.apache.org/repos/asf/flink/blob/a606c4a9/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java index a22a519..621d52a 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java @@ -531,8 +531,8 @@ public abstract class StreamExecutionEnvironment { // -------------------------------------------------------------------------------------------- /** - * Sets the time characteristic for the stream, e.g., processing time, event time, - * or ingestion time. + * Sets the time characteristic for all streams create from this environment, e.g., processing + * time, event time, or ingestion time. * * @param characteristic The time characteristic. */ @@ -541,9 +541,10 @@ public abstract class StreamExecutionEnvironment { } /** - * Gets the time characteristic for the stream, e.g., processing time, event time, - * or ingestion time. - * + * Gets the time characteristic. + * + * @see #setStreamTimeCharacteristic(org.apache.flink.streaming.api.TimeCharacteristic) + * * @return The time characteristic. */ public TimeCharacteristic getStreamTimeCharacteristic() { http://git-wip-us.apache.org/repos/asf/flink/blob/a606c4a9/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowpolicy/AbstractTimePolicy.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowpolicy/AbstractTimePolicy.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowpolicy/AbstractTimePolicy.java index 9dc0dd0..6e382bd 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowpolicy/AbstractTimePolicy.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowpolicy/AbstractTimePolicy.java @@ -26,16 +26,16 @@ public class AbstractTimePolicy extends WindowPolicy { private static final long serialVersionUID = 6593098375698927728L; - /** the time unit for this policy's time interval */ + /** The time unit for this policy's time interval */ private final TimeUnit unit; - /** the length of this policy's time interval */ - private final long num; + /** The size of the windows generated by this policy */ + private final long size; - protected AbstractTimePolicy(long num, TimeUnit unit) { + protected AbstractTimePolicy(long size, TimeUnit unit) { this.unit = checkNotNull(unit, "time unit may not be null"); - this.num = num; + this.size = size; } // ------------------------------------------------------------------------ @@ -54,8 +54,8 @@ public class AbstractTimePolicy extends WindowPolicy { * Gets the length of this policy's time interval. * @return The length of this policy's time interval. */ - public long getNum() { - return num; + public long getSize() { + return size; } /** @@ -63,7 +63,7 @@ public class AbstractTimePolicy extends WindowPolicy { * @return The time interval in milliseconds. */ public long toMilliseconds() { - return unit.toMillis(num); + return unit.toMillis(size); } // ------------------------------------------------------------------------ @@ -73,13 +73,13 @@ public class AbstractTimePolicy extends WindowPolicy { @Override public String toString(WindowPolicy slidePolicy) { if (slidePolicy == null) { - return "Tumbling Window (" + getClass().getSimpleName() + ") (" + num + ' ' + unit.name() + ')'; + return "Tumbling Window (" + getClass().getSimpleName() + ") (" + size + ' ' + unit.name() + ')'; } else if (slidePolicy.getClass() == getClass()) { AbstractTimePolicy timeSlide = (AbstractTimePolicy) slidePolicy; return "Sliding Window (" + getClass().getSimpleName() + ") (length=" - + num + ' ' + unit.name() + ", slide=" + timeSlide.num + ' ' + timeSlide.unit.name() + ')'; + + size + ' ' + unit.name() + ", slide=" + timeSlide.size + ' ' + timeSlide.unit.name() + ')'; } else { return super.toString(slidePolicy); @@ -88,14 +88,14 @@ public class AbstractTimePolicy extends WindowPolicy { @Override public int hashCode() { - return 31 * (int) (num ^ (num >>> 32)) + unit.hashCode(); + return 31 * (int) (size ^ (size >>> 32)) + unit.hashCode(); } @Override public boolean equals(Object obj) { if (obj != null && obj.getClass() == getClass()) { AbstractTimePolicy that = (AbstractTimePolicy) obj; - return this.num == that.num && this.unit.equals(that.unit); + return this.size == that.size && this.unit.equals(that.unit); } else { return false; @@ -104,6 +104,6 @@ public class AbstractTimePolicy extends WindowPolicy { @Override public String toString() { - return getClass().getSimpleName() + " (" + num + ' ' + unit.name() + ')'; + return getClass().getSimpleName() + " (" + size + ' ' + unit.name() + ')'; } } http://git-wip-us.apache.org/repos/asf/flink/blob/a606c4a9/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowpolicy/EventTime.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowpolicy/EventTime.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowpolicy/EventTime.java index 8a671fc..c32a0b0 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowpolicy/EventTime.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowpolicy/EventTime.java @@ -33,8 +33,8 @@ public final class EventTime extends AbstractTimePolicy { private static final long serialVersionUID = 8333566691833596747L; /** Instantiation only via factory method */ - private EventTime(long num, TimeUnit unit) { - super(num, unit); + private EventTime(long size, TimeUnit unit) { + super(size, unit); } @Override @@ -54,11 +54,11 @@ public final class EventTime extends AbstractTimePolicy { /** * Creates an event time policy describing an event time interval. * - * @param num The length of the time interval. + * @param size The size of the generated windows. * @param unit The init (seconds, milliseconds) of the time interval. * @return The event time policy. */ - public static EventTime of(long num, TimeUnit unit) { - return new EventTime(num, unit); + public static EventTime of(long size, TimeUnit unit) { + return new EventTime(size, unit); } } http://git-wip-us.apache.org/repos/asf/flink/blob/a606c4a9/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowpolicy/ProcessingTime.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowpolicy/ProcessingTime.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowpolicy/ProcessingTime.java index 2ff13fa..a71ba1d 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowpolicy/ProcessingTime.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowpolicy/ProcessingTime.java @@ -33,8 +33,8 @@ public final class ProcessingTime extends AbstractTimePolicy { private static final long serialVersionUID = 7546166721132583007L; /** Instantiation only via factory method */ - private ProcessingTime(long num, TimeUnit unit) { - super(num, unit); + private ProcessingTime(long size, TimeUnit unit) { + super(size, unit); } @Override @@ -54,12 +54,12 @@ public final class ProcessingTime extends AbstractTimePolicy { /** * Creates a processing time policy describing a processing time interval. - * - * @param num The length of the time interval. + * + * @param size The size of the generated windows. * @param unit The init (seconds, milliseconds) of the time interval. * @return The processing time policy. */ - public static ProcessingTime of(long num, TimeUnit unit) { - return new ProcessingTime(num, unit); + public static ProcessingTime of(long size, TimeUnit unit) { + return new ProcessingTime(size, unit); } } http://git-wip-us.apache.org/repos/asf/flink/blob/a606c4a9/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowpolicy/Time.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowpolicy/Time.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowpolicy/Time.java index 0233e96..efc9bf2 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowpolicy/Time.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowpolicy/Time.java @@ -31,18 +31,18 @@ public final class Time extends AbstractTimePolicy { private static final long serialVersionUID = 3197290738634320211L; /** Instantiation only via factory method */ - private Time(long num, TimeUnit unit) { - super(num, unit); + private Time(long size, TimeUnit unit) { + super(size, unit); } @Override public AbstractTimePolicy makeSpecificBasedOnTimeCharacteristic(TimeCharacteristic timeCharacteristic) { switch (timeCharacteristic) { case ProcessingTime: - return ProcessingTime.of(getNum(), getUnit()); + return ProcessingTime.of(getSize(), getUnit()); case IngestionTime: case EventTime: - return EventTime.of(getNum(), getUnit()); + return EventTime.of(getSize(), getUnit()); default: throw new IllegalArgumentException("Unknown time characteristic"); } @@ -58,11 +58,11 @@ public final class Time extends AbstractTimePolicy { * {@link org.apache.flink.streaming.api.environment.StreamExecutionEnvironment# * setStreamTimeCharacteristic(org.apache.flink.streaming.api.TimeCharacteristic)}. * - * @param num The length of the time interval. + * @param size The size of the generated windows. * @param unit The init (seconds, milliseconds) of the time interval. * @return The time policy. */ - public static Time of(long num, TimeUnit unit) { - return new Time(num, unit); + public static Time of(long size, TimeUnit unit) { + return new Time(size, unit); } } http://git-wip-us.apache.org/repos/asf/flink/blob/a606c4a9/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowpolicy/WindowPolicy.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowpolicy/WindowPolicy.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowpolicy/WindowPolicy.java index a82f892..2e1a387 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowpolicy/WindowPolicy.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowpolicy/WindowPolicy.java @@ -29,7 +29,7 @@ public abstract class WindowPolicy implements java.io.Serializable { private static final long serialVersionUID = -8696529489282723113L; /** - * If this window policies concrete instantiation depends on the time characteristic of the + * If the concrete instantiation of a window policy depends on the time characteristic of the * dataflow (processing time, event time), then this method must be overridden to convert this * policy to the respective specific instantiation. * <p> http://git-wip-us.apache.org/repos/asf/flink/blob/a606c4a9/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala index 9d62bcb..e538435 100644 --- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala +++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala @@ -299,8 +299,8 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) { // Time characteristic // -------------------------------------------------------------------------------------------- /** - * Sets the time characteristic for the stream, e.g., processing time, event time, - * or ingestion time. + * Sets the time characteristic for all streams create from this environment, e.g., processing + * time, event time, or ingestion time. * * @param characteristic The time characteristic. */ @@ -309,8 +309,9 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) { } /** - * Gets the time characteristic for the stream, e.g., processing time, event time, - * or ingestion time. + * Gets the time characteristic/ + * + * @see #setStreamTimeCharacteristic * * @return The time characteristic. */
