Repository: kafka
Updated Branches:
  refs/heads/0.10.2 0f87991d5 -> 4218c0247


KAFKA-4671: Fix Streams window retention policy

Author: Matthias J. Sax <matth...@confluent.io>

Reviewers: Damian Guy, Eno Thereska, Guozhang Wang

Closes #2401 from mjsax/kafka-4671-window-retention-policy

(cherry picked from commit 79987590e3e96351ff75ce86718801ec605b2419)
Signed-off-by: Guozhang Wang <wangg...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/4218c024
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/4218c024
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/4218c024

Branch: refs/heads/0.10.2
Commit: 4218c02478a213935afbb2c2d661d8a29a5f29ee
Parents: 0f87991
Author: Matthias J. Sax <matth...@confluent.io>
Authored: Mon Jan 23 16:31:36 2017 -0800
Committer: Guozhang Wang <wangg...@gmail.com>
Committed: Mon Jan 23 16:31:46 2017 -0800

----------------------------------------------------------------------
 .../kafka/streams/kstream/JoinWindows.java      |  60 +++++----
 .../kafka/streams/kstream/SessionWindows.java   |  19 ++-
 .../kafka/streams/kstream/TimeWindows.java      |  68 ++++++-----
 .../kafka/streams/kstream/UnlimitedWindows.java |  38 +++---
 .../apache/kafka/streams/kstream/Window.java    |  42 +++----
 .../apache/kafka/streams/kstream/Windowed.java  |  15 ++-
 .../apache/kafka/streams/kstream/Windows.java   |  24 ++--
 .../streams/kstream/internals/KStreamImpl.java  |  12 +-
 .../kstream/internals/SessionWindow.java        |   6 +-
 .../streams/kstream/internals/TimeWindow.java   |  11 +-
 .../kstream/internals/UnlimitedWindow.java      |  12 +-
 .../kafka/streams/kstream/JoinWindowsTest.java  |  64 ++++++++--
 .../streams/kstream/SessionWindowsTest.java     |  68 +++++++++++
 .../kafka/streams/kstream/TimeWindowsTest.java  |  92 ++++++++++----
 .../streams/kstream/UnlimitedWindowsTest.java   |  22 +++-
 .../kafka/streams/kstream/WindowTest.java       |  85 +++++++++++++
 .../kafka/streams/kstream/WindowsTest.java      |  62 ++++++++++
 .../kstream/internals/TimeWindowTest.java       | 122 +++++++++++++++++++
 .../kstream/internals/UnlimitedWindowTest.java  |  42 +++++++
 .../WindowedStreamPartitionerTest.java          |   2 +-
 .../CompositeReadOnlySessionStoreTest.java      |  18 +--
 21 files changed, 701 insertions(+), 183 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/4218c024/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
----------------------------------------------------------------------
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java 
b/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
index 9317743..6dd1a85 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
@@ -17,8 +17,6 @@
 
 package org.apache.kafka.streams.kstream;
 
-import org.apache.kafka.streams.kstream.internals.TimeWindow;
-
 import java.util.Map;
 
 /**
@@ -45,21 +43,19 @@ import java.util.Map;
  * Both values (before and after) must not result in an "inverse" window,
  * i.e., lower-interval-bound must not be larger than upper-interval.bound.
  */
-public class JoinWindows extends Windows<TimeWindow> {
+public class JoinWindows extends Windows<Window> {
 
     /** Maximum time difference for tuples that are before the join tuple. */
-    public final long before;
+    public final long beforeMs;
     /** Maximum time difference for tuples that are after the join tuple. */
-    public final long after;
-
-    private JoinWindows(long before, long after) {
-        super();
+    public final long afterMs;
 
-        if (before + after < 0) {
-            throw new IllegalArgumentException("Window interval (ie, 
before+after) must not be negative");
+    private JoinWindows(final long beforeMs, final long afterMs) {
+        if (beforeMs + afterMs < 0) {
+            throw new IllegalArgumentException("Window interval (ie, 
beforeMs+afterMs) must not be negative");
         }
-        this.after = after;
-        this.before = before;
+        this.afterMs = afterMs;
+        this.beforeMs = beforeMs;
     }
 
     /**
@@ -68,8 +64,8 @@ public class JoinWindows extends Windows<TimeWindow> {
      *
      * @param timeDifference    join window interval
      */
-    public static JoinWindows of(long timeDifference) {
-        return new JoinWindows(timeDifference, timeDifference);
+    public static JoinWindows of(final long timeDifferenceMs) throws 
IllegalArgumentException {
+        return new JoinWindows(timeDifferenceMs, timeDifferenceMs);
     }
 
     /**
@@ -79,8 +75,8 @@ public class JoinWindows extends Windows<TimeWindow> {
      *
      * @param timeDifference    join window interval
      */
-    public JoinWindows before(long timeDifference) {
-        return new JoinWindows(timeDifference, this.after);
+    public JoinWindows before(final long timeDifferenceMs) throws 
IllegalArgumentException {
+        return new JoinWindows(timeDifferenceMs, afterMs);
     }
 
     /**
@@ -90,25 +86,39 @@ public class JoinWindows extends Windows<TimeWindow> {
      *
      * @param timeDifference    join window interval
      */
-    public JoinWindows after(long timeDifference) {
-        return new JoinWindows(this.before, timeDifference);
+    public JoinWindows after(final long timeDifferenceMs) throws 
IllegalArgumentException {
+        return new JoinWindows(beforeMs, timeDifferenceMs);
     }
 
     /**
      * Not supported by {@link JoinWindows}. Throws {@link 
UnsupportedOperationException}.
      */
     @Override
-    public Map<Long, TimeWindow> windowsFor(long timestamp) {
+    public Map<Long, Window> windowsFor(final long timestamp) {
         throw new UnsupportedOperationException("windowsFor() is not supported 
in JoinWindows");
     }
 
     @Override
     public long size() {
-        return after + before;
+        return beforeMs + afterMs;
+    }
+
+    @Override
+    public JoinWindows until(final long durationMs) throws 
IllegalArgumentException {
+        if (durationMs < size()) {
+            throw new IllegalArgumentException("Window retention time 
(durationMs) cannot be smaller than the window size.");
+        }
+        super.until(durationMs);
+        return this;
+    }
+
+    @Override
+    public long maintainMs() {
+        return Math.max(super.maintainMs(), size());
     }
 
     @Override
-    public final boolean equals(Object o) {
+    public final boolean equals(final Object o) {
         if (o == this) {
             return true;
         }
@@ -116,14 +126,14 @@ public class JoinWindows extends Windows<TimeWindow> {
             return false;
         }
 
-        JoinWindows other = (JoinWindows) o;
-        return this.before == other.before && this.after == other.after;
+        final JoinWindows other = (JoinWindows) o;
+        return beforeMs == other.beforeMs && afterMs == other.afterMs;
     }
 
     @Override
     public int hashCode() {
-        int result = (int) (before ^ (before >>> 32));
-        result = 31 * result + (int) (after ^ (after >>> 32));
+        int result = (int) (beforeMs ^ (beforeMs >>> 32));
+        result = 31 * result + (int) (afterMs ^ (afterMs >>> 32));
         return result;
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/4218c024/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindows.java
----------------------------------------------------------------------
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindows.java 
b/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindows.java
index f9a399a..bed6c3d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindows.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindows.java
@@ -63,9 +63,9 @@ public class SessionWindows {
     private final long gapMs;
     private long maintainDurationMs;
 
-    private SessionWindows(final long gapMs, final long maintainDurationMs) {
+    private SessionWindows(final long gapMs) {
         this.gapMs = gapMs;
-        this.maintainDurationMs = maintainDurationMs;
+        maintainDurationMs = Windows.DEFAULT_MAINTAIN_DURATION_MS;
     }
 
     /**
@@ -75,7 +75,10 @@ public class SessionWindows {
      * and default maintain duration
      */
     public static SessionWindows with(final long inactivityGapMs) {
-        return new SessionWindows(inactivityGapMs, 
Windows.DEFAULT_MAINTAIN_DURATION);
+        if (inactivityGapMs < 1) {
+            throw new IllegalArgumentException("Gap time (inactivityGapMs) 
cannot be zero or negative.");
+        }
+        return new SessionWindows(inactivityGapMs);
     }
 
     /**
@@ -84,8 +87,12 @@ public class SessionWindows {
      *
      * @return  itself
      */
-    public SessionWindows until(final long durationMs) {
-        this.maintainDurationMs = durationMs;
+    public SessionWindows until(final long durationMs) throws 
IllegalArgumentException {
+        if (durationMs < gapMs) {
+            throw new IllegalArgumentException("Window retentin time 
(durationMs) cannot be smaller than window gap.");
+        }
+        maintainDurationMs = durationMs;
+
         return this;
     }
 
@@ -100,6 +107,6 @@ public class SessionWindows {
      * @return the minimum amount of time a window will be maintained for.
      */
     public long maintainMs() {
-        return maintainDurationMs;
+        return Math.max(maintainDurationMs, gapMs);
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/4218c024/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java
----------------------------------------------------------------------
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java 
b/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java
index ef94cf9..11df228 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java
@@ -40,27 +40,18 @@ public class TimeWindows extends Windows<TimeWindow> {
      * The window size's effective time unit is determined by the semantics of 
the topology's
      * configured {@link 
org.apache.kafka.streams.processor.TimestampExtractor}.
      */
-    public final long size;
+    public final long sizeMs;
 
     /**
      * The size of the window's advance interval, i.e. by how much a window 
moves forward relative
      * to the previous one. The interval's effective time unit is determined 
by the semantics of
      * the topology's configured {@link 
org.apache.kafka.streams.processor.TimestampExtractor}.
      */
-    public final long advance;
+    public final long advanceMs;
 
-
-    private TimeWindows(long size, long advance) {
-        super();
-        if (size <= 0) {
-            throw new IllegalArgumentException("window size must be > 0 (you 
provided " + size + ")");
-        }
-        this.size = size;
-        if (!(0 < advance && advance <= size)) {
-            throw new IllegalArgumentException(
-                String.format("advance interval (%d) must lie within interval 
(0, %d]", advance, size));
-        }
-        this.advance = advance;
+    private TimeWindows(final long sizeMs, final long advanceMs) throws 
IllegalArgumentException {
+        this.sizeMs = sizeMs;
+        this.advanceMs = advanceMs;
     }
 
     /**
@@ -76,8 +67,11 @@ public class TimeWindows extends Windows<TimeWindow> {
      *             topology's configured {@link 
org.apache.kafka.streams.processor.TimestampExtractor}.
      * @return a new window definition
      */
-    public static TimeWindows of(long size) {
-        return new TimeWindows(size, size);
+    public static TimeWindows of(final long sizeMs) throws 
IllegalArgumentException {
+        if (sizeMs <= 0) {
+            throw new IllegalArgumentException("Window sizeMs must be larger 
than zero.");
+        }
+        return new TimeWindows(sizeMs, sizeMs);
     }
 
     /**
@@ -93,43 +87,59 @@ public class TimeWindows extends Windows<TimeWindow> {
      *                 {@link 
org.apache.kafka.streams.processor.TimestampExtractor}.
      * @return a new window definition
      */
-    public TimeWindows advanceBy(long interval) {
-        return new TimeWindows(this.size, interval);
+    public TimeWindows advanceBy(final long advanceMs) {
+        if (advanceMs <= 0 || advanceMs > sizeMs) {
+            throw new IllegalArgumentException(String.format("AdvanceMs must 
lie within interval (0, %d]", sizeMs));
+        }
+        return new TimeWindows(sizeMs, advanceMs);
     }
 
     @Override
-    public Map<Long, TimeWindow> windowsFor(long timestamp) {
-        long windowStart = (Math.max(0, timestamp - this.size + this.advance) 
/ this.advance) * this.advance;
-        Map<Long, TimeWindow> windows = new HashMap<>();
+    public Map<Long, TimeWindow> windowsFor(final long timestamp) {
+        long windowStart = (Math.max(0, timestamp - sizeMs + advanceMs) / 
advanceMs) * advanceMs;
+        final Map<Long, TimeWindow> windows = new HashMap<>();
         while (windowStart <= timestamp) {
-            TimeWindow window = new TimeWindow(windowStart, windowStart + 
this.size);
+            final TimeWindow window = new TimeWindow(windowStart, windowStart 
+ sizeMs);
             windows.put(windowStart, window);
-            windowStart += this.advance;
+            windowStart += advanceMs;
         }
         return windows;
     }
 
     @Override
     public long size() {
-        return size;
+        return sizeMs;
+    }
+
+    public TimeWindows until(final long durationMs) throws 
IllegalArgumentException {
+        if (durationMs < sizeMs) {
+            throw new IllegalArgumentException("Window retention time 
(durationMs) cannot be smaller than the window size.");
+        }
+        super.until(durationMs);
+        return this;
+    }
+
+    @Override
+    public long maintainMs() {
+        return Math.max(super.maintainMs(), sizeMs);
     }
 
     @Override
-    public final boolean equals(Object o) {
+    public final boolean equals(final Object o) {
         if (o == this) {
             return true;
         }
         if (!(o instanceof TimeWindows)) {
             return false;
         }
-        TimeWindows other = (TimeWindows) o;
-        return this.size == other.size && this.advance == other.advance;
+        final TimeWindows other = (TimeWindows) o;
+        return sizeMs == other.sizeMs && advanceMs == other.advanceMs;
     }
 
     @Override
     public int hashCode() {
-        int result = (int) (size ^ (size >>> 32));
-        result = 31 * result + (int) (advance ^ (advance >>> 32));
+        int result = (int) (sizeMs ^ (sizeMs >>> 32));
+        result = 31 * result + (int) (advanceMs ^ (advanceMs >>> 32));
         return result;
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/4218c024/streams/src/main/java/org/apache/kafka/streams/kstream/UnlimitedWindows.java
----------------------------------------------------------------------
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/UnlimitedWindows.java 
b/streams/src/main/java/org/apache/kafka/streams/kstream/UnlimitedWindows.java
index 3dc6f65..8605f9d 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/UnlimitedWindows.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/UnlimitedWindows.java
@@ -27,24 +27,23 @@ import java.util.Map;
  */
 public class UnlimitedWindows extends Windows<UnlimitedWindow> {
 
-    private static final long DEFAULT_START_TIMESTAMP = 0L;
+    private static final long DEFAULT_START_TIMESTAMP_MS = 0L;
 
     /** The start timestamp of the window. */
-    public final long start;
+    public final long startMs;
 
-    private UnlimitedWindows(long start) {
-        super();
-        if (start < 0) {
-            throw new IllegalArgumentException("start must be > 0 (you 
provided " + start + ")");
+    private UnlimitedWindows(final long startMs) throws 
IllegalArgumentException {
+        if (startMs < 0) {
+            throw new IllegalArgumentException("startMs must be > 0 (you 
provided " + startMs + ")");
         }
-        this.start = start;
+        this.startMs = startMs;
     }
 
     /**
      * Return an unlimited window starting at timestamp zero.
      */
     public static UnlimitedWindows of() {
-        return new UnlimitedWindows(DEFAULT_START_TIMESTAMP);
+        return new UnlimitedWindows(DEFAULT_START_TIMESTAMP_MS);
     }
 
     /**
@@ -53,18 +52,18 @@ public class UnlimitedWindows extends 
Windows<UnlimitedWindow> {
      * @param start  the window start time
      * @return       a new unlimited window that starts at {@code start}
      */
-    public UnlimitedWindows startOn(long start) {
+    public UnlimitedWindows startOn(final long start) throws 
IllegalArgumentException {
         return new UnlimitedWindows(start);
     }
 
     @Override
-    public Map<Long, UnlimitedWindow> windowsFor(long timestamp) {
+    public Map<Long, UnlimitedWindow> windowsFor(final long timestamp) {
         // always return the single unlimited window
 
         // we cannot use Collections.singleMap since it does not support 
remove()
-        Map<Long, UnlimitedWindow> windows = new HashMap<>();
-        if (timestamp >= start) {
-            windows.put(start, new UnlimitedWindow(start));
+        final Map<Long, UnlimitedWindow> windows = new HashMap<>();
+        if (timestamp >= startMs) {
+            windows.put(startMs, new UnlimitedWindow(startMs));
         }
         return windows;
     }
@@ -75,7 +74,7 @@ public class UnlimitedWindows extends 
Windows<UnlimitedWindow> {
     }
 
     @Override
-    public final boolean equals(Object o) {
+    public final boolean equals(final Object o) {
         if (o == this) {
             return true;
         }
@@ -84,13 +83,18 @@ public class UnlimitedWindows extends 
Windows<UnlimitedWindow> {
             return false;
         }
 
-        UnlimitedWindows other = (UnlimitedWindows) o;
-        return this.start == other.start;
+        final UnlimitedWindows other = (UnlimitedWindows) o;
+        return startMs == other.startMs;
     }
 
     @Override
     public int hashCode() {
-        return (int) (start ^ (start >>> 32));
+        return (int) (startMs ^ (startMs >>> 32));
+    }
+
+    @Override
+    public UnlimitedWindows until(final long durationMs) {
+        throw new IllegalArgumentException("Window retention time (durationMs) 
cannot be set for UnlimitedWindows.");
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/kafka/blob/4218c024/streams/src/main/java/org/apache/kafka/streams/kstream/Window.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Window.java 
b/streams/src/main/java/org/apache/kafka/streams/kstream/Window.java
index 13a9529..9c6edc0 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/Window.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Window.java
@@ -22,8 +22,8 @@ package org.apache.kafka.streams.kstream;
  */
 public abstract class Window {
 
-    protected final long start;
-    protected final long end;
+    protected final long startMs;
+    protected final long endMs;
 
     /**
      * Create a new window for the given start time (inclusive) and end time 
(exclusive).
@@ -33,32 +33,29 @@ public abstract class Window {
      * @throws IllegalArgumentException if {@code start} or {@code end} is 
negative or if {@code end} is smaller than
      * {@code start}
      */
-    public Window(long start, long end) throws IllegalArgumentException {
-        if (start < 0) {
-            throw new IllegalArgumentException("Window start time cannot be 
negative.");
+    public Window(long startMs, long endMs) throws IllegalArgumentException {
+        if (startMs < 0) {
+            throw new IllegalArgumentException("Window startMs time cannot be 
negative.");
         }
-        if (end < 0) {
-            throw new IllegalArgumentException("Window end time cannot be 
negative.");
+        if (endMs < startMs) {
+            throw new IllegalArgumentException("Window endMs time cannot be 
smaller than window startMs time.");
         }
-        if (end < start) {
-            throw new IllegalArgumentException("Window end time cannot be 
smaller than window start time.");
-        }
-        this.start = start;
-        this.end = end;
+        this.startMs = startMs;
+        this.endMs = endMs;
     }
 
     /**
      * Return the start timestamp of this window, inclusive
      */
     public long start() {
-        return start;
+        return startMs;
     }
 
     /**
      * Return the end timestamp of this window, exclusive
      */
     public long end() {
-        return end;
+        return endMs;
     }
 
     /**
@@ -67,10 +64,10 @@ public abstract class Window {
      * @param other  another window
      * @return       {@code true} if {@code other} overlaps with this 
window&mdash;{@code false} otherwise
      */
-    public abstract boolean overlap(Window other);
+    public abstract boolean overlap(final Window other);
 
     @Override
-    public boolean equals(Object obj) {
+    public boolean equals(final Object obj) {
         if (obj == this) {
             return true;
         }
@@ -79,21 +76,20 @@ public abstract class Window {
             return false;
         }
 
-        Window other = (Window) obj;
-        return this.start == other.start && this.end == other.end;
+        final Window other = (Window) obj;
+        return startMs == other.startMs && endMs == other.endMs;
     }
 
     @Override
     public int hashCode() {
-        long n = (this.start << 32) | this.end;
-        return (int) (n % 0xFFFFFFFFL);
+        return (int) (((startMs << 32) | endMs) % 0xFFFFFFFFL);
     }
 
     @Override
     public String toString() {
         return "Window{" +
-                "start=" + start +
-                ", end=" + end +
-                '}';
+            "start=" + startMs +
+            ", end=" + endMs +
+            '}';
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/4218c024/streams/src/main/java/org/apache/kafka/streams/kstream/Windowed.java
----------------------------------------------------------------------
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/Windowed.java 
b/streams/src/main/java/org/apache/kafka/streams/kstream/Windowed.java
index 6606fcb..81357c1 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/Windowed.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Windowed.java
@@ -26,11 +26,11 @@ package org.apache.kafka.streams.kstream;
  */
 public class Windowed<K> {
 
-    private K key;
+    private final K key;
 
-    private Window window;
+    private final Window window;
 
-    public Windowed(K key, Window window) {
+    public Windowed(final K key, final Window window) {
         this.key = key;
         this.window = window;
     }
@@ -59,21 +59,20 @@ public class Windowed<K> {
     }
 
     @Override
-    public boolean equals(Object obj) {
+    public boolean equals(final Object obj) {
         if (obj == this)
             return true;
 
         if (!(obj instanceof Windowed))
             return false;
 
-        Windowed<?> that = (Windowed) obj;
-
-        return this.window.equals(that.window) && this.key.equals(that.key);
+        final Windowed<?> that = (Windowed) obj;
+        return window.equals(that.window) && key.equals(that.key);
     }
 
     @Override
     public int hashCode() {
-        long n = ((long) window.hashCode() << 32) | key.hashCode();
+        final long n = ((long) window.hashCode() << 32) | key.hashCode();
         return (int) (n % 0xFFFFFFFFL);
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/4218c024/streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java
----------------------------------------------------------------------
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java 
b/streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java
index ebd92fe..29b61fd 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java
@@ -14,7 +14,6 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.kafka.streams.kstream;
 
 import java.util.Map;
@@ -28,15 +27,15 @@ public abstract class Windows<W extends Window> {
 
     private static final int DEFAULT_NUM_SEGMENTS = 3;
 
-    static final long DEFAULT_MAINTAIN_DURATION = 24 * 60 * 60 * 1000L;   // 
one day
+    static final long DEFAULT_MAINTAIN_DURATION_MS = 24 * 60 * 60 * 1000L; // 
one day
 
     private long maintainDurationMs;
 
     public int segments;
 
     protected Windows() {
-        this.segments = DEFAULT_NUM_SEGMENTS;
-        this.maintainDurationMs = DEFAULT_MAINTAIN_DURATION;
+        segments = DEFAULT_NUM_SEGMENTS;
+        maintainDurationMs = DEFAULT_MAINTAIN_DURATION_MS;
     }
 
     /**
@@ -45,8 +44,12 @@ public abstract class Windows<W extends Window> {
      *
      * @return  itself
      */
-    public Windows<W> until(long durationMs) {
-        this.maintainDurationMs = durationMs;
+    // This should always get overridden to provide the correct return type 
and thus to avoid a cast
+    public Windows<W> until(final long durationMs) throws 
IllegalArgumentException {
+        if (durationMs < 0) {
+            throw new IllegalArgumentException("Window retention time 
(durationMs) cannot be negative.");
+        }
+        maintainDurationMs = durationMs;
 
         return this;
     }
@@ -57,7 +60,10 @@ public abstract class Windows<W extends Window> {
      *
      * @return  itself
      */
-    protected Windows<W> segments(int segments) {
+    protected Windows<W> segments(final int segments) throws 
IllegalArgumentException {
+        if (segments < 2) {
+            throw new IllegalArgumentException("Number of segments must be at 
least 2.");
+        }
         this.segments = segments;
 
         return this;
@@ -69,7 +75,7 @@ public abstract class Windows<W extends Window> {
      * @return the window maintain duration in milliseconds of streams time
      */
     public long maintainMs() {
-        return this.maintainDurationMs;
+        return maintainDurationMs;
     }
 
     /**
@@ -78,7 +84,7 @@ public abstract class Windows<W extends Window> {
      * @param timestamp  the timestamp window should get created for
      * @return  a map of {@code windowStartTimestamp -> Window} entries
      */
-    public abstract Map<Long, W> windowsFor(long timestamp);
+    public abstract Map<Long, W> windowsFor(final long timestamp);
 
     public abstract long size();
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/4218c024/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
----------------------------------------------------------------------
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
index 3d41ae4..0434f06 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
@@ -711,20 +711,20 @@ public class KStreamImpl<K, V> extends AbstractStream<K> 
implements KStream<K, V
 
 
             KStreamJoinWindow<K1, V1> thisWindowedStream = new 
KStreamJoinWindow<>(thisWindow.name(),
-                                                                               
    windows.before + windows.after + 1,
+                                                                               
    windows.beforeMs + windows.afterMs + 1,
                                                                                
    windows.maintainMs());
             KStreamJoinWindow<K1, V2> otherWindowedStream = new 
KStreamJoinWindow<>(otherWindow.name(),
-                                                                               
     windows.before + windows.after + 1,
+                                                                               
     windows.beforeMs + windows.afterMs + 1,
                                                                                
     windows.maintainMs());
 
             final KStreamKStreamJoin<K1, R, ? super V1, ? super V2> joinThis = 
new KStreamKStreamJoin<>(otherWindow.name(),
-                windows.before,
-                windows.after,
+                windows.beforeMs,
+                windows.afterMs,
                 joiner,
                 leftOuter);
             final KStreamKStreamJoin<K1, R, ? super V2, ? super V1> joinOther 
= new KStreamKStreamJoin<>(thisWindow.name(),
-                windows.after,
-                windows.before,
+                windows.afterMs,
+                windows.beforeMs,
                 reverseJoiner(joiner),
                 rightOuter);
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/4218c024/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindow.java
----------------------------------------------------------------------
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindow.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindow.java
index db63029..cf72752 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindow.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindow.java
@@ -35,8 +35,8 @@ public final class SessionWindow extends Window {
      * @param start  the start timestamp of the window
      * @param end    the end timestamp of the window
      */
-    public SessionWindow(final long start, final long end) {
-        super(start, end);
+    public SessionWindow(final long startMs, final long endMs) {
+        super(startMs, endMs);
     }
 
     /**
@@ -52,7 +52,7 @@ public final class SessionWindow extends Window {
                 + other.getClass());
         }
         final SessionWindow otherWindow = (SessionWindow) other;
-        return !(otherWindow.end < start || end < otherWindow.start);
+        return !(otherWindow.endMs < startMs || endMs < otherWindow.startMs);
     }
 
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/4218c024/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindow.java
----------------------------------------------------------------------
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindow.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindow.java
index 630821f..bf98f94 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindow.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindow.java
@@ -21,18 +21,21 @@ import org.apache.kafka.streams.kstream.Window;
 
 public class TimeWindow extends Window {
 
-    public TimeWindow(long start, long end) {
-        super(start, end);
+    public TimeWindow(long startMs, long endMs) {
+        super(startMs, endMs);
+        if (startMs == endMs) {
+            throw new IllegalArgumentException("Window endMs must be greater 
than window startMs.");
+        }
     }
 
     @Override
-    public boolean overlap(Window other) {
+    public boolean overlap(final Window other) throws IllegalArgumentException 
{
         if (getClass() != other.getClass()) {
             throw new IllegalArgumentException("Cannot compare windows of 
different type. Other window has type "
                 + other.getClass());
         }
         final TimeWindow otherWindow = (TimeWindow) other;
-        return start < otherWindow.end && otherWindow.start < end;
+        return startMs < otherWindow.endMs && otherWindow.startMs < endMs;
     }
 
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/4218c024/streams/src/main/java/org/apache/kafka/streams/kstream/internals/UnlimitedWindow.java
----------------------------------------------------------------------
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/UnlimitedWindow.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/UnlimitedWindow.java
index e9ec040..7fb7c53 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/UnlimitedWindow.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/UnlimitedWindow.java
@@ -5,9 +5,9 @@
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -21,12 +21,12 @@ import org.apache.kafka.streams.kstream.Window;
 
 public class UnlimitedWindow extends Window {
 
-    public UnlimitedWindow(long start) {
-        super(start, Long.MAX_VALUE);
+    public UnlimitedWindow(final long startMs) {
+        super(startMs, Long.MAX_VALUE);
     }
 
     @Override
-    public boolean overlap(Window other) {
+    public boolean overlap(final Window other) {
         if (getClass() != other.getClass()) {
             throw new IllegalArgumentException("Cannot compare windows of 
different type. Other window has type "
                 + other.getClass());

http://git-wip-us.apache.org/repos/asf/kafka/blob/4218c024/streams/src/test/java/org/apache/kafka/streams/kstream/JoinWindowsTest.java
----------------------------------------------------------------------
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/JoinWindowsTest.java 
b/streams/src/test/java/org/apache/kafka/streams/kstream/JoinWindowsTest.java
index b37e5e8..24387ad 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/JoinWindowsTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/JoinWindowsTest.java
@@ -23,6 +23,7 @@ import org.junit.Test;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.fail;
 
 
 public class JoinWindowsTest {
@@ -44,8 +45,8 @@ public class JoinWindowsTest {
         assertEquals(w2, w1);
         assertEquals(w1.hashCode(), w2.hashCode());
 
-        JoinWindows w3 = JoinWindows.of(w2.after).before(anyOtherSize);
-        JoinWindows w4 = JoinWindows.of(anyOtherSize).after(w2.after);
+        JoinWindows w3 = JoinWindows.of(w2.afterMs).before(anyOtherSize);
+        JoinWindows w4 = JoinWindows.of(anyOtherSize).after(w2.afterMs);
         assertEquals(w3, w4);
         assertEquals(w4, w3);
         assertEquals(w3.hashCode(), w4.hashCode());
@@ -55,13 +56,13 @@ public class JoinWindowsTest {
         assertNotEquals("must be false for different window types", 
UnlimitedWindows.of(), w1);
         assertNotEquals("must be false for different types", new Object(), w1);
 
-        JoinWindows differentWindowSize = JoinWindows.of(w1.after + 1);
+        JoinWindows differentWindowSize = JoinWindows.of(w1.afterMs + 1);
         assertNotEquals("must be false when window sizes are different", 
differentWindowSize, w1);
 
-        JoinWindows differentWindowSize2 = 
JoinWindows.of(w1.after).after(w1.after + 1);
+        JoinWindows differentWindowSize2 = 
JoinWindows.of(w1.afterMs).after(w1.afterMs + 1);
         assertNotEquals("must be false when window sizes are different", 
differentWindowSize2, w1);
 
-        JoinWindows differentWindowSize3 = 
JoinWindows.of(w1.after).before(w1.before + 1);
+        JoinWindows differentWindowSize3 = 
JoinWindows.of(w1.afterMs).before(w1.beforeMs + 1);
         assertNotEquals("must be false when window sizes are different", 
differentWindowSize3, w1);
     }
 
@@ -85,14 +86,55 @@ public class JoinWindowsTest {
         JoinWindows.of(-1);
     }
 
-    @Test(expected = IllegalArgumentException.class)
-    public void afterBelowLower() {
-        JoinWindows.of(anySize).after(-anySize - 1);
+    @Test
+    public void endTimeShouldNotBeBeforeStart() {
+        final JoinWindows windowSpec = JoinWindows.of(anySize);
+        try {
+            windowSpec.after(-anySize - 1);
+            fail("window end time should not be before window start time");
+        } catch (final IllegalArgumentException e) {
+            // expected
+        }
     }
 
-    @Test(expected = IllegalArgumentException.class)
-    public void beforeOverUpper() {
-        JoinWindows.of(anySize).before(-anySize - 1);
+    @Test
+    public void startTimeShouldNotBeAfterEnd() {
+        final JoinWindows windowSpec = JoinWindows.of(anySize);
+        try {
+            windowSpec.before(-anySize - 1);
+            fail("window start time should not be after window end time");
+        } catch (final IllegalArgumentException e) {
+            // expected
+        }
+    }
+
+    @Test
+    public void untilShouldSetMaintainDuration() {
+        final JoinWindows windowSpec = JoinWindows.of(anySize);
+        final long windowSize = windowSpec.size();
+        assertEquals(windowSize, windowSpec.until(windowSize).maintainMs());
+    }
+
+    @Test
+    public void 
shouldUseWindowSizeForMaintainDurationWhenSizeLargerThanDefaultMaintainMs() {
+        final long size = Windows.DEFAULT_MAINTAIN_DURATION_MS;
+
+        final JoinWindows windowSpec = JoinWindows.of(size);
+        final long windowSize = windowSpec.size();
+
+        assertEquals(windowSize, windowSpec.maintainMs());
+    }
+
+    @Test
+    public void retentionTimeMustNoBeSmallerThanWindowSize() {
+        final JoinWindows windowSpec = JoinWindows.of(anySize);
+        final long windowSize = windowSpec.size();
+        try {
+            windowSpec.until(windowSize - 1);
+            fail("should not accept retention time smaller than window size");
+        } catch (final IllegalArgumentException e) {
+            // expected
+        }
     }
 
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/4218c024/streams/src/test/java/org/apache/kafka/streams/kstream/SessionWindowsTest.java
----------------------------------------------------------------------
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/SessionWindowsTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/SessionWindowsTest.java
new file mode 100644
index 0000000..a9eced4
--- /dev/null
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/SessionWindowsTest.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.kafka.streams.kstream;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+public class SessionWindowsTest {
+
+    @Test
+    public void shouldSetWindowGap() {
+        final long anyGap = 42L;
+        assertEquals(anyGap, SessionWindows.with(anyGap).inactivityGap());
+    }
+
+    @Test
+    public void shouldSetWindowRetentionTime() {
+        final long anyRetentionTime = 42L;
+        assertEquals(anyRetentionTime, 
SessionWindows.with(1).until(anyRetentionTime).maintainMs());
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void windowSizeMustNotBeNegative() {
+        SessionWindows.with(-1);
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void windowSizeMustNotBeZero() {
+        SessionWindows.with(0);
+    }
+
+    @Test
+    public void 
retentionTimeShouldBeGapIfGapIsLargerThanDefaultRetentionTime() {
+        final long windowGap = 2 * Windows.DEFAULT_MAINTAIN_DURATION_MS;
+        assertEquals(windowGap, SessionWindows.with(windowGap).maintainMs());
+    }
+
+    @Test
+    public void retentionTimeMustNotBeNegative() {
+        final SessionWindows windowSpec = SessionWindows.with(42);
+        try {
+            windowSpec.until(41);
+            fail("should not accept retention time smaller than gap");
+        } catch (final IllegalArgumentException e) {
+            // expected
+        }
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/4218c024/streams/src/test/java/org/apache/kafka/streams/kstream/TimeWindowsTest.java
----------------------------------------------------------------------
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/TimeWindowsTest.java 
b/streams/src/test/java/org/apache/kafka/streams/kstream/TimeWindowsTest.java
index 2bea16b..6b8b6ea 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/TimeWindowsTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/TimeWindowsTest.java
@@ -26,15 +26,38 @@ import java.util.Map;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.fail;
 
 public class TimeWindowsTest {
 
-    private static long anySize = 123L;
+    private static final long ANY_SIZE = 123L;
+
+    @Test
+    public void shouldSetWindowSize() {
+        assertEquals(ANY_SIZE, TimeWindows.of(ANY_SIZE).sizeMs);
+    }
+
+    @Test
+    public void shouldSetWindowAdvance() {
+        final long anyAdvance = 4;
+        assertEquals(anyAdvance, 
TimeWindows.of(ANY_SIZE).advanceBy(anyAdvance).advanceMs);
+    }
+
+    @Test
+    public void shouldSetWindowRetentionTime() {
+        assertEquals(ANY_SIZE, 
TimeWindows.of(ANY_SIZE).until(ANY_SIZE).maintainMs());
+    }
+
+    @Test
+    public void 
shouldUseWindowSizeAsRentitionTimeIfWindowSizeIsLargerThanDefaultRetentionTime()
 {
+        final long windowSize = 2 * Windows.DEFAULT_MAINTAIN_DURATION_MS;
+        assertEquals(windowSize, TimeWindows.of(windowSize).maintainMs());
+    }
 
     @Test
     public void shouldHaveSaneEqualsAndHashCode() {
-        TimeWindows w1 = TimeWindows.of(anySize);
-        TimeWindows w2 = TimeWindows.of(w1.size);
+        TimeWindows w1 = TimeWindows.of(ANY_SIZE);
+        TimeWindows w2 = TimeWindows.of(w1.sizeMs);
 
         // Reflexive
         assertEquals(w1, w1);
@@ -46,7 +69,7 @@ public class TimeWindowsTest {
         assertEquals(w1.hashCode(), w2.hashCode());
 
         // Transitive
-        TimeWindows w3 = TimeWindows.of(w2.size);
+        TimeWindows w3 = TimeWindows.of(w2.sizeMs);
         assertEquals(w2, w3);
         assertEquals(w1, w3);
         assertEquals(w1.hashCode(), w3.hashCode());
@@ -56,42 +79,69 @@ public class TimeWindowsTest {
         assertNotEquals("must be false for different window types", 
UnlimitedWindows.of(), w1);
         assertNotEquals("must be false for different types", new Object(), w1);
 
-        TimeWindows differentWindowSize = TimeWindows.of(w1.size + 1);
+        TimeWindows differentWindowSize = TimeWindows.of(w1.sizeMs + 1);
         assertNotEquals("must be false when window sizes are different", 
differentWindowSize, w1);
 
-        TimeWindows differentAdvanceInterval = w1.advanceBy(w1.advance - 1);
+        TimeWindows differentAdvanceInterval = w1.advanceBy(w1.advanceMs - 1);
         assertNotEquals("must be false when advance intervals are different", 
differentAdvanceInterval, w1);
     }
 
+    @Test(expected = IllegalArgumentException.class)
+    public void windowSizeMustNotBeZero() {
+        TimeWindows.of(0);
+    }
 
     @Test(expected = IllegalArgumentException.class)
     public void windowSizeMustNotBeNegative() {
         TimeWindows.of(-1);
     }
 
-    @Test(expected = IllegalArgumentException.class)
-    public void windowSizeMustNotBeZero() {
-        TimeWindows.of(0);
+    @Test
+    public void advanceIntervalMustNotBeZero() {
+        final TimeWindows windowSpec = TimeWindows.of(ANY_SIZE);
+        try {
+            windowSpec.advanceBy(0);
+            fail("should not accept zero advance parameter");
+        } catch (final IllegalArgumentException e) {
+            // expected
+        }
     }
 
-    @Test(expected = IllegalArgumentException.class)
+    @Test
     public void advanceIntervalMustNotBeNegative() {
-        TimeWindows.of(anySize).advanceBy(-1);
+        final TimeWindows windowSpec = TimeWindows.of(ANY_SIZE);
+        try {
+            windowSpec.advanceBy(-1);
+            fail("should not accept negative advance parameter");
+        } catch (final IllegalArgumentException e) {
+            // expected
+        }
     }
 
-    @Test(expected = IllegalArgumentException.class)
-    public void advanceIntervalMustNotBeZero() {
-        TimeWindows.of(anySize).advanceBy(0);
+    @Test
+    public void advanceIntervalMustNotBeLargerThanWindowSize() {
+        final TimeWindows windowSpec = TimeWindows.of(ANY_SIZE);
+        try {
+            windowSpec.advanceBy(ANY_SIZE + 1);
+            fail("should not accept advance greater than window size");
+        } catch (final IllegalArgumentException e) {
+            // expected
+        }
     }
 
-    @Test(expected = IllegalArgumentException.class)
-    public void advanceIntervalMustNotBeLargerThanWindowSize() {
-        long size = anySize;
-        TimeWindows.of(size).advanceBy(size + 1);
+    @Test
+    public void retentionTimeMustNoBeSmallerThanWindowSize() {
+        final TimeWindows windowSpec = TimeWindows.of(ANY_SIZE);
+        try {
+            windowSpec.until(ANY_SIZE - 1);
+            fail("should not accept retention time smaller than window size");
+        } catch (final IllegalArgumentException e) {
+            // expected
+        }
     }
 
     @Test
-    public void windowsForHoppingWindows() {
+    public void shouldComputeWindowsForHoppingWindows() {
         TimeWindows windows = TimeWindows.of(12L).advanceBy(5L);
         Map<Long, TimeWindow> matched = windows.windowsFor(21L);
         assertEquals(12L / 5L + 1, matched.size());
@@ -101,7 +151,7 @@ public class TimeWindowsTest {
     }
 
     @Test
-    public void windowsForBarelyOverlappingHoppingWindows() {
+    public void shouldComputeWindowsForBarelyOverlappingHoppingWindows() {
         TimeWindows windows = TimeWindows.of(6L).advanceBy(5L);
         Map<Long, TimeWindow> matched = windows.windowsFor(7L);
         assertEquals(1, matched.size());
@@ -109,7 +159,7 @@ public class TimeWindowsTest {
     }
 
     @Test
-    public void windowsForTumblingWindows() {
+    public void shouldComputeWindowsForTumblingWindows() {
         TimeWindows windows = TimeWindows.of(12L);
         Map<Long, TimeWindow> matched = windows.windowsFor(21L);
         assertEquals(1, matched.size());

http://git-wip-us.apache.org/repos/asf/kafka/blob/4218c024/streams/src/test/java/org/apache/kafka/streams/kstream/UnlimitedWindowsTest.java
----------------------------------------------------------------------
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/UnlimitedWindowsTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/UnlimitedWindowsTest.java
index c1f4be6..ea9078c 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/UnlimitedWindowsTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/UnlimitedWindowsTest.java
@@ -26,25 +26,37 @@ import java.util.Map;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 public class UnlimitedWindowsTest {
 
     private static long anyStartTime = 10L;
 
+    @Test
+    public void shouldSetWindowStartTime() {
+        assertEquals(anyStartTime, 
UnlimitedWindows.of().startOn(anyStartTime).startMs);
+    }
+
     @Test(expected = IllegalArgumentException.class)
     public void startTimeMustNotBeNegative() {
         UnlimitedWindows.of().startOn(-1);
     }
 
     @Test
-    public void startTimeCanBeZero() {
-        UnlimitedWindows.of().startOn(0);
+    public void shouldThrowOnUntil() {
+        final UnlimitedWindows windowSpec = UnlimitedWindows.of();
+        try {
+            windowSpec.until(42);
+            fail("should not allow to set window retention time");
+        } catch (final IllegalArgumentException e) {
+            // expected
+        }
     }
 
     @Test
     public void shouldIncludeRecordsThatHappenedOnWindowStart() {
         UnlimitedWindows w = UnlimitedWindows.of().startOn(anyStartTime);
-        Map<Long, UnlimitedWindow> matchedWindows = w.windowsFor(w.start);
+        Map<Long, UnlimitedWindow> matchedWindows = w.windowsFor(w.startMs);
         assertEquals(1, matchedWindows.size());
         assertEquals(new UnlimitedWindow(anyStartTime), 
matchedWindows.get(anyStartTime));
     }
@@ -52,7 +64,7 @@ public class UnlimitedWindowsTest {
     @Test
     public void shouldIncludeRecordsThatHappenedAfterWindowStart() {
         UnlimitedWindows w = UnlimitedWindows.of().startOn(anyStartTime);
-        long timestamp = w.start + 1;
+        long timestamp = w.startMs + 1;
         Map<Long, UnlimitedWindow> matchedWindows = w.windowsFor(timestamp);
         assertEquals(1, matchedWindows.size());
         assertEquals(new UnlimitedWindow(anyStartTime), 
matchedWindows.get(anyStartTime));
@@ -61,7 +73,7 @@ public class UnlimitedWindowsTest {
     @Test
     public void shouldExcludeRecordsThatHappenedBeforeWindowStart() {
         UnlimitedWindows w = UnlimitedWindows.of().startOn(anyStartTime);
-        long timestamp = w.start - 1;
+        long timestamp = w.startMs - 1;
         Map<Long, UnlimitedWindow> matchedWindows = w.windowsFor(timestamp);
         assertTrue(matchedWindows.isEmpty());
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/4218c024/streams/src/test/java/org/apache/kafka/streams/kstream/WindowTest.java
----------------------------------------------------------------------
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/WindowTest.java 
b/streams/src/test/java/org/apache/kafka/streams/kstream/WindowTest.java
new file mode 100644
index 0000000..55c5c60
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/WindowTest.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.kafka.streams.kstream;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+
+public class WindowTest {
+
+    static class TestWindow extends Window {
+        TestWindow(final long startMs, final long endMs) {
+            super(startMs, endMs);
+        }
+
+        @Override
+        public boolean overlap(final Window other) {
+            return false;
+        }
+    }
+
+    static class TestWindow2 extends  Window {
+        TestWindow2(final long startMs, final long endMs) {
+            super(startMs, endMs);
+        }
+
+        @Override
+        public boolean overlap(final Window other) {
+            return false;
+        }
+    }
+
+    private final TestWindow window = new TestWindow(5, 10);
+
+    @Test(expected = IllegalArgumentException.class)
+    public void shouldThrowIfStartIsNegative() {
+        new TestWindow(-1, 0);
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void shouldThrowIfEndIsSmallerThanStart() {
+        new TestWindow(1, 0);
+    }
+
+    @Test
+    public void shouldBeEqualIfStartAndEndSame() {
+        final TestWindow window2 = new TestWindow(window.startMs, 
window.endMs);
+
+        assertEquals(window, window);
+        assertEquals(window, window2);
+        assertEquals(window2, window);
+    }
+
+    @Test
+    public void shouldNotBeEqualIfStartOrEndIsDifferent() {
+        assertNotEquals(window, new TestWindow(0, window.endMs));
+        assertNotEquals(window, new TestWindow(7, window.endMs));
+        assertNotEquals(window, new TestWindow(window.startMs, 7));
+        assertNotEquals(window, new TestWindow(window.startMs, 15));
+        assertNotEquals(window, new TestWindow(7, 8));
+        assertNotEquals(window, new TestWindow(0, 15));
+    }
+
+    @Test
+    public void shouldNotBeEqualIfDifferentWindowType() {
+        assertNotEquals(window, new TestWindow2(window.startMs, window.endMs));
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/4218c024/streams/src/test/java/org/apache/kafka/streams/kstream/WindowsTest.java
----------------------------------------------------------------------
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/WindowsTest.java 
b/streams/src/test/java/org/apache/kafka/streams/kstream/WindowsTest.java
new file mode 100644
index 0000000..890265f
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/WindowsTest.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream;
+
+import org.junit.Test;
+
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+
+public class WindowsTest {
+
+    private class TestWindows extends Windows {
+
+        @Override
+        public Map windowsFor(long timestamp) {
+            return null;
+        }
+
+        @Override
+        public long size() {
+            return 0;
+        }
+    }
+
+    @Test
+    public void shouldSetNumberOfSegments() {
+        final int anySegmentSizeLargerThanOne = 5;
+        assertEquals(anySegmentSizeLargerThanOne, new 
TestWindows().segments(anySegmentSizeLargerThanOne).segments);
+    }
+
+    @Test
+    public void shouldSetWindowRetentionTime() {
+        final int anyNotNegativeRetentionTime = 42;
+        assertEquals(anyNotNegativeRetentionTime, new 
TestWindows().until(anyNotNegativeRetentionTime).maintainMs());
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void numberOfSegmentsMustBeAtLeastTwo() {
+        new TestWindows().segments(1);
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void retentionTimeMustNotBeNegative() {
+        new TestWindows().until(-1);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/4218c024/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimeWindowTest.java
----------------------------------------------------------------------
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimeWindowTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimeWindowTest.java
new file mode 100644
index 0000000..efa20b8
--- /dev/null
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimeWindowTest.java
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class TimeWindowTest {
+
+    private long start = 50;
+    private long end = 100;
+    private final TimeWindow window = new TimeWindow(start, end);
+    private final SessionWindow sessionWindow = new SessionWindow(start, end);
+
+    @Test(expected = IllegalArgumentException.class)
+    public void endMustBeLargerThanStart() {
+        new TimeWindow(start, start);
+    }
+
+    @Test
+    public void shouldNotOverlapIfOtherWindowIsBeforeThisWindow() {
+        /*
+         * This:        [-------)
+         * Other: [-----)
+         */
+        assertFalse(window.overlap(new TimeWindow(0, 25)));
+        assertFalse(window.overlap(new TimeWindow(0, start - 1)));
+        assertFalse(window.overlap(new TimeWindow(0, start)));
+    }
+
+    @Test
+    public void shouldOverlapIfOtherWindowEndIsWithinThisWindow() {
+        /*
+         * This:        [-------)
+         * Other: [---------)
+         */
+        assertTrue(window.overlap(new TimeWindow(0, start + 1)));
+        assertTrue(window.overlap(new TimeWindow(0, 75)));
+        assertTrue(window.overlap(new TimeWindow(0, end - 1)));
+
+        assertTrue(window.overlap(new TimeWindow(start - 1, start + 1)));
+        assertTrue(window.overlap(new TimeWindow(start - 1, 75)));
+        assertTrue(window.overlap(new TimeWindow(start - 1, end - 1)));
+    }
+
+    @Test
+    public void shouldOverlapIfOtherWindowContainsThisWindow() {
+        /*
+         * This:        [-------)
+         * Other: [------------------)
+         */
+        assertTrue(window.overlap(new TimeWindow(0, end)));
+        assertTrue(window.overlap(new TimeWindow(0, end + 1)));
+        assertTrue(window.overlap(new TimeWindow(0, 150)));
+
+        assertTrue(window.overlap(new TimeWindow(start - 1, end)));
+        assertTrue(window.overlap(new TimeWindow(start - 1, end + 1)));
+        assertTrue(window.overlap(new TimeWindow(start - 1, 150)));
+
+        assertTrue(window.overlap(new TimeWindow(start, end)));
+        assertTrue(window.overlap(new TimeWindow(start, end + 1)));
+        assertTrue(window.overlap(new TimeWindow(start, 150)));
+    }
+
+    @Test
+    public void shouldOverlapIfOtherWindowIsWithinThisWindow() {
+        /*
+         * This:        [-------)
+         * Other:         [---)
+         */
+        assertTrue(window.overlap(new TimeWindow(start, 75)));
+        assertTrue(window.overlap(new TimeWindow(start, end)));
+        assertTrue(window.overlap(new TimeWindow(75, end)));
+    }
+
+    @Test
+    public void shouldOverlapIfOtherWindowStartIsWithinThisWindow() {
+        /*
+         * This:        [-------)
+         * Other:           [-------)
+         */
+        assertTrue(window.overlap(new TimeWindow(start, end + 1)));
+        assertTrue(window.overlap(new TimeWindow(start, 150)));
+        assertTrue(window.overlap(new TimeWindow(75, end + 1)));
+        assertTrue(window.overlap(new TimeWindow(75, 150)));
+    }
+
+    @Test
+    public void shouldNotOverlapIsOtherWindowIsAfterThisWindow() {
+        /*
+         * This:        [-------)
+         * Other:               [------)
+         */
+        assertFalse(window.overlap(new TimeWindow(end, end + 1)));
+        assertFalse(window.overlap(new TimeWindow(end, 150)));
+        assertFalse(window.overlap(new TimeWindow(end + 1, 150)));
+        assertFalse(window.overlap(new TimeWindow(125, 150)));
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void cannotCompareTimeWindowWithDifferentWindowType() {
+        window.overlap(sessionWindow);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/4218c024/streams/src/test/java/org/apache/kafka/streams/kstream/internals/UnlimitedWindowTest.java
----------------------------------------------------------------------
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/UnlimitedWindowTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/UnlimitedWindowTest.java
new file mode 100644
index 0000000..f3c9cfb
--- /dev/null
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/UnlimitedWindowTest.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertTrue;
+
+public class UnlimitedWindowTest {
+
+    private long start = 50;
+    private final UnlimitedWindow window = new UnlimitedWindow(start);
+    private final SessionWindow sessionWindow = new SessionWindow(start, 
start);
+
+    @Test
+    public void shouldAlwaysOverlap() {
+        assertTrue(window.overlap(new UnlimitedWindow(start - 1)));
+        assertTrue(window.overlap(new UnlimitedWindow(start)));
+        assertTrue(window.overlap(new UnlimitedWindow(start + 1)));
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void cannotCompareUnlimitedWindowWithDifferentWindowType() {
+        window.overlap(sessionWindow);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/4218c024/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitionerTest.java
----------------------------------------------------------------------
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitionerTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitionerTest.java
index 4be83be..0b6288f 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitionerTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitionerTest.java
@@ -81,7 +81,7 @@ public class WindowedStreamPartitionerTest {
 
             Integer expected = defaultPartitioner.partition("topic", key, 
keyBytes, value, valueBytes, cluster);
 
-            for (int w = 0; w < 10; w++) {
+            for (int w = 1; w < 10; w++) {
                 TimeWindow window = new TimeWindow(10 * w, 20 * w);
 
                 Windowed<Integer> windowedKey = new Windowed<>(key, window);

http://git-wip-us.apache.org/repos/asf/kafka/blob/4218c024/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlySessionStoreTest.java
----------------------------------------------------------------------
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlySessionStoreTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlySessionStoreTest.java
index 8bfcb7b..51b3bf0 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlySessionStoreTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlySessionStoreTest.java
@@ -20,7 +20,7 @@ package org.apache.kafka.streams.state.internals;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.errors.InvalidStateStoreException;
 import org.apache.kafka.streams.kstream.Windowed;
-import org.apache.kafka.streams.kstream.internals.TimeWindow;
+import org.apache.kafka.streams.kstream.internals.SessionWindow;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.QueryableStoreTypes;
 import org.apache.kafka.test.ReadOnlySessionStoreStub;
@@ -58,12 +58,12 @@ public class CompositeReadOnlySessionStoreTest {
 
     @Test
     public void shouldFetchResulstFromUnderlyingSessionStore() throws 
Exception {
-        underlyingSessionStore.put(new Windowed<>("a", new TimeWindow(0, 0)), 
1L);
-        underlyingSessionStore.put(new Windowed<>("a", new TimeWindow(10, 
10)), 2L);
+        underlyingSessionStore.put(new Windowed<>("a", new SessionWindow(0, 
0)), 1L);
+        underlyingSessionStore.put(new Windowed<>("a", new SessionWindow(10, 
10)), 2L);
 
         final List<KeyValue<Windowed<String>, Long>> results = 
toList(sessionStore.fetch("a"));
-        assertEquals(Arrays.asList(KeyValue.pair(new Windowed<>("a", new 
TimeWindow(0, 0)), 1L),
-                                   KeyValue.pair(new Windowed<>("a", new 
TimeWindow(10, 10)), 2L)),
+        assertEquals(Arrays.asList(KeyValue.pair(new Windowed<>("a", new 
SessionWindow(0, 0)), 1L),
+                                   KeyValue.pair(new Windowed<>("a", new 
SessionWindow(10, 10)), 2L)),
                      results);
     }
 
@@ -79,8 +79,8 @@ public class CompositeReadOnlySessionStoreTest {
                 ReadOnlySessionStoreStub<>();
         stubProviderTwo.addStore(storeName, secondUnderlying);
 
-        final Windowed<String> keyOne = new Windowed<>("key-one", new 
TimeWindow(0, 0));
-        final Windowed<String> keyTwo = new Windowed<>("key-two", new 
TimeWindow(0, 0));
+        final Windowed<String> keyOne = new Windowed<>("key-one", new 
SessionWindow(0, 0));
+        final Windowed<String> keyTwo = new Windowed<>("key-two", new 
SessionWindow(0, 0));
         underlyingSessionStore.put(keyOne, 0L);
         secondUnderlying.put(keyTwo, 10L);
 
@@ -93,8 +93,8 @@ public class CompositeReadOnlySessionStoreTest {
 
     @Test
     public void shouldNotGetValueFromOtherStores() throws Exception {
-        final Windowed<String> expectedKey = new Windowed<>("foo", new 
TimeWindow(0, 0));
-        otherUnderlyingStore.put(new Windowed<>("foo", new TimeWindow(10, 
10)), 10L);
+        final Windowed<String> expectedKey = new Windowed<>("foo", new 
SessionWindow(0, 0));
+        otherUnderlyingStore.put(new Windowed<>("foo", new SessionWindow(10, 
10)), 10L);
         underlyingSessionStore.put(expectedKey, 1L);
 
         final KeyValueIterator<Windowed<String>, Long> result = 
sessionStore.fetch("foo");

Reply via email to