Repository: kafka Updated Branches: refs/heads/trunk 3400d0c3c -> 79987590e
KAFKA-4671: Fix Streams window retention policy Author: Matthias J. Sax <matth...@confluent.io> Reviewers: Damian Guy, Eno Thereska, Guozhang Wang Closes #2401 from mjsax/kafka-4671-window-retention-policy Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/79987590 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/79987590 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/79987590 Branch: refs/heads/trunk Commit: 79987590e3e96351ff75ce86718801ec605b2419 Parents: 3400d0c Author: Matthias J. Sax <matth...@confluent.io> Authored: Mon Jan 23 16:31:36 2017 -0800 Committer: Guozhang Wang <wangg...@gmail.com> Committed: Mon Jan 23 16:31:36 2017 -0800 ---------------------------------------------------------------------- .../kafka/streams/kstream/JoinWindows.java | 60 +++++---- .../kafka/streams/kstream/SessionWindows.java | 19 ++- .../kafka/streams/kstream/TimeWindows.java | 68 ++++++----- .../kafka/streams/kstream/UnlimitedWindows.java | 38 +++--- .../apache/kafka/streams/kstream/Window.java | 42 +++---- .../apache/kafka/streams/kstream/Windowed.java | 15 ++- .../apache/kafka/streams/kstream/Windows.java | 24 ++-- .../streams/kstream/internals/KStreamImpl.java | 12 +- .../kstream/internals/SessionWindow.java | 6 +- .../streams/kstream/internals/TimeWindow.java | 11 +- .../kstream/internals/UnlimitedWindow.java | 12 +- .../kafka/streams/kstream/JoinWindowsTest.java | 64 ++++++++-- .../streams/kstream/SessionWindowsTest.java | 68 +++++++++++ .../kafka/streams/kstream/TimeWindowsTest.java | 92 ++++++++++---- .../streams/kstream/UnlimitedWindowsTest.java | 22 +++- .../kafka/streams/kstream/WindowTest.java | 85 +++++++++++++ .../kafka/streams/kstream/WindowsTest.java | 62 ++++++++++ .../kstream/internals/TimeWindowTest.java | 122 +++++++++++++++++++ .../kstream/internals/UnlimitedWindowTest.java | 42 +++++++ .../WindowedStreamPartitionerTest.java | 2 +- .../CompositeReadOnlySessionStoreTest.java | 18 +-- 21 files changed, 701 insertions(+), 183 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/79987590/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java index 9317743..6dd1a85 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java @@ -17,8 +17,6 @@ package org.apache.kafka.streams.kstream; -import org.apache.kafka.streams.kstream.internals.TimeWindow; - import java.util.Map; /** @@ -45,21 +43,19 @@ import java.util.Map; * Both values (before and after) must not result in an "inverse" window, * i.e., lower-interval-bound must not be larger than upper-interval.bound. */ -public class JoinWindows extends Windows<TimeWindow> { +public class JoinWindows extends Windows<Window> { /** Maximum time difference for tuples that are before the join tuple. */ - public final long before; + public final long beforeMs; /** Maximum time difference for tuples that are after the join tuple. */ - public final long after; - - private JoinWindows(long before, long after) { - super(); + public final long afterMs; - if (before + after < 0) { - throw new IllegalArgumentException("Window interval (ie, before+after) must not be negative"); + private JoinWindows(final long beforeMs, final long afterMs) { + if (beforeMs + afterMs < 0) { + throw new IllegalArgumentException("Window interval (ie, beforeMs+afterMs) must not be negative"); } - this.after = after; - this.before = before; + this.afterMs = afterMs; + this.beforeMs = beforeMs; } /** @@ -68,8 +64,8 @@ public class JoinWindows extends Windows<TimeWindow> { * * @param timeDifference join window interval */ - public static JoinWindows of(long timeDifference) { - return new JoinWindows(timeDifference, timeDifference); + public static JoinWindows of(final long timeDifferenceMs) throws IllegalArgumentException { + return new JoinWindows(timeDifferenceMs, timeDifferenceMs); } /** @@ -79,8 +75,8 @@ public class JoinWindows extends Windows<TimeWindow> { * * @param timeDifference join window interval */ - public JoinWindows before(long timeDifference) { - return new JoinWindows(timeDifference, this.after); + public JoinWindows before(final long timeDifferenceMs) throws IllegalArgumentException { + return new JoinWindows(timeDifferenceMs, afterMs); } /** @@ -90,25 +86,39 @@ public class JoinWindows extends Windows<TimeWindow> { * * @param timeDifference join window interval */ - public JoinWindows after(long timeDifference) { - return new JoinWindows(this.before, timeDifference); + public JoinWindows after(final long timeDifferenceMs) throws IllegalArgumentException { + return new JoinWindows(beforeMs, timeDifferenceMs); } /** * Not supported by {@link JoinWindows}. Throws {@link UnsupportedOperationException}. */ @Override - public Map<Long, TimeWindow> windowsFor(long timestamp) { + public Map<Long, Window> windowsFor(final long timestamp) { throw new UnsupportedOperationException("windowsFor() is not supported in JoinWindows"); } @Override public long size() { - return after + before; + return beforeMs + afterMs; + } + + @Override + public JoinWindows until(final long durationMs) throws IllegalArgumentException { + if (durationMs < size()) { + throw new IllegalArgumentException("Window retention time (durationMs) cannot be smaller than the window size."); + } + super.until(durationMs); + return this; + } + + @Override + public long maintainMs() { + return Math.max(super.maintainMs(), size()); } @Override - public final boolean equals(Object o) { + public final boolean equals(final Object o) { if (o == this) { return true; } @@ -116,14 +126,14 @@ public class JoinWindows extends Windows<TimeWindow> { return false; } - JoinWindows other = (JoinWindows) o; - return this.before == other.before && this.after == other.after; + final JoinWindows other = (JoinWindows) o; + return beforeMs == other.beforeMs && afterMs == other.afterMs; } @Override public int hashCode() { - int result = (int) (before ^ (before >>> 32)); - result = 31 * result + (int) (after ^ (after >>> 32)); + int result = (int) (beforeMs ^ (beforeMs >>> 32)); + result = 31 * result + (int) (afterMs ^ (afterMs >>> 32)); return result; } http://git-wip-us.apache.org/repos/asf/kafka/blob/79987590/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindows.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindows.java index f9a399a..bed6c3d 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindows.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindows.java @@ -63,9 +63,9 @@ public class SessionWindows { private final long gapMs; private long maintainDurationMs; - private SessionWindows(final long gapMs, final long maintainDurationMs) { + private SessionWindows(final long gapMs) { this.gapMs = gapMs; - this.maintainDurationMs = maintainDurationMs; + maintainDurationMs = Windows.DEFAULT_MAINTAIN_DURATION_MS; } /** @@ -75,7 +75,10 @@ public class SessionWindows { * and default maintain duration */ public static SessionWindows with(final long inactivityGapMs) { - return new SessionWindows(inactivityGapMs, Windows.DEFAULT_MAINTAIN_DURATION); + if (inactivityGapMs < 1) { + throw new IllegalArgumentException("Gap time (inactivityGapMs) cannot be zero or negative."); + } + return new SessionWindows(inactivityGapMs); } /** @@ -84,8 +87,12 @@ public class SessionWindows { * * @return itself */ - public SessionWindows until(final long durationMs) { - this.maintainDurationMs = durationMs; + public SessionWindows until(final long durationMs) throws IllegalArgumentException { + if (durationMs < gapMs) { + throw new IllegalArgumentException("Window retentin time (durationMs) cannot be smaller than window gap."); + } + maintainDurationMs = durationMs; + return this; } @@ -100,6 +107,6 @@ public class SessionWindows { * @return the minimum amount of time a window will be maintained for. */ public long maintainMs() { - return maintainDurationMs; + return Math.max(maintainDurationMs, gapMs); } } http://git-wip-us.apache.org/repos/asf/kafka/blob/79987590/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java index ef94cf9..11df228 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java @@ -40,27 +40,18 @@ public class TimeWindows extends Windows<TimeWindow> { * The window size's effective time unit is determined by the semantics of the topology's * configured {@link org.apache.kafka.streams.processor.TimestampExtractor}. */ - public final long size; + public final long sizeMs; /** * The size of the window's advance interval, i.e. by how much a window moves forward relative * to the previous one. The interval's effective time unit is determined by the semantics of * the topology's configured {@link org.apache.kafka.streams.processor.TimestampExtractor}. */ - public final long advance; + public final long advanceMs; - - private TimeWindows(long size, long advance) { - super(); - if (size <= 0) { - throw new IllegalArgumentException("window size must be > 0 (you provided " + size + ")"); - } - this.size = size; - if (!(0 < advance && advance <= size)) { - throw new IllegalArgumentException( - String.format("advance interval (%d) must lie within interval (0, %d]", advance, size)); - } - this.advance = advance; + private TimeWindows(final long sizeMs, final long advanceMs) throws IllegalArgumentException { + this.sizeMs = sizeMs; + this.advanceMs = advanceMs; } /** @@ -76,8 +67,11 @@ public class TimeWindows extends Windows<TimeWindow> { * topology's configured {@link org.apache.kafka.streams.processor.TimestampExtractor}. * @return a new window definition */ - public static TimeWindows of(long size) { - return new TimeWindows(size, size); + public static TimeWindows of(final long sizeMs) throws IllegalArgumentException { + if (sizeMs <= 0) { + throw new IllegalArgumentException("Window sizeMs must be larger than zero."); + } + return new TimeWindows(sizeMs, sizeMs); } /** @@ -93,43 +87,59 @@ public class TimeWindows extends Windows<TimeWindow> { * {@link org.apache.kafka.streams.processor.TimestampExtractor}. * @return a new window definition */ - public TimeWindows advanceBy(long interval) { - return new TimeWindows(this.size, interval); + public TimeWindows advanceBy(final long advanceMs) { + if (advanceMs <= 0 || advanceMs > sizeMs) { + throw new IllegalArgumentException(String.format("AdvanceMs must lie within interval (0, %d]", sizeMs)); + } + return new TimeWindows(sizeMs, advanceMs); } @Override - public Map<Long, TimeWindow> windowsFor(long timestamp) { - long windowStart = (Math.max(0, timestamp - this.size + this.advance) / this.advance) * this.advance; - Map<Long, TimeWindow> windows = new HashMap<>(); + public Map<Long, TimeWindow> windowsFor(final long timestamp) { + long windowStart = (Math.max(0, timestamp - sizeMs + advanceMs) / advanceMs) * advanceMs; + final Map<Long, TimeWindow> windows = new HashMap<>(); while (windowStart <= timestamp) { - TimeWindow window = new TimeWindow(windowStart, windowStart + this.size); + final TimeWindow window = new TimeWindow(windowStart, windowStart + sizeMs); windows.put(windowStart, window); - windowStart += this.advance; + windowStart += advanceMs; } return windows; } @Override public long size() { - return size; + return sizeMs; + } + + public TimeWindows until(final long durationMs) throws IllegalArgumentException { + if (durationMs < sizeMs) { + throw new IllegalArgumentException("Window retention time (durationMs) cannot be smaller than the window size."); + } + super.until(durationMs); + return this; + } + + @Override + public long maintainMs() { + return Math.max(super.maintainMs(), sizeMs); } @Override - public final boolean equals(Object o) { + public final boolean equals(final Object o) { if (o == this) { return true; } if (!(o instanceof TimeWindows)) { return false; } - TimeWindows other = (TimeWindows) o; - return this.size == other.size && this.advance == other.advance; + final TimeWindows other = (TimeWindows) o; + return sizeMs == other.sizeMs && advanceMs == other.advanceMs; } @Override public int hashCode() { - int result = (int) (size ^ (size >>> 32)); - result = 31 * result + (int) (advance ^ (advance >>> 32)); + int result = (int) (sizeMs ^ (sizeMs >>> 32)); + result = 31 * result + (int) (advanceMs ^ (advanceMs >>> 32)); return result; } http://git-wip-us.apache.org/repos/asf/kafka/blob/79987590/streams/src/main/java/org/apache/kafka/streams/kstream/UnlimitedWindows.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/UnlimitedWindows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/UnlimitedWindows.java index 3dc6f65..8605f9d 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/UnlimitedWindows.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/UnlimitedWindows.java @@ -27,24 +27,23 @@ import java.util.Map; */ public class UnlimitedWindows extends Windows<UnlimitedWindow> { - private static final long DEFAULT_START_TIMESTAMP = 0L; + private static final long DEFAULT_START_TIMESTAMP_MS = 0L; /** The start timestamp of the window. */ - public final long start; + public final long startMs; - private UnlimitedWindows(long start) { - super(); - if (start < 0) { - throw new IllegalArgumentException("start must be > 0 (you provided " + start + ")"); + private UnlimitedWindows(final long startMs) throws IllegalArgumentException { + if (startMs < 0) { + throw new IllegalArgumentException("startMs must be > 0 (you provided " + startMs + ")"); } - this.start = start; + this.startMs = startMs; } /** * Return an unlimited window starting at timestamp zero. */ public static UnlimitedWindows of() { - return new UnlimitedWindows(DEFAULT_START_TIMESTAMP); + return new UnlimitedWindows(DEFAULT_START_TIMESTAMP_MS); } /** @@ -53,18 +52,18 @@ public class UnlimitedWindows extends Windows<UnlimitedWindow> { * @param start the window start time * @return a new unlimited window that starts at {@code start} */ - public UnlimitedWindows startOn(long start) { + public UnlimitedWindows startOn(final long start) throws IllegalArgumentException { return new UnlimitedWindows(start); } @Override - public Map<Long, UnlimitedWindow> windowsFor(long timestamp) { + public Map<Long, UnlimitedWindow> windowsFor(final long timestamp) { // always return the single unlimited window // we cannot use Collections.singleMap since it does not support remove() - Map<Long, UnlimitedWindow> windows = new HashMap<>(); - if (timestamp >= start) { - windows.put(start, new UnlimitedWindow(start)); + final Map<Long, UnlimitedWindow> windows = new HashMap<>(); + if (timestamp >= startMs) { + windows.put(startMs, new UnlimitedWindow(startMs)); } return windows; } @@ -75,7 +74,7 @@ public class UnlimitedWindows extends Windows<UnlimitedWindow> { } @Override - public final boolean equals(Object o) { + public final boolean equals(final Object o) { if (o == this) { return true; } @@ -84,13 +83,18 @@ public class UnlimitedWindows extends Windows<UnlimitedWindow> { return false; } - UnlimitedWindows other = (UnlimitedWindows) o; - return this.start == other.start; + final UnlimitedWindows other = (UnlimitedWindows) o; + return startMs == other.startMs; } @Override public int hashCode() { - return (int) (start ^ (start >>> 32)); + return (int) (startMs ^ (startMs >>> 32)); + } + + @Override + public UnlimitedWindows until(final long durationMs) { + throw new IllegalArgumentException("Window retention time (durationMs) cannot be set for UnlimitedWindows."); } @Override http://git-wip-us.apache.org/repos/asf/kafka/blob/79987590/streams/src/main/java/org/apache/kafka/streams/kstream/Window.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Window.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Window.java index 13a9529..9c6edc0 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/Window.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Window.java @@ -22,8 +22,8 @@ package org.apache.kafka.streams.kstream; */ public abstract class Window { - protected final long start; - protected final long end; + protected final long startMs; + protected final long endMs; /** * Create a new window for the given start time (inclusive) and end time (exclusive). @@ -33,32 +33,29 @@ public abstract class Window { * @throws IllegalArgumentException if {@code start} or {@code end} is negative or if {@code end} is smaller than * {@code start} */ - public Window(long start, long end) throws IllegalArgumentException { - if (start < 0) { - throw new IllegalArgumentException("Window start time cannot be negative."); + public Window(long startMs, long endMs) throws IllegalArgumentException { + if (startMs < 0) { + throw new IllegalArgumentException("Window startMs time cannot be negative."); } - if (end < 0) { - throw new IllegalArgumentException("Window end time cannot be negative."); + if (endMs < startMs) { + throw new IllegalArgumentException("Window endMs time cannot be smaller than window startMs time."); } - if (end < start) { - throw new IllegalArgumentException("Window end time cannot be smaller than window start time."); - } - this.start = start; - this.end = end; + this.startMs = startMs; + this.endMs = endMs; } /** * Return the start timestamp of this window, inclusive */ public long start() { - return start; + return startMs; } /** * Return the end timestamp of this window, exclusive */ public long end() { - return end; + return endMs; } /** @@ -67,10 +64,10 @@ public abstract class Window { * @param other another window * @return {@code true} if {@code other} overlaps with this window—{@code false} otherwise */ - public abstract boolean overlap(Window other); + public abstract boolean overlap(final Window other); @Override - public boolean equals(Object obj) { + public boolean equals(final Object obj) { if (obj == this) { return true; } @@ -79,21 +76,20 @@ public abstract class Window { return false; } - Window other = (Window) obj; - return this.start == other.start && this.end == other.end; + final Window other = (Window) obj; + return startMs == other.startMs && endMs == other.endMs; } @Override public int hashCode() { - long n = (this.start << 32) | this.end; - return (int) (n % 0xFFFFFFFFL); + return (int) (((startMs << 32) | endMs) % 0xFFFFFFFFL); } @Override public String toString() { return "Window{" + - "start=" + start + - ", end=" + end + - '}'; + "start=" + startMs + + ", end=" + endMs + + '}'; } } http://git-wip-us.apache.org/repos/asf/kafka/blob/79987590/streams/src/main/java/org/apache/kafka/streams/kstream/Windowed.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Windowed.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Windowed.java index 6606fcb..81357c1 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/Windowed.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Windowed.java @@ -26,11 +26,11 @@ package org.apache.kafka.streams.kstream; */ public class Windowed<K> { - private K key; + private final K key; - private Window window; + private final Window window; - public Windowed(K key, Window window) { + public Windowed(final K key, final Window window) { this.key = key; this.window = window; } @@ -59,21 +59,20 @@ public class Windowed<K> { } @Override - public boolean equals(Object obj) { + public boolean equals(final Object obj) { if (obj == this) return true; if (!(obj instanceof Windowed)) return false; - Windowed<?> that = (Windowed) obj; - - return this.window.equals(that.window) && this.key.equals(that.key); + final Windowed<?> that = (Windowed) obj; + return window.equals(that.window) && key.equals(that.key); } @Override public int hashCode() { - long n = ((long) window.hashCode() << 32) | key.hashCode(); + final long n = ((long) window.hashCode() << 32) | key.hashCode(); return (int) (n % 0xFFFFFFFFL); } } http://git-wip-us.apache.org/repos/asf/kafka/blob/79987590/streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java index ebd92fe..29b61fd 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java @@ -14,7 +14,6 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.kafka.streams.kstream; import java.util.Map; @@ -28,15 +27,15 @@ public abstract class Windows<W extends Window> { private static final int DEFAULT_NUM_SEGMENTS = 3; - static final long DEFAULT_MAINTAIN_DURATION = 24 * 60 * 60 * 1000L; // one day + static final long DEFAULT_MAINTAIN_DURATION_MS = 24 * 60 * 60 * 1000L; // one day private long maintainDurationMs; public int segments; protected Windows() { - this.segments = DEFAULT_NUM_SEGMENTS; - this.maintainDurationMs = DEFAULT_MAINTAIN_DURATION; + segments = DEFAULT_NUM_SEGMENTS; + maintainDurationMs = DEFAULT_MAINTAIN_DURATION_MS; } /** @@ -45,8 +44,12 @@ public abstract class Windows<W extends Window> { * * @return itself */ - public Windows<W> until(long durationMs) { - this.maintainDurationMs = durationMs; + // This should always get overridden to provide the correct return type and thus to avoid a cast + public Windows<W> until(final long durationMs) throws IllegalArgumentException { + if (durationMs < 0) { + throw new IllegalArgumentException("Window retention time (durationMs) cannot be negative."); + } + maintainDurationMs = durationMs; return this; } @@ -57,7 +60,10 @@ public abstract class Windows<W extends Window> { * * @return itself */ - protected Windows<W> segments(int segments) { + protected Windows<W> segments(final int segments) throws IllegalArgumentException { + if (segments < 2) { + throw new IllegalArgumentException("Number of segments must be at least 2."); + } this.segments = segments; return this; @@ -69,7 +75,7 @@ public abstract class Windows<W extends Window> { * @return the window maintain duration in milliseconds of streams time */ public long maintainMs() { - return this.maintainDurationMs; + return maintainDurationMs; } /** @@ -78,7 +84,7 @@ public abstract class Windows<W extends Window> { * @param timestamp the timestamp window should get created for * @return a map of {@code windowStartTimestamp -> Window} entries */ - public abstract Map<Long, W> windowsFor(long timestamp); + public abstract Map<Long, W> windowsFor(final long timestamp); public abstract long size(); } http://git-wip-us.apache.org/repos/asf/kafka/blob/79987590/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java index 3d41ae4..0434f06 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java @@ -711,20 +711,20 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V KStreamJoinWindow<K1, V1> thisWindowedStream = new KStreamJoinWindow<>(thisWindow.name(), - windows.before + windows.after + 1, + windows.beforeMs + windows.afterMs + 1, windows.maintainMs()); KStreamJoinWindow<K1, V2> otherWindowedStream = new KStreamJoinWindow<>(otherWindow.name(), - windows.before + windows.after + 1, + windows.beforeMs + windows.afterMs + 1, windows.maintainMs()); final KStreamKStreamJoin<K1, R, ? super V1, ? super V2> joinThis = new KStreamKStreamJoin<>(otherWindow.name(), - windows.before, - windows.after, + windows.beforeMs, + windows.afterMs, joiner, leftOuter); final KStreamKStreamJoin<K1, R, ? super V2, ? super V1> joinOther = new KStreamKStreamJoin<>(thisWindow.name(), - windows.after, - windows.before, + windows.afterMs, + windows.beforeMs, reverseJoiner(joiner), rightOuter); http://git-wip-us.apache.org/repos/asf/kafka/blob/79987590/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindow.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindow.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindow.java index db63029..cf72752 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindow.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindow.java @@ -35,8 +35,8 @@ public final class SessionWindow extends Window { * @param start the start timestamp of the window * @param end the end timestamp of the window */ - public SessionWindow(final long start, final long end) { - super(start, end); + public SessionWindow(final long startMs, final long endMs) { + super(startMs, endMs); } /** @@ -52,7 +52,7 @@ public final class SessionWindow extends Window { + other.getClass()); } final SessionWindow otherWindow = (SessionWindow) other; - return !(otherWindow.end < start || end < otherWindow.start); + return !(otherWindow.endMs < startMs || endMs < otherWindow.startMs); } } http://git-wip-us.apache.org/repos/asf/kafka/blob/79987590/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindow.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindow.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindow.java index 630821f..bf98f94 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindow.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindow.java @@ -21,18 +21,21 @@ import org.apache.kafka.streams.kstream.Window; public class TimeWindow extends Window { - public TimeWindow(long start, long end) { - super(start, end); + public TimeWindow(long startMs, long endMs) { + super(startMs, endMs); + if (startMs == endMs) { + throw new IllegalArgumentException("Window endMs must be greater than window startMs."); + } } @Override - public boolean overlap(Window other) { + public boolean overlap(final Window other) throws IllegalArgumentException { if (getClass() != other.getClass()) { throw new IllegalArgumentException("Cannot compare windows of different type. Other window has type " + other.getClass()); } final TimeWindow otherWindow = (TimeWindow) other; - return start < otherWindow.end && otherWindow.start < end; + return startMs < otherWindow.endMs && otherWindow.startMs < endMs; } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kafka/blob/79987590/streams/src/main/java/org/apache/kafka/streams/kstream/internals/UnlimitedWindow.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/UnlimitedWindow.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/UnlimitedWindow.java index e9ec040..7fb7c53 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/UnlimitedWindow.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/UnlimitedWindow.java @@ -5,9 +5,9 @@ * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -21,12 +21,12 @@ import org.apache.kafka.streams.kstream.Window; public class UnlimitedWindow extends Window { - public UnlimitedWindow(long start) { - super(start, Long.MAX_VALUE); + public UnlimitedWindow(final long startMs) { + super(startMs, Long.MAX_VALUE); } @Override - public boolean overlap(Window other) { + public boolean overlap(final Window other) { if (getClass() != other.getClass()) { throw new IllegalArgumentException("Cannot compare windows of different type. Other window has type " + other.getClass()); http://git-wip-us.apache.org/repos/asf/kafka/blob/79987590/streams/src/test/java/org/apache/kafka/streams/kstream/JoinWindowsTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/JoinWindowsTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/JoinWindowsTest.java index b37e5e8..24387ad 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/JoinWindowsTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/JoinWindowsTest.java @@ -23,6 +23,7 @@ import org.junit.Test; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.fail; public class JoinWindowsTest { @@ -44,8 +45,8 @@ public class JoinWindowsTest { assertEquals(w2, w1); assertEquals(w1.hashCode(), w2.hashCode()); - JoinWindows w3 = JoinWindows.of(w2.after).before(anyOtherSize); - JoinWindows w4 = JoinWindows.of(anyOtherSize).after(w2.after); + JoinWindows w3 = JoinWindows.of(w2.afterMs).before(anyOtherSize); + JoinWindows w4 = JoinWindows.of(anyOtherSize).after(w2.afterMs); assertEquals(w3, w4); assertEquals(w4, w3); assertEquals(w3.hashCode(), w4.hashCode()); @@ -55,13 +56,13 @@ public class JoinWindowsTest { assertNotEquals("must be false for different window types", UnlimitedWindows.of(), w1); assertNotEquals("must be false for different types", new Object(), w1); - JoinWindows differentWindowSize = JoinWindows.of(w1.after + 1); + JoinWindows differentWindowSize = JoinWindows.of(w1.afterMs + 1); assertNotEquals("must be false when window sizes are different", differentWindowSize, w1); - JoinWindows differentWindowSize2 = JoinWindows.of(w1.after).after(w1.after + 1); + JoinWindows differentWindowSize2 = JoinWindows.of(w1.afterMs).after(w1.afterMs + 1); assertNotEquals("must be false when window sizes are different", differentWindowSize2, w1); - JoinWindows differentWindowSize3 = JoinWindows.of(w1.after).before(w1.before + 1); + JoinWindows differentWindowSize3 = JoinWindows.of(w1.afterMs).before(w1.beforeMs + 1); assertNotEquals("must be false when window sizes are different", differentWindowSize3, w1); } @@ -85,14 +86,55 @@ public class JoinWindowsTest { JoinWindows.of(-1); } - @Test(expected = IllegalArgumentException.class) - public void afterBelowLower() { - JoinWindows.of(anySize).after(-anySize - 1); + @Test + public void endTimeShouldNotBeBeforeStart() { + final JoinWindows windowSpec = JoinWindows.of(anySize); + try { + windowSpec.after(-anySize - 1); + fail("window end time should not be before window start time"); + } catch (final IllegalArgumentException e) { + // expected + } } - @Test(expected = IllegalArgumentException.class) - public void beforeOverUpper() { - JoinWindows.of(anySize).before(-anySize - 1); + @Test + public void startTimeShouldNotBeAfterEnd() { + final JoinWindows windowSpec = JoinWindows.of(anySize); + try { + windowSpec.before(-anySize - 1); + fail("window start time should not be after window end time"); + } catch (final IllegalArgumentException e) { + // expected + } + } + + @Test + public void untilShouldSetMaintainDuration() { + final JoinWindows windowSpec = JoinWindows.of(anySize); + final long windowSize = windowSpec.size(); + assertEquals(windowSize, windowSpec.until(windowSize).maintainMs()); + } + + @Test + public void shouldUseWindowSizeForMaintainDurationWhenSizeLargerThanDefaultMaintainMs() { + final long size = Windows.DEFAULT_MAINTAIN_DURATION_MS; + + final JoinWindows windowSpec = JoinWindows.of(size); + final long windowSize = windowSpec.size(); + + assertEquals(windowSize, windowSpec.maintainMs()); + } + + @Test + public void retentionTimeMustNoBeSmallerThanWindowSize() { + final JoinWindows windowSpec = JoinWindows.of(anySize); + final long windowSize = windowSpec.size(); + try { + windowSpec.until(windowSize - 1); + fail("should not accept retention time smaller than window size"); + } catch (final IllegalArgumentException e) { + // expected + } } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kafka/blob/79987590/streams/src/test/java/org/apache/kafka/streams/kstream/SessionWindowsTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/SessionWindowsTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/SessionWindowsTest.java new file mode 100644 index 0000000..a9eced4 --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/SessionWindowsTest.java @@ -0,0 +1,68 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.kafka.streams.kstream; + +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + +public class SessionWindowsTest { + + @Test + public void shouldSetWindowGap() { + final long anyGap = 42L; + assertEquals(anyGap, SessionWindows.with(anyGap).inactivityGap()); + } + + @Test + public void shouldSetWindowRetentionTime() { + final long anyRetentionTime = 42L; + assertEquals(anyRetentionTime, SessionWindows.with(1).until(anyRetentionTime).maintainMs()); + } + + @Test(expected = IllegalArgumentException.class) + public void windowSizeMustNotBeNegative() { + SessionWindows.with(-1); + } + + @Test(expected = IllegalArgumentException.class) + public void windowSizeMustNotBeZero() { + SessionWindows.with(0); + } + + @Test + public void retentionTimeShouldBeGapIfGapIsLargerThanDefaultRetentionTime() { + final long windowGap = 2 * Windows.DEFAULT_MAINTAIN_DURATION_MS; + assertEquals(windowGap, SessionWindows.with(windowGap).maintainMs()); + } + + @Test + public void retentionTimeMustNotBeNegative() { + final SessionWindows windowSpec = SessionWindows.with(42); + try { + windowSpec.until(41); + fail("should not accept retention time smaller than gap"); + } catch (final IllegalArgumentException e) { + // expected + } + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kafka/blob/79987590/streams/src/test/java/org/apache/kafka/streams/kstream/TimeWindowsTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/TimeWindowsTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/TimeWindowsTest.java index 2bea16b..6b8b6ea 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/TimeWindowsTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/TimeWindowsTest.java @@ -26,15 +26,38 @@ import java.util.Map; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.fail; public class TimeWindowsTest { - private static long anySize = 123L; + private static final long ANY_SIZE = 123L; + + @Test + public void shouldSetWindowSize() { + assertEquals(ANY_SIZE, TimeWindows.of(ANY_SIZE).sizeMs); + } + + @Test + public void shouldSetWindowAdvance() { + final long anyAdvance = 4; + assertEquals(anyAdvance, TimeWindows.of(ANY_SIZE).advanceBy(anyAdvance).advanceMs); + } + + @Test + public void shouldSetWindowRetentionTime() { + assertEquals(ANY_SIZE, TimeWindows.of(ANY_SIZE).until(ANY_SIZE).maintainMs()); + } + + @Test + public void shouldUseWindowSizeAsRentitionTimeIfWindowSizeIsLargerThanDefaultRetentionTime() { + final long windowSize = 2 * Windows.DEFAULT_MAINTAIN_DURATION_MS; + assertEquals(windowSize, TimeWindows.of(windowSize).maintainMs()); + } @Test public void shouldHaveSaneEqualsAndHashCode() { - TimeWindows w1 = TimeWindows.of(anySize); - TimeWindows w2 = TimeWindows.of(w1.size); + TimeWindows w1 = TimeWindows.of(ANY_SIZE); + TimeWindows w2 = TimeWindows.of(w1.sizeMs); // Reflexive assertEquals(w1, w1); @@ -46,7 +69,7 @@ public class TimeWindowsTest { assertEquals(w1.hashCode(), w2.hashCode()); // Transitive - TimeWindows w3 = TimeWindows.of(w2.size); + TimeWindows w3 = TimeWindows.of(w2.sizeMs); assertEquals(w2, w3); assertEquals(w1, w3); assertEquals(w1.hashCode(), w3.hashCode()); @@ -56,42 +79,69 @@ public class TimeWindowsTest { assertNotEquals("must be false for different window types", UnlimitedWindows.of(), w1); assertNotEquals("must be false for different types", new Object(), w1); - TimeWindows differentWindowSize = TimeWindows.of(w1.size + 1); + TimeWindows differentWindowSize = TimeWindows.of(w1.sizeMs + 1); assertNotEquals("must be false when window sizes are different", differentWindowSize, w1); - TimeWindows differentAdvanceInterval = w1.advanceBy(w1.advance - 1); + TimeWindows differentAdvanceInterval = w1.advanceBy(w1.advanceMs - 1); assertNotEquals("must be false when advance intervals are different", differentAdvanceInterval, w1); } + @Test(expected = IllegalArgumentException.class) + public void windowSizeMustNotBeZero() { + TimeWindows.of(0); + } @Test(expected = IllegalArgumentException.class) public void windowSizeMustNotBeNegative() { TimeWindows.of(-1); } - @Test(expected = IllegalArgumentException.class) - public void windowSizeMustNotBeZero() { - TimeWindows.of(0); + @Test + public void advanceIntervalMustNotBeZero() { + final TimeWindows windowSpec = TimeWindows.of(ANY_SIZE); + try { + windowSpec.advanceBy(0); + fail("should not accept zero advance parameter"); + } catch (final IllegalArgumentException e) { + // expected + } } - @Test(expected = IllegalArgumentException.class) + @Test public void advanceIntervalMustNotBeNegative() { - TimeWindows.of(anySize).advanceBy(-1); + final TimeWindows windowSpec = TimeWindows.of(ANY_SIZE); + try { + windowSpec.advanceBy(-1); + fail("should not accept negative advance parameter"); + } catch (final IllegalArgumentException e) { + // expected + } } - @Test(expected = IllegalArgumentException.class) - public void advanceIntervalMustNotBeZero() { - TimeWindows.of(anySize).advanceBy(0); + @Test + public void advanceIntervalMustNotBeLargerThanWindowSize() { + final TimeWindows windowSpec = TimeWindows.of(ANY_SIZE); + try { + windowSpec.advanceBy(ANY_SIZE + 1); + fail("should not accept advance greater than window size"); + } catch (final IllegalArgumentException e) { + // expected + } } - @Test(expected = IllegalArgumentException.class) - public void advanceIntervalMustNotBeLargerThanWindowSize() { - long size = anySize; - TimeWindows.of(size).advanceBy(size + 1); + @Test + public void retentionTimeMustNoBeSmallerThanWindowSize() { + final TimeWindows windowSpec = TimeWindows.of(ANY_SIZE); + try { + windowSpec.until(ANY_SIZE - 1); + fail("should not accept retention time smaller than window size"); + } catch (final IllegalArgumentException e) { + // expected + } } @Test - public void windowsForHoppingWindows() { + public void shouldComputeWindowsForHoppingWindows() { TimeWindows windows = TimeWindows.of(12L).advanceBy(5L); Map<Long, TimeWindow> matched = windows.windowsFor(21L); assertEquals(12L / 5L + 1, matched.size()); @@ -101,7 +151,7 @@ public class TimeWindowsTest { } @Test - public void windowsForBarelyOverlappingHoppingWindows() { + public void shouldComputeWindowsForBarelyOverlappingHoppingWindows() { TimeWindows windows = TimeWindows.of(6L).advanceBy(5L); Map<Long, TimeWindow> matched = windows.windowsFor(7L); assertEquals(1, matched.size()); @@ -109,7 +159,7 @@ public class TimeWindowsTest { } @Test - public void windowsForTumblingWindows() { + public void shouldComputeWindowsForTumblingWindows() { TimeWindows windows = TimeWindows.of(12L); Map<Long, TimeWindow> matched = windows.windowsFor(21L); assertEquals(1, matched.size()); http://git-wip-us.apache.org/repos/asf/kafka/blob/79987590/streams/src/test/java/org/apache/kafka/streams/kstream/UnlimitedWindowsTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/UnlimitedWindowsTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/UnlimitedWindowsTest.java index c1f4be6..ea9078c 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/UnlimitedWindowsTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/UnlimitedWindowsTest.java @@ -26,25 +26,37 @@ import java.util.Map; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; public class UnlimitedWindowsTest { private static long anyStartTime = 10L; + @Test + public void shouldSetWindowStartTime() { + assertEquals(anyStartTime, UnlimitedWindows.of().startOn(anyStartTime).startMs); + } + @Test(expected = IllegalArgumentException.class) public void startTimeMustNotBeNegative() { UnlimitedWindows.of().startOn(-1); } @Test - public void startTimeCanBeZero() { - UnlimitedWindows.of().startOn(0); + public void shouldThrowOnUntil() { + final UnlimitedWindows windowSpec = UnlimitedWindows.of(); + try { + windowSpec.until(42); + fail("should not allow to set window retention time"); + } catch (final IllegalArgumentException e) { + // expected + } } @Test public void shouldIncludeRecordsThatHappenedOnWindowStart() { UnlimitedWindows w = UnlimitedWindows.of().startOn(anyStartTime); - Map<Long, UnlimitedWindow> matchedWindows = w.windowsFor(w.start); + Map<Long, UnlimitedWindow> matchedWindows = w.windowsFor(w.startMs); assertEquals(1, matchedWindows.size()); assertEquals(new UnlimitedWindow(anyStartTime), matchedWindows.get(anyStartTime)); } @@ -52,7 +64,7 @@ public class UnlimitedWindowsTest { @Test public void shouldIncludeRecordsThatHappenedAfterWindowStart() { UnlimitedWindows w = UnlimitedWindows.of().startOn(anyStartTime); - long timestamp = w.start + 1; + long timestamp = w.startMs + 1; Map<Long, UnlimitedWindow> matchedWindows = w.windowsFor(timestamp); assertEquals(1, matchedWindows.size()); assertEquals(new UnlimitedWindow(anyStartTime), matchedWindows.get(anyStartTime)); @@ -61,7 +73,7 @@ public class UnlimitedWindowsTest { @Test public void shouldExcludeRecordsThatHappenedBeforeWindowStart() { UnlimitedWindows w = UnlimitedWindows.of().startOn(anyStartTime); - long timestamp = w.start - 1; + long timestamp = w.startMs - 1; Map<Long, UnlimitedWindow> matchedWindows = w.windowsFor(timestamp); assertTrue(matchedWindows.isEmpty()); } http://git-wip-us.apache.org/repos/asf/kafka/blob/79987590/streams/src/test/java/org/apache/kafka/streams/kstream/WindowTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/WindowTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/WindowTest.java new file mode 100644 index 0000000..55c5c60 --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/WindowTest.java @@ -0,0 +1,85 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.kafka.streams.kstream; + +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; + +public class WindowTest { + + static class TestWindow extends Window { + TestWindow(final long startMs, final long endMs) { + super(startMs, endMs); + } + + @Override + public boolean overlap(final Window other) { + return false; + } + } + + static class TestWindow2 extends Window { + TestWindow2(final long startMs, final long endMs) { + super(startMs, endMs); + } + + @Override + public boolean overlap(final Window other) { + return false; + } + } + + private final TestWindow window = new TestWindow(5, 10); + + @Test(expected = IllegalArgumentException.class) + public void shouldThrowIfStartIsNegative() { + new TestWindow(-1, 0); + } + + @Test(expected = IllegalArgumentException.class) + public void shouldThrowIfEndIsSmallerThanStart() { + new TestWindow(1, 0); + } + + @Test + public void shouldBeEqualIfStartAndEndSame() { + final TestWindow window2 = new TestWindow(window.startMs, window.endMs); + + assertEquals(window, window); + assertEquals(window, window2); + assertEquals(window2, window); + } + + @Test + public void shouldNotBeEqualIfStartOrEndIsDifferent() { + assertNotEquals(window, new TestWindow(0, window.endMs)); + assertNotEquals(window, new TestWindow(7, window.endMs)); + assertNotEquals(window, new TestWindow(window.startMs, 7)); + assertNotEquals(window, new TestWindow(window.startMs, 15)); + assertNotEquals(window, new TestWindow(7, 8)); + assertNotEquals(window, new TestWindow(0, 15)); + } + + @Test + public void shouldNotBeEqualIfDifferentWindowType() { + assertNotEquals(window, new TestWindow2(window.startMs, window.endMs)); + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/79987590/streams/src/test/java/org/apache/kafka/streams/kstream/WindowsTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/WindowsTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/WindowsTest.java new file mode 100644 index 0000000..890265f --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/WindowsTest.java @@ -0,0 +1,62 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.kstream; + +import org.junit.Test; + +import java.util.Map; + +import static org.junit.Assert.assertEquals; + +public class WindowsTest { + + private class TestWindows extends Windows { + + @Override + public Map windowsFor(long timestamp) { + return null; + } + + @Override + public long size() { + return 0; + } + } + + @Test + public void shouldSetNumberOfSegments() { + final int anySegmentSizeLargerThanOne = 5; + assertEquals(anySegmentSizeLargerThanOne, new TestWindows().segments(anySegmentSizeLargerThanOne).segments); + } + + @Test + public void shouldSetWindowRetentionTime() { + final int anyNotNegativeRetentionTime = 42; + assertEquals(anyNotNegativeRetentionTime, new TestWindows().until(anyNotNegativeRetentionTime).maintainMs()); + } + + @Test(expected = IllegalArgumentException.class) + public void numberOfSegmentsMustBeAtLeastTwo() { + new TestWindows().segments(1); + } + + @Test(expected = IllegalArgumentException.class) + public void retentionTimeMustNotBeNegative() { + new TestWindows().until(-1); + } + +} http://git-wip-us.apache.org/repos/asf/kafka/blob/79987590/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimeWindowTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimeWindowTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimeWindowTest.java new file mode 100644 index 0000000..efa20b8 --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimeWindowTest.java @@ -0,0 +1,122 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.kafka.streams.kstream.internals; + +import org.junit.Test; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +public class TimeWindowTest { + + private long start = 50; + private long end = 100; + private final TimeWindow window = new TimeWindow(start, end); + private final SessionWindow sessionWindow = new SessionWindow(start, end); + + @Test(expected = IllegalArgumentException.class) + public void endMustBeLargerThanStart() { + new TimeWindow(start, start); + } + + @Test + public void shouldNotOverlapIfOtherWindowIsBeforeThisWindow() { + /* + * This: [-------) + * Other: [-----) + */ + assertFalse(window.overlap(new TimeWindow(0, 25))); + assertFalse(window.overlap(new TimeWindow(0, start - 1))); + assertFalse(window.overlap(new TimeWindow(0, start))); + } + + @Test + public void shouldOverlapIfOtherWindowEndIsWithinThisWindow() { + /* + * This: [-------) + * Other: [---------) + */ + assertTrue(window.overlap(new TimeWindow(0, start + 1))); + assertTrue(window.overlap(new TimeWindow(0, 75))); + assertTrue(window.overlap(new TimeWindow(0, end - 1))); + + assertTrue(window.overlap(new TimeWindow(start - 1, start + 1))); + assertTrue(window.overlap(new TimeWindow(start - 1, 75))); + assertTrue(window.overlap(new TimeWindow(start - 1, end - 1))); + } + + @Test + public void shouldOverlapIfOtherWindowContainsThisWindow() { + /* + * This: [-------) + * Other: [------------------) + */ + assertTrue(window.overlap(new TimeWindow(0, end))); + assertTrue(window.overlap(new TimeWindow(0, end + 1))); + assertTrue(window.overlap(new TimeWindow(0, 150))); + + assertTrue(window.overlap(new TimeWindow(start - 1, end))); + assertTrue(window.overlap(new TimeWindow(start - 1, end + 1))); + assertTrue(window.overlap(new TimeWindow(start - 1, 150))); + + assertTrue(window.overlap(new TimeWindow(start, end))); + assertTrue(window.overlap(new TimeWindow(start, end + 1))); + assertTrue(window.overlap(new TimeWindow(start, 150))); + } + + @Test + public void shouldOverlapIfOtherWindowIsWithinThisWindow() { + /* + * This: [-------) + * Other: [---) + */ + assertTrue(window.overlap(new TimeWindow(start, 75))); + assertTrue(window.overlap(new TimeWindow(start, end))); + assertTrue(window.overlap(new TimeWindow(75, end))); + } + + @Test + public void shouldOverlapIfOtherWindowStartIsWithinThisWindow() { + /* + * This: [-------) + * Other: [-------) + */ + assertTrue(window.overlap(new TimeWindow(start, end + 1))); + assertTrue(window.overlap(new TimeWindow(start, 150))); + assertTrue(window.overlap(new TimeWindow(75, end + 1))); + assertTrue(window.overlap(new TimeWindow(75, 150))); + } + + @Test + public void shouldNotOverlapIsOtherWindowIsAfterThisWindow() { + /* + * This: [-------) + * Other: [------) + */ + assertFalse(window.overlap(new TimeWindow(end, end + 1))); + assertFalse(window.overlap(new TimeWindow(end, 150))); + assertFalse(window.overlap(new TimeWindow(end + 1, 150))); + assertFalse(window.overlap(new TimeWindow(125, 150))); + } + + @Test(expected = IllegalArgumentException.class) + public void cannotCompareTimeWindowWithDifferentWindowType() { + window.overlap(sessionWindow); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kafka/blob/79987590/streams/src/test/java/org/apache/kafka/streams/kstream/internals/UnlimitedWindowTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/UnlimitedWindowTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/UnlimitedWindowTest.java new file mode 100644 index 0000000..f3c9cfb --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/UnlimitedWindowTest.java @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.kafka.streams.kstream.internals; + +import org.junit.Test; + +import static org.junit.Assert.assertTrue; + +public class UnlimitedWindowTest { + + private long start = 50; + private final UnlimitedWindow window = new UnlimitedWindow(start); + private final SessionWindow sessionWindow = new SessionWindow(start, start); + + @Test + public void shouldAlwaysOverlap() { + assertTrue(window.overlap(new UnlimitedWindow(start - 1))); + assertTrue(window.overlap(new UnlimitedWindow(start))); + assertTrue(window.overlap(new UnlimitedWindow(start + 1))); + } + + @Test(expected = IllegalArgumentException.class) + public void cannotCompareUnlimitedWindowWithDifferentWindowType() { + window.overlap(sessionWindow); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kafka/blob/79987590/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitionerTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitionerTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitionerTest.java index 4be83be..0b6288f 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitionerTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitionerTest.java @@ -81,7 +81,7 @@ public class WindowedStreamPartitionerTest { Integer expected = defaultPartitioner.partition("topic", key, keyBytes, value, valueBytes, cluster); - for (int w = 0; w < 10; w++) { + for (int w = 1; w < 10; w++) { TimeWindow window = new TimeWindow(10 * w, 20 * w); Windowed<Integer> windowedKey = new Windowed<>(key, window); http://git-wip-us.apache.org/repos/asf/kafka/blob/79987590/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlySessionStoreTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlySessionStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlySessionStoreTest.java index 8bfcb7b..51b3bf0 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlySessionStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlySessionStoreTest.java @@ -20,7 +20,7 @@ package org.apache.kafka.streams.state.internals; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.errors.InvalidStateStoreException; import org.apache.kafka.streams.kstream.Windowed; -import org.apache.kafka.streams.kstream.internals.TimeWindow; +import org.apache.kafka.streams.kstream.internals.SessionWindow; import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.QueryableStoreTypes; import org.apache.kafka.test.ReadOnlySessionStoreStub; @@ -58,12 +58,12 @@ public class CompositeReadOnlySessionStoreTest { @Test public void shouldFetchResulstFromUnderlyingSessionStore() throws Exception { - underlyingSessionStore.put(new Windowed<>("a", new TimeWindow(0, 0)), 1L); - underlyingSessionStore.put(new Windowed<>("a", new TimeWindow(10, 10)), 2L); + underlyingSessionStore.put(new Windowed<>("a", new SessionWindow(0, 0)), 1L); + underlyingSessionStore.put(new Windowed<>("a", new SessionWindow(10, 10)), 2L); final List<KeyValue<Windowed<String>, Long>> results = toList(sessionStore.fetch("a")); - assertEquals(Arrays.asList(KeyValue.pair(new Windowed<>("a", new TimeWindow(0, 0)), 1L), - KeyValue.pair(new Windowed<>("a", new TimeWindow(10, 10)), 2L)), + assertEquals(Arrays.asList(KeyValue.pair(new Windowed<>("a", new SessionWindow(0, 0)), 1L), + KeyValue.pair(new Windowed<>("a", new SessionWindow(10, 10)), 2L)), results); } @@ -79,8 +79,8 @@ public class CompositeReadOnlySessionStoreTest { ReadOnlySessionStoreStub<>(); stubProviderTwo.addStore(storeName, secondUnderlying); - final Windowed<String> keyOne = new Windowed<>("key-one", new TimeWindow(0, 0)); - final Windowed<String> keyTwo = new Windowed<>("key-two", new TimeWindow(0, 0)); + final Windowed<String> keyOne = new Windowed<>("key-one", new SessionWindow(0, 0)); + final Windowed<String> keyTwo = new Windowed<>("key-two", new SessionWindow(0, 0)); underlyingSessionStore.put(keyOne, 0L); secondUnderlying.put(keyTwo, 10L); @@ -93,8 +93,8 @@ public class CompositeReadOnlySessionStoreTest { @Test public void shouldNotGetValueFromOtherStores() throws Exception { - final Windowed<String> expectedKey = new Windowed<>("foo", new TimeWindow(0, 0)); - otherUnderlyingStore.put(new Windowed<>("foo", new TimeWindow(10, 10)), 10L); + final Windowed<String> expectedKey = new Windowed<>("foo", new SessionWindow(0, 0)); + otherUnderlyingStore.put(new Windowed<>("foo", new SessionWindow(10, 10)), 10L); underlyingSessionStore.put(expectedKey, 1L); final KeyValueIterator<Windowed<String>, Long> result = sessionStore.fetch("foo");