Repository: kafka
Updated Branches:
  refs/heads/trunk 23dff4b04 -> 3c6051165


KAFKA-3452; Follow-up: Add SessionWindows

 - TimeWindows represent half-open time intervals [start, end), while 
SessionWindows represent closed time intervals [start, end]

Author: Matthias J. Sax <matth...@confluent.io>

Reviewers: Damian Guy <damian....@gmail.com>, Guozhang Wang <wangg...@gmail.com>

Closes #2342 from mjsax/kafka-3452-session-window-follow-up


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/3c605116
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/3c605116
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/3c605116

Branch: refs/heads/trunk
Commit: 3c60511655df3349323e394bdc3836300991352f
Parents: 23dff4b
Author: Matthias J. Sax <matth...@confluent.io>
Authored: Wed Jan 11 20:33:42 2017 -0800
Committer: Guozhang Wang <wangg...@gmail.com>
Committed: Wed Jan 11 20:33:42 2017 -0800

----------------------------------------------------------------------
 .../apache/kafka/streams/kstream/Window.java    |  21 +++-
 .../KStreamSessionWindowAggregate.java          |  12 +-
 .../kstream/internals/SessionKeySerde.java      |   2 +-
 .../kstream/internals/SessionWindow.java        |  58 +++++++++
 .../streams/kstream/internals/TimeWindow.java   |   7 +-
 .../kstream/internals/UnlimitedWindow.java      |   6 +-
 .../state/internals/SessionKeySchema.java       |   6 +-
 .../KStreamAggregationIntegrationTest.java      |  34 ++---
 .../internals/KGroupedStreamImplTest.java       |  18 +--
 ...reamSessionWindowAggregateProcessorTest.java |  40 +++---
 .../kstream/internals/SessionKeySerdeTest.java  |  10 +-
 .../kstream/internals/SessionWindowTest.java    | 124 +++++++++++++++++++
 .../internals/CachingSessionStoreTest.java      |  42 +++----
 .../RocksDBSegmentedBytesStoreTest.java         |  44 +++----
 .../internals/RocksDBSessionStoreTest.java      |  46 +++----
 15 files changed, 335 insertions(+), 135 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/3c605116/streams/src/main/java/org/apache/kafka/streams/kstream/Window.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Window.java 
b/streams/src/main/java/org/apache/kafka/streams/kstream/Window.java
index 7d78d74..13a9529 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/Window.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Window.java
@@ -22,16 +22,27 @@ package org.apache.kafka.streams.kstream;
  */
 public abstract class Window {
 
-    private long start;
-    private long end;
+    protected final long start;
+    protected final long end;
 
     /**
      * Create a new window for the given start time (inclusive) and end time 
(exclusive).
      *
      * @param start  the start timestamp of the window (inclusive)
      * @param end    the end timestamp of the window (exclusive)
+     * @throws IllegalArgumentException if {@code start} or {@code end} is 
negative or if {@code end} is smaller than
+     * {@code start}
      */
-    public Window(long start, long end) {
+    public Window(long start, long end) throws IllegalArgumentException {
+        if (start < 0) {
+            throw new IllegalArgumentException("Window start time cannot be 
negative.");
+        }
+        if (end < 0) {
+            throw new IllegalArgumentException("Window end time cannot be 
negative.");
+        }
+        if (end < start) {
+            throw new IllegalArgumentException("Window end time cannot be 
smaller than window start time.");
+        }
         this.start = start;
         this.end = end;
     }
@@ -56,9 +67,7 @@ public abstract class Window {
      * @param other  another window
      * @return       {@code true} if {@code other} overlaps with this 
window&mdash;{@code false} otherwise
      */
-    public boolean overlap(Window other) {
-        return this.start() < other.end() || other.start() < this.end();
-    }
+    public abstract boolean overlap(Window other);
 
     @Override
     public boolean equals(Object obj) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/3c605116/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java
----------------------------------------------------------------------
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java
index bb86f52..70b2b90 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java
@@ -88,8 +88,8 @@ class KStreamSessionWindowAggregate<K, V, T> implements 
KStreamAggProcessorSuppl
 
             final long timestamp = context().timestamp();
             final List<KeyValue<Windowed<K>, T>> merged = new ArrayList<>();
-            final TimeWindow newTimeWindow = new TimeWindow(timestamp, 
timestamp);
-            TimeWindow mergedWindow = newTimeWindow;
+            final SessionWindow newSessionWindow = new 
SessionWindow(timestamp, timestamp);
+            SessionWindow mergedWindow = newSessionWindow;
             T agg = initializer.apply();
 
             try (final KeyValueIterator<Windowed<K>, T> iterator = 
store.findSessions(key, timestamp - windows.inactivityGap(),
@@ -98,13 +98,13 @@ class KStreamSessionWindowAggregate<K, V, T> implements 
KStreamAggProcessorSuppl
                     final KeyValue<Windowed<K>, T> next = iterator.next();
                     merged.add(next);
                     agg = sessionMerger.apply(key, agg, next.value);
-                    mergedWindow = mergeTimeWindow(mergedWindow, (TimeWindow) 
next.key.window());
+                    mergedWindow = mergeSessionWindow(mergedWindow, 
(SessionWindow) next.key.window());
                 }
             }
 
             agg = aggregator.apply(key, value, agg);
             final Windowed<K> sessionKey = new Windowed<>(key, mergedWindow);
-            if (!mergedWindow.equals(newTimeWindow)) {
+            if (!mergedWindow.equals(newSessionWindow)) {
                 for (final KeyValue<Windowed<K>, T> session : merged) {
                     store.remove(session.key);
                     tupleForwarder.maybeForward(session.key, null, 
session.value);
@@ -117,10 +117,10 @@ class KStreamSessionWindowAggregate<K, V, T> implements 
KStreamAggProcessorSuppl
     }
 
 
-    private TimeWindow mergeTimeWindow(final TimeWindow one, final TimeWindow 
two) {
+    private SessionWindow mergeSessionWindow(final SessionWindow one, final 
SessionWindow two) {
         final long start = one.start() < two.start() ? one.start() : 
two.start();
         final long end = one.end() > two.end() ? one.end() : two.end();
-        return new TimeWindow(start, end);
+        return new SessionWindow(start, end);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/kafka/blob/3c605116/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionKeySerde.java
----------------------------------------------------------------------
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionKeySerde.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionKeySerde.java
index 165d5c6..48213d6 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionKeySerde.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionKeySerde.java
@@ -131,7 +131,7 @@ public class SessionKeySerde<K> implements 
Serde<Windowed<K>> {
         final ByteBuffer buffer = ByteBuffer.wrap(binaryKey);
         final long start = buffer.getLong(binaryKey.length - TIMESTAMP_SIZE);
         final long end = buffer.getLong(binaryKey.length - 2 * TIMESTAMP_SIZE);
-        return new Windowed<>(key, new TimeWindow(start, end));
+        return new Windowed<>(key, new SessionWindow(start, end));
     }
 
     private static <K> K extractKey(final byte[] binaryKey, Deserializer<K> 
deserializer) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/3c605116/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindow.java
----------------------------------------------------------------------
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindow.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindow.java
new file mode 100644
index 0000000..db63029
--- /dev/null
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindow.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.streams.kstream.Window;
+
+/**
+ * A session window covers a closed time interval with its start and end 
timestamp both being an inclusive boundary.
+ *
+ * @see TimeWindow
+ * @see UnlimitedWindow
+ * @see org.apache.kafka.streams.kstream.SessionWindows
+ */
+public final class SessionWindow extends Window {
+
+    /**
+     * Create a new window for the given start time and end time (both 
inclusive).
+     *
+     * @param start  the start timestamp of the window
+     * @param end    the end timestamp of the window
+     */
+    public SessionWindow(final long start, final long end) {
+        super(start, end);
+    }
+
+    /**
+     * Check if the given window overlaps with this window.
+     *
+     * @param other  another window
+     * @return       {@code true} if {@code other} overlaps with this 
window&mdash;{@code false} otherwise
+     * @throws IllegalArgumentException if the {@code other} window has a 
different type than {@code this} window
+     */
+    public boolean overlap(final Window other) throws IllegalArgumentException 
{
+        if (getClass() != other.getClass()) {
+            throw new IllegalArgumentException("Cannot compare windows of 
different type. Other window has type "
+                + other.getClass());
+        }
+        final SessionWindow otherWindow = (SessionWindow) other;
+        return !(otherWindow.end < start || end < otherWindow.start);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/3c605116/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindow.java
----------------------------------------------------------------------
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindow.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindow.java
index 5dfb9eb..630821f 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindow.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindow.java
@@ -27,7 +27,12 @@ public class TimeWindow extends Window {
 
     @Override
     public boolean overlap(Window other) {
-        return getClass() == other.getClass() && super.overlap(other);
+        if (getClass() != other.getClass()) {
+            throw new IllegalArgumentException("Cannot compare windows of 
different type. Other window has type "
+                + other.getClass());
+        }
+        final TimeWindow otherWindow = (TimeWindow) other;
+        return start < otherWindow.end && otherWindow.start < end;
     }
 
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/3c605116/streams/src/main/java/org/apache/kafka/streams/kstream/internals/UnlimitedWindow.java
----------------------------------------------------------------------
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/UnlimitedWindow.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/UnlimitedWindow.java
index 4b93f9b..e9ec040 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/UnlimitedWindow.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/UnlimitedWindow.java
@@ -27,7 +27,11 @@ public class UnlimitedWindow extends Window {
 
     @Override
     public boolean overlap(Window other) {
-        return getClass() == other.getClass() && super.overlap(other);
+        if (getClass() != other.getClass()) {
+            throw new IllegalArgumentException("Cannot compare windows of 
different type. Other window has type "
+                + other.getClass());
+        }
+        return true;
     }
 
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/3c605116/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionKeySchema.java
----------------------------------------------------------------------
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionKeySchema.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionKeySchema.java
index b15eec9..604abb3 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionKeySchema.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionKeySchema.java
@@ -20,7 +20,7 @@ import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.kstream.internals.SessionKeySerde;
-import org.apache.kafka.streams.kstream.internals.TimeWindow;
+import org.apache.kafka.streams.kstream.internals.SessionWindow;
 import org.apache.kafka.streams.state.KeyValueIterator;
 
 import java.util.List;
@@ -30,13 +30,13 @@ class SessionKeySchema implements 
SegmentedBytesStore.KeySchema {
 
     @Override
     public Bytes upperRange(final Bytes key, final long to) {
-        final Windowed<Bytes> sessionKey = new Windowed<>(key, new 
TimeWindow(to, Long.MAX_VALUE));
+        final Windowed<Bytes> sessionKey = new Windowed<>(key, new 
SessionWindow(to, Long.MAX_VALUE));
         return SessionKeySerde.toBinary(sessionKey, 
Serdes.Bytes().serializer());
     }
 
     @Override
     public Bytes lowerRange(final Bytes key, final long from) {
-        final Windowed<Bytes> sessionKey = new Windowed<>(key, new 
TimeWindow(0, Math.max(0, from)));
+        final Windowed<Bytes> sessionKey = new Windowed<>(key, new 
SessionWindow(0, Math.max(0, from)));
         return SessionKeySerde.toBinary(sessionKey, 
Serdes.Bytes().serializer());
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/3c605116/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
----------------------------------------------------------------------
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
index 5cc2a59..0833f3c 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
@@ -39,7 +39,7 @@ import org.apache.kafka.streams.kstream.Reducer;
 import org.apache.kafka.streams.kstream.SessionWindows;
 import org.apache.kafka.streams.kstream.TimeWindows;
 import org.apache.kafka.streams.kstream.Windowed;
-import org.apache.kafka.streams.kstream.internals.TimeWindow;
+import org.apache.kafka.streams.kstream.internals.SessionWindow;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.QueryableStoreTypes;
 import org.apache.kafka.streams.state.ReadOnlySessionStore;
@@ -506,13 +506,13 @@ public class KStreamAggregationIntegrationTest {
 
         startStreams();
         latch.await(30, TimeUnit.SECONDS);
-        assertThat(results.get(new Windowed<>("bob", new TimeWindow(t1, t1))), 
equalTo(1L));
-        assertThat(results.get(new Windowed<>("penny", new TimeWindow(t1, 
t1))), equalTo(1L));
-        assertThat(results.get(new Windowed<>("jo", new TimeWindow(t1, t1))), 
equalTo(1L));
-        assertThat(results.get(new Windowed<>("jo", new TimeWindow(t4, t4))), 
equalTo(1L));
-        assertThat(results.get(new Windowed<>("emily", new TimeWindow(t1, 
t2))), equalTo(2L));
-        assertThat(results.get(new Windowed<>("bob", new TimeWindow(t3, t4))), 
equalTo(2L));
-        assertThat(results.get(new Windowed<>("penny", new TimeWindow(t3, 
t3))), equalTo(1L));
+        assertThat(results.get(new Windowed<>("bob", new SessionWindow(t1, 
t1))), equalTo(1L));
+        assertThat(results.get(new Windowed<>("penny", new SessionWindow(t1, 
t1))), equalTo(1L));
+        assertThat(results.get(new Windowed<>("jo", new SessionWindow(t1, 
t1))), equalTo(1L));
+        assertThat(results.get(new Windowed<>("jo", new SessionWindow(t4, 
t4))), equalTo(1L));
+        assertThat(results.get(new Windowed<>("emily", new SessionWindow(t1, 
t2))), equalTo(2L));
+        assertThat(results.get(new Windowed<>("bob", new SessionWindow(t3, 
t4))), equalTo(2L));
+        assertThat(results.get(new Windowed<>("penny", new SessionWindow(t3, 
t3))), equalTo(1L));
     }
 
     @Test
@@ -601,18 +601,18 @@ public class KStreamAggregationIntegrationTest {
                 = kafkaStreams.store(userSessionsStore, 
QueryableStoreTypes.<String, String>sessionStore());
 
         // verify correct data received
-        assertThat(results.get(new Windowed<>("bob", new TimeWindow(t1, t1))), 
equalTo("start"));
-        assertThat(results.get(new Windowed<>("penny", new TimeWindow(t1, 
t1))), equalTo("start"));
-        assertThat(results.get(new Windowed<>("jo", new TimeWindow(t1, t1))), 
equalTo("pause"));
-        assertThat(results.get(new Windowed<>("jo", new TimeWindow(t4, t4))), 
equalTo("resume"));
-        assertThat(results.get(new Windowed<>("emily", new TimeWindow(t1, 
t2))), equalTo("pause:resume"));
-        assertThat(results.get(new Windowed<>("bob", new TimeWindow(t3, t4))), 
equalTo("pause:resume"));
-        assertThat(results.get(new Windowed<>("penny", new TimeWindow(t3, 
t3))), equalTo("stop"));
+        assertThat(results.get(new Windowed<>("bob", new SessionWindow(t1, 
t1))), equalTo("start"));
+        assertThat(results.get(new Windowed<>("penny", new SessionWindow(t1, 
t1))), equalTo("start"));
+        assertThat(results.get(new Windowed<>("jo", new SessionWindow(t1, 
t1))), equalTo("pause"));
+        assertThat(results.get(new Windowed<>("jo", new SessionWindow(t4, 
t4))), equalTo("resume"));
+        assertThat(results.get(new Windowed<>("emily", new SessionWindow(t1, 
t2))), equalTo("pause:resume"));
+        assertThat(results.get(new Windowed<>("bob", new SessionWindow(t3, 
t4))), equalTo("pause:resume"));
+        assertThat(results.get(new Windowed<>("penny", new SessionWindow(t3, 
t3))), equalTo("stop"));
 
         // verify can query data via IQ
         final KeyValueIterator<Windowed<String>, String> bob = 
sessionStore.fetch("bob");
-        assertThat(bob.next(), equalTo(KeyValue.pair(new Windowed<>("bob", new 
TimeWindow(t1, t1)), "start")));
-        assertThat(bob.next(), equalTo(KeyValue.pair(new Windowed<>("bob", new 
TimeWindow(t3, t4)), "pause:resume")));
+        assertThat(bob.next(), equalTo(KeyValue.pair(new Windowed<>("bob", new 
SessionWindow(t1, t1)), "start")));
+        assertThat(bob.next(), equalTo(KeyValue.pair(new Windowed<>("bob", new 
SessionWindow(t3, t4)), "pause:resume")));
         assertFalse(bob.hasNext());
 
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/3c605116/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java
----------------------------------------------------------------------
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java
index 62dd1d5..729e190 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java
@@ -173,9 +173,9 @@ public class KGroupedStreamImplTest {
         driver.setTime(100);
         driver.process(TOPIC, "1", "1");
         driver.flushState();
-        assertEquals(Integer.valueOf(2), results.get(new Windowed<>("1", new 
TimeWindow(10, 30))));
-        assertEquals(Integer.valueOf(1), results.get(new Windowed<>("2", new 
TimeWindow(15, 15))));
-        assertEquals(Integer.valueOf(3), results.get(new Windowed<>("1", new 
TimeWindow(70, 100))));
+        assertEquals(Integer.valueOf(2), results.get(new Windowed<>("1", new 
SessionWindow(10, 30))));
+        assertEquals(Integer.valueOf(1), results.get(new Windowed<>("2", new 
SessionWindow(15, 15))));
+        assertEquals(Integer.valueOf(3), results.get(new Windowed<>("1", new 
SessionWindow(70, 100))));
     }
 
     @Test
@@ -202,9 +202,9 @@ public class KGroupedStreamImplTest {
         driver.setTime(100);
         driver.process(TOPIC, "1", "1");
         driver.flushState();
-        assertEquals(Long.valueOf(2), results.get(new Windowed<>("1", new 
TimeWindow(10, 30))));
-        assertEquals(Long.valueOf(1), results.get(new Windowed<>("2", new 
TimeWindow(15, 15))));
-        assertEquals(Long.valueOf(3), results.get(new Windowed<>("1", new 
TimeWindow(70, 100))));
+        assertEquals(Long.valueOf(2), results.get(new Windowed<>("1", new 
SessionWindow(10, 30))));
+        assertEquals(Long.valueOf(1), results.get(new Windowed<>("2", new 
SessionWindow(15, 15))));
+        assertEquals(Long.valueOf(3), results.get(new Windowed<>("1", new 
SessionWindow(70, 100))));
     }
 
     @Test
@@ -238,9 +238,9 @@ public class KGroupedStreamImplTest {
         driver.setTime(100);
         driver.process(TOPIC, "1", "C");
         driver.flushState();
-        assertEquals("A:B", results.get(new Windowed<>("1", new TimeWindow(10, 
30))));
-        assertEquals("Z", results.get(new Windowed<>("2", new TimeWindow(15, 
15))));
-        assertEquals("A:B:C", results.get(new Windowed<>("1", new 
TimeWindow(70, 100))));
+        assertEquals("A:B", results.get(new Windowed<>("1", new 
SessionWindow(10, 30))));
+        assertEquals("Z", results.get(new Windowed<>("2", new 
SessionWindow(15, 15))));
+        assertEquals("A:B:C", results.get(new Windowed<>("1", new 
SessionWindow(70, 100))));
     }
 
     @Test(expected = NullPointerException.class)

http://git-wip-us.apache.org/repos/asf/kafka/blob/3c605116/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
----------------------------------------------------------------------
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
index 0ecaf3a..c3368a1 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
@@ -184,9 +184,9 @@ public class KStreamSessionWindowAggregateProcessorTest {
 
         sessionStore.flush();
         assertEquals(Arrays.asList(
-                KeyValue.pair(new Windowed<>(sessionId, new TimeWindow(0, 0)), 
new Change<>(1L, null)),
-                KeyValue.pair(new Windowed<>(sessionId, new TimeWindow(GAP_MS 
+ 1, GAP_MS + 1)), new Change<>(2L, null)),
-                KeyValue.pair(new Windowed<>(sessionId, new TimeWindow(time, 
time)), new Change<>(3L, null))
+                KeyValue.pair(new Windowed<>(sessionId, new SessionWindow(0, 
0)), new Change<>(1L, null)),
+                KeyValue.pair(new Windowed<>(sessionId, new 
SessionWindow(GAP_MS + 1, GAP_MS + 1)), new Change<>(2L, null)),
+                KeyValue.pair(new Windowed<>(sessionId, new 
SessionWindow(time, time)), new Change<>(3L, null))
 
         ), results);
 
@@ -200,14 +200,14 @@ public class KStreamSessionWindowAggregateProcessorTest {
 
         // first ensure it is in the store
         final KeyValueIterator<Windowed<String>, Long> a1 = 
sessionStore.findSessions("a", 0, 0);
-        assertEquals(KeyValue.pair(new Windowed<>("a", new TimeWindow(0, 0)), 
1L), a1.next());
+        assertEquals(KeyValue.pair(new Windowed<>("a", new SessionWindow(0, 
0)), 1L), a1.next());
 
         context.setTime(100);
         processor.process("a", "2");
         // a1 from above should have been removed
         // should have merged session in store
         final KeyValueIterator<Windowed<String>, Long> a2 = 
sessionStore.findSessions("a", 0, 100);
-        assertEquals(KeyValue.pair(new Windowed<>("a", new TimeWindow(0, 
100)), 2L), a2.next());
+        assertEquals(KeyValue.pair(new Windowed<>("a", new SessionWindow(0, 
100)), 2L), a2.next());
         assertFalse(a2.hasNext());
     }
 
@@ -229,13 +229,13 @@ public class KStreamSessionWindowAggregateProcessorTest {
 
         sessionStore.flush();
 
-        assertEquals(Arrays.asList(KeyValue.pair(new Windowed<>("a", new 
TimeWindow(0, 0)), new Change<>(1L, null)),
-                                   KeyValue.pair(new Windowed<>("b", new 
TimeWindow(0, 0)), new Change<>(1L, null)),
-                                   KeyValue.pair(new Windowed<>("c", new 
TimeWindow(0, 0)), new Change<>(1L, null)),
-                                   KeyValue.pair(new Windowed<>("d", new 
TimeWindow(0, GAP_MS / 2)), new Change<>(2L, null)),
-                                   KeyValue.pair(new Windowed<>("b", new 
TimeWindow(GAP_MS + 1, GAP_MS + 1)), new Change<>(1L, null)),
-                                   KeyValue.pair(new Windowed<>("a", new 
TimeWindow(GAP_MS + 1, GAP_MS + 1 + GAP_MS / 2)), new Change<>(2L, null)),
-                                   KeyValue.pair(new Windowed<>("c", new 
TimeWindow(GAP_MS + 1 + GAP_MS / 2, GAP_MS + 1 + GAP_MS / 2)), new Change<>(1L, 
null))
+        assertEquals(Arrays.asList(KeyValue.pair(new Windowed<>("a", new 
SessionWindow(0, 0)), new Change<>(1L, null)),
+                                   KeyValue.pair(new Windowed<>("b", new 
SessionWindow(0, 0)), new Change<>(1L, null)),
+                                   KeyValue.pair(new Windowed<>("c", new 
SessionWindow(0, 0)), new Change<>(1L, null)),
+                                   KeyValue.pair(new Windowed<>("d", new 
SessionWindow(0, GAP_MS / 2)), new Change<>(2L, null)),
+                                   KeyValue.pair(new Windowed<>("b", new 
SessionWindow(GAP_MS + 1, GAP_MS + 1)), new Change<>(1L, null)),
+                                   KeyValue.pair(new Windowed<>("a", new 
SessionWindow(GAP_MS + 1, GAP_MS + 1 + GAP_MS / 2)), new Change<>(2L, null)),
+                                   KeyValue.pair(new Windowed<>("c", new 
SessionWindow(GAP_MS + 1 + GAP_MS / 2, GAP_MS + 1 + GAP_MS / 2)), new 
Change<>(1L, null))
                      ),
                      results);
     }
@@ -250,8 +250,8 @@ public class KStreamSessionWindowAggregateProcessorTest {
         context.setTime(GAP_MS + 1);
         processor.process("a", "1");
         processor.process("a", "2");
-        final long t0 = getter.get(new Windowed<>("a", new TimeWindow(0, 0)));
-        final long t1 = getter.get(new Windowed<>("a", new TimeWindow(GAP_MS + 
1, GAP_MS + 1)));
+        final long t0 = getter.get(new Windowed<>("a", new SessionWindow(0, 
0)));
+        final long t1 = getter.get(new Windowed<>("a", new 
SessionWindow(GAP_MS + 1, GAP_MS + 1)));
         assertEquals(1L, t0);
         assertEquals(2L, t1);
     }
@@ -266,9 +266,9 @@ public class KStreamSessionWindowAggregateProcessorTest {
         processor.process("b", "1");
         processor.process("c", "1");
 
-        assertEquals(Arrays.asList(KeyValue.pair(new Windowed<>("a", new 
TimeWindow(0, 0)), new Change<>(1L, null)),
-                                   KeyValue.pair(new Windowed<>("b", new 
TimeWindow(0, 0)), new Change<>(1L, null)),
-                                   KeyValue.pair(new Windowed<>("c", new 
TimeWindow(0, 0)), new Change<>(1L, null))), results);
+        assertEquals(Arrays.asList(KeyValue.pair(new Windowed<>("a", new 
SessionWindow(0, 0)), new Change<>(1L, null)),
+                                   KeyValue.pair(new Windowed<>("b", new 
SessionWindow(0, 0)), new Change<>(1L, null)),
+                                   KeyValue.pair(new Windowed<>("c", new 
SessionWindow(0, 0)), new Change<>(1L, null))), results);
     }
 
     @Test
@@ -280,9 +280,9 @@ public class KStreamSessionWindowAggregateProcessorTest {
         processor.process("a", "1");
         context.setTime(5);
         processor.process("a", "1");
-        assertEquals(Arrays.asList(KeyValue.pair(new Windowed<>("a", new 
TimeWindow(0, 0)), new Change<>(1L, null)),
-                                   KeyValue.pair(new Windowed<>("a", new 
TimeWindow(0, 0)), new Change<>(null, null)),
-                                   KeyValue.pair(new Windowed<>("a", new 
TimeWindow(0, 5)), new Change<>(2L, null))), results);
+        assertEquals(Arrays.asList(KeyValue.pair(new Windowed<>("a", new 
SessionWindow(0, 0)), new Change<>(1L, null)),
+                                   KeyValue.pair(new Windowed<>("a", new 
SessionWindow(0, 0)), new Change<>(null, null)),
+                                   KeyValue.pair(new Windowed<>("a", new 
SessionWindow(0, 5)), new Change<>(2L, null))), results);
 
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/3c605116/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionKeySerdeTest.java
----------------------------------------------------------------------
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionKeySerdeTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionKeySerdeTest.java
index 2f5972c..3a0f490 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionKeySerdeTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionKeySerdeTest.java
@@ -30,7 +30,7 @@ public class SessionKeySerdeTest {
 
     @Test
     public void shouldSerializeDeserialize() throws Exception {
-        final Windowed<Long> key = new Windowed<>(1L, new TimeWindow(10, 100));
+        final Windowed<Long> key = new Windowed<>(1L, new SessionWindow(10, 
100));
         final SessionKeySerde<Long> serde = new 
SessionKeySerde<>(Serdes.Long());
         final byte[] bytes = serde.serializer().serialize("t", key);
         final Windowed<Long> result = serde.deserializer().deserialize("t", 
bytes);
@@ -57,7 +57,7 @@ public class SessionKeySerdeTest {
 
     @Test
     public void shouldConvertToBinaryAndBack() throws Exception {
-        final Windowed<String> key = new Windowed<>("key", new TimeWindow(10, 
20));
+        final Windowed<String> key = new Windowed<>("key", new 
SessionWindow(10, 20));
         final Bytes serialized = SessionKeySerde.toBinary(key, 
Serdes.String().serializer());
         final Windowed<String> result = SessionKeySerde.from(serialized.get(), 
Serdes.String().deserializer());
         assertEquals(key, result);
@@ -65,21 +65,21 @@ public class SessionKeySerdeTest {
 
     @Test
     public void shouldExtractEndTimeFromBinary() throws Exception {
-        final Windowed<String> key = new Windowed<>("key", new TimeWindow(10, 
100));
+        final Windowed<String> key = new Windowed<>("key", new 
SessionWindow(10, 100));
         final Bytes serialized = SessionKeySerde.toBinary(key, 
Serdes.String().serializer());
         assertEquals(100, SessionKeySerde.extractEnd(serialized.get()));
     }
 
     @Test
     public void shouldExtractStartTimeFromBinary() throws Exception {
-        final Windowed<String> key = new Windowed<>("key", new TimeWindow(50, 
100));
+        final Windowed<String> key = new Windowed<>("key", new 
SessionWindow(50, 100));
         final Bytes serialized = SessionKeySerde.toBinary(key, 
Serdes.String().serializer());
         assertEquals(50, SessionKeySerde.extractStart(serialized.get()));
     }
 
     @Test
     public void shouldExtractKeyBytesFromBinary() throws Exception {
-        final Windowed<String> key = new Windowed<>("blah", new TimeWindow(50, 
100));
+        final Windowed<String> key = new Windowed<>("blah", new 
SessionWindow(50, 100));
         final Bytes serialized = SessionKeySerde.toBinary(key, 
Serdes.String().serializer());
         assertArrayEquals("blah".getBytes(), 
SessionKeySerde.extractKeyBytes(serialized.get()));
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/3c605116/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionWindowTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionWindowTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionWindowTest.java
new file mode 100644
index 0000000..2df7741
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionWindowTest.java
@@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class SessionWindowTest {
+
+    private long start = 50;
+    private long end = 100;
+    private final SessionWindow window = new SessionWindow(start, end);
+    private final TimeWindow timeWindow = new TimeWindow(start, end);
+
+    @Test
+    public void shouldNotOverlapIfOtherWindowIsBeforeThisWindow() {
+        /*
+         * This:        [-------]
+         * Other: [---]
+         */
+        assertFalse(window.overlap(new SessionWindow(0, 25)));
+        assertFalse(window.overlap(new SessionWindow(0, start - 1)));
+        assertFalse(window.overlap(new SessionWindow(start - 1, start - 1)));
+    }
+
+    @Test
+    public void shouldOverlapIfOtherWindowEndIsWithinThisWindow() {
+        /*
+         * This:        [-------]
+         * Other: [---------]
+         */
+        assertTrue(window.overlap(new SessionWindow(0, start)));
+        assertTrue(window.overlap(new SessionWindow(0, start + 1)));
+        assertTrue(window.overlap(new SessionWindow(0, 75)));
+        assertTrue(window.overlap(new SessionWindow(0, end - 1)));
+        assertTrue(window.overlap(new SessionWindow(0, end)));
+
+        assertTrue(window.overlap(new SessionWindow(start - 1, start)));
+        assertTrue(window.overlap(new SessionWindow(start - 1, start + 1)));
+        assertTrue(window.overlap(new SessionWindow(start - 1, 75)));
+        assertTrue(window.overlap(new SessionWindow(start - 1, end - 1)));
+        assertTrue(window.overlap(new SessionWindow(start - 1, end)));
+    }
+
+    @Test
+    public void shouldOverlapIfOtherWindowContainsThisWindow() {
+        /*
+         * This:        [-------]
+         * Other: [------------------]
+         */
+        assertTrue(window.overlap(new SessionWindow(0, end)));
+        assertTrue(window.overlap(new SessionWindow(0, end + 1)));
+        assertTrue(window.overlap(new SessionWindow(0, 150)));
+
+        assertTrue(window.overlap(new SessionWindow(start - 1, end)));
+        assertTrue(window.overlap(new SessionWindow(start - 1, end + 1)));
+        assertTrue(window.overlap(new SessionWindow(start - 1, 150)));
+
+        assertTrue(window.overlap(new SessionWindow(start, end)));
+        assertTrue(window.overlap(new SessionWindow(start, end + 1)));
+        assertTrue(window.overlap(new SessionWindow(start, 150)));
+    }
+
+    @Test
+    public void shouldOverlapIfOtherWindowIsWithinThisWindow() {
+        /*
+         * This:        [-------]
+         * Other:         [---]
+         */
+        assertTrue(window.overlap(new SessionWindow(start, start)));
+        assertTrue(window.overlap(new SessionWindow(start, 75)));
+        assertTrue(window.overlap(new SessionWindow(start, end)));
+        assertTrue(window.overlap(new SessionWindow(75, end)));
+        assertTrue(window.overlap(new SessionWindow(end, end)));
+    }
+
+    @Test
+    public void shouldOverlapIfOtherWindowStartIsWithinThisWindow() {
+        /*
+         * This:        [-------]
+         * Other:           [-------]
+         */
+        assertTrue(window.overlap(new SessionWindow(start, end + 1)));
+        assertTrue(window.overlap(new SessionWindow(start, 150)));
+        assertTrue(window.overlap(new SessionWindow(75, end + 1)));
+        assertTrue(window.overlap(new SessionWindow(75, 150)));
+        assertTrue(window.overlap(new SessionWindow(end, end + 1)));
+        assertTrue(window.overlap(new SessionWindow(end, 150)));
+    }
+
+    @Test
+    public void shouldNotOverlapIsOtherWindowIsAfterThisWindow() {
+        /*
+         * This:        [-------]
+         * Other:                  [---]
+         */
+        assertFalse(window.overlap(new SessionWindow(end + 1, end + 1)));
+        assertFalse(window.overlap(new SessionWindow(end + 1, 150)));
+        assertFalse(window.overlap(new SessionWindow(125, 150)));
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void cannotCompareSessionWindowWithDifferentWindowType() {
+        window.overlap(timeWindow);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/3c605116/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java
index a4e8df3..c603aa0 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java
@@ -24,7 +24,7 @@ import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.errors.InvalidStateStoreException;
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.kstream.internals.SessionKeySerde;
-import org.apache.kafka.streams.kstream.internals.TimeWindow;
+import org.apache.kafka.streams.kstream.internals.SessionWindow;
 import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
 import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
 import org.apache.kafka.streams.processor.internals.RecordCollector;
@@ -70,15 +70,15 @@ public class CachingSessionStoreTest {
 
     @Test
     public void shouldPutFetchFromCache() throws Exception {
-        cachingStore.put(new Windowed<>("a", new TimeWindow(0, 0)), 1L);
-        cachingStore.put(new Windowed<>("aa", new TimeWindow(0, 0)), 1L);
-        cachingStore.put(new Windowed<>("b", new TimeWindow(0, 0)), 1L);
+        cachingStore.put(new Windowed<>("a", new SessionWindow(0, 0)), 1L);
+        cachingStore.put(new Windowed<>("aa", new SessionWindow(0, 0)), 1L);
+        cachingStore.put(new Windowed<>("b", new SessionWindow(0, 0)), 1L);
 
         final KeyValueIterator<Windowed<String>, Long> a = 
cachingStore.findSessions("a", 0, 0);
         final KeyValueIterator<Windowed<String>, Long> b = 
cachingStore.findSessions("b", 0, 0);
 
-        assertEquals(KeyValue.pair(new Windowed<>("a", new TimeWindow(0, 0)), 
1L), a.next());
-        assertEquals(KeyValue.pair(new Windowed<>("b", new TimeWindow(0, 0)), 
1L), b.next());
+        assertEquals(KeyValue.pair(new Windowed<>("a", new SessionWindow(0, 
0)), 1L), a.next());
+        assertEquals(KeyValue.pair(new Windowed<>("b", new SessionWindow(0, 
0)), 1L), b.next());
         assertFalse(a.hasNext());
         assertFalse(b.hasNext());
         assertEquals(3, cache.size());
@@ -87,16 +87,16 @@ public class CachingSessionStoreTest {
     @Test
     public void shouldFetchAllSessionsWithSameRecordKey() throws Exception {
 
-        final List<KeyValue<Windowed<String>, Long>> expected = 
Arrays.asList(KeyValue.pair(new Windowed<>("a", new TimeWindow(0, 0)), 1L),
-                                                                               
     KeyValue.pair(new Windowed<>("a", new TimeWindow(10, 10)), 2L),
-                                                                               
     KeyValue.pair(new Windowed<>("a", new TimeWindow(100, 100)), 3L),
-                                                                               
     KeyValue.pair(new Windowed<>("a", new TimeWindow(1000, 1000)), 4L));
+        final List<KeyValue<Windowed<String>, Long>> expected = 
Arrays.asList(KeyValue.pair(new Windowed<>("a", new SessionWindow(0, 0)), 1L),
+                                                                               
     KeyValue.pair(new Windowed<>("a", new SessionWindow(10, 10)), 2L),
+                                                                               
     KeyValue.pair(new Windowed<>("a", new SessionWindow(100, 100)), 3L),
+                                                                               
     KeyValue.pair(new Windowed<>("a", new SessionWindow(1000, 1000)), 4L));
         for (KeyValue<Windowed<String>, Long> kv : expected) {
             cachingStore.put(kv.key, kv.value);
         }
 
         // add one that shouldn't appear in the results
-        cachingStore.put(new Windowed<>("aa", new TimeWindow(0, 0)), 5L);
+        cachingStore.put(new Windowed<>("aa", new SessionWindow(0, 0)), 5L);
 
 
         final List<KeyValue<Windowed<String>, Long>> results = 
toList(cachingStore.fetch("a"));
@@ -124,8 +124,8 @@ public class CachingSessionStoreTest {
 
     @Test
     public void shouldRemove() throws Exception {
-        final Windowed<String> a = new Windowed<>("a", new TimeWindow(0, 0));
-        final Windowed<String> b = new Windowed<>("b", new TimeWindow(0, 0));
+        final Windowed<String> a = new Windowed<>("a", new SessionWindow(0, 
0));
+        final Windowed<String> b = new Windowed<>("b", new SessionWindow(0, 
0));
         cachingStore.put(a, 2L);
         cachingStore.put(b, 2L);
         cachingStore.flush();
@@ -137,9 +137,9 @@ public class CachingSessionStoreTest {
 
     @Test
     public void shouldFetchCorrectlyAcrossSegments() throws Exception {
-        final Windowed<String> a1 = new Windowed<>("a", new TimeWindow(0, 0));
-        final Windowed<String> a2 = new Windowed<>("a", new 
TimeWindow(Segments.MIN_SEGMENT_INTERVAL, Segments.MIN_SEGMENT_INTERVAL));
-        final Windowed<String> a3 = new Windowed<>("a", new 
TimeWindow(Segments.MIN_SEGMENT_INTERVAL * 2, Segments.MIN_SEGMENT_INTERVAL * 
2));
+        final Windowed<String> a1 = new Windowed<>("a", new SessionWindow(0, 
0));
+        final Windowed<String> a2 = new Windowed<>("a", new 
SessionWindow(Segments.MIN_SEGMENT_INTERVAL, Segments.MIN_SEGMENT_INTERVAL));
+        final Windowed<String> a3 = new Windowed<>("a", new 
SessionWindow(Segments.MIN_SEGMENT_INTERVAL * 2, Segments.MIN_SEGMENT_INTERVAL 
* 2));
         cachingStore.put(a1, 1L);
         cachingStore.put(a2, 2L);
         cachingStore.put(a3, 3L);
@@ -153,7 +153,7 @@ public class CachingSessionStoreTest {
 
     @Test
     public void shouldClearNamespaceCacheOnClose() throws Exception {
-        final Windowed<String> a1 = new Windowed<>("a", new TimeWindow(0, 0));
+        final Windowed<String> a1 = new Windowed<>("a", new SessionWindow(0, 
0));
         cachingStore.put(a1, 1L);
         assertEquals(1, cache.size());
         cachingStore.close();
@@ -175,13 +175,13 @@ public class CachingSessionStoreTest {
     @Test(expected = InvalidStateStoreException.class)
     public void shouldThrowIfTryingToRemoveFromClosedCachingStore() throws 
Exception {
         cachingStore.close();
-        cachingStore.remove(new Windowed<>("a", new TimeWindow(0, 0)));
+        cachingStore.remove(new Windowed<>("a", new SessionWindow(0, 0)));
     }
 
     @Test(expected = InvalidStateStoreException.class)
     public void shouldThrowIfTryingToPutIntoClosedCachingStore() throws 
Exception {
         cachingStore.close();
-        cachingStore.put(new Windowed<>("a", new TimeWindow(0, 0)), 1L);
+        cachingStore.put(new Windowed<>("a", new SessionWindow(0, 0)), 1L);
     }
 
     private List<KeyValue<Windowed<String>, Long>> 
addSessionsUntilOverflow(final String...sessionIds) {
@@ -196,11 +196,11 @@ public class CachingSessionStoreTest {
 
     private void addSingleSession(final String sessionId, final 
List<KeyValue<Windowed<String>, Long>> allSessions) {
         final int timestamp = allSessions.size() * 10;
-        final Windowed<String> key = new Windowed<>(sessionId, new 
TimeWindow(timestamp, timestamp));
+        final Windowed<String> key = new Windowed<>(sessionId, new 
SessionWindow(timestamp, timestamp));
         final Long value = 1L;
         cachingStore.put(key, value);
         allSessions.add(KeyValue.pair(key, value));
     }
 
 
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/3c605116/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java
index 9ff2762..7fe490c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java
@@ -24,7 +24,7 @@ import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.kstream.internals.SessionKeySerde;
-import org.apache.kafka.streams.kstream.internals.TimeWindow;
+import org.apache.kafka.streams.kstream.internals.SessionWindow;
 import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.test.MockProcessorContext;
@@ -79,13 +79,13 @@ public class RocksDBSegmentedBytesStoreTest {
     @Test
     public void shouldPutAndFetch() throws Exception {
         final String key = "a";
-        bytesStore.put(serializeKey(new Windowed<>(key, new TimeWindow(10, 
10L))), serializeValue(10L));
-        bytesStore.put(serializeKey(new Windowed<>(key, new TimeWindow(500L, 
1000L))), serializeValue(50L));
-        bytesStore.put(serializeKey(new Windowed<>(key, new TimeWindow(1500L, 
2000L))), serializeValue(100L));
-        bytesStore.put(serializeKey(new Windowed<>(key, new TimeWindow(2500L, 
3000L))), serializeValue(200L));
+        bytesStore.put(serializeKey(new Windowed<>(key, new SessionWindow(10, 
10L))), serializeValue(10L));
+        bytesStore.put(serializeKey(new Windowed<>(key, new 
SessionWindow(500L, 1000L))), serializeValue(50L));
+        bytesStore.put(serializeKey(new Windowed<>(key, new 
SessionWindow(1500L, 2000L))), serializeValue(100L));
+        bytesStore.put(serializeKey(new Windowed<>(key, new 
SessionWindow(2500L, 3000L))), serializeValue(200L));
 
-        final List<KeyValue<Windowed<String>, Long>> expected = 
Arrays.asList(KeyValue.pair(new Windowed<>(key, new TimeWindow(10, 10)), 10L),
-                                                                               
     KeyValue.pair(new Windowed<>(key, new TimeWindow(500, 1000)), 50L));
+        final List<KeyValue<Windowed<String>, Long>> expected = 
Arrays.asList(KeyValue.pair(new Windowed<>(key, new SessionWindow(10, 10)), 
10L),
+                                                                               
     KeyValue.pair(new Windowed<>(key, new SessionWindow(500, 1000)), 50L));
 
         final KeyValueIterator<Bytes, byte[]> values = 
bytesStore.fetch(Bytes.wrap(key.getBytes()), 0, 1000L);
         assertEquals(expected, toList(values));
@@ -94,18 +94,18 @@ public class RocksDBSegmentedBytesStoreTest {
     @Test
     public void shouldFindValuesWithinRange() throws Exception {
         final String key = "a";
-        bytesStore.put(serializeKey(new Windowed<>(key, new TimeWindow(0L, 
0L))), serializeValue(50L));
-        bytesStore.put(serializeKey(new Windowed<>(key, new TimeWindow(1000L, 
1000L))), serializeValue(10L));
+        bytesStore.put(serializeKey(new Windowed<>(key, new SessionWindow(0L, 
0L))), serializeValue(50L));
+        bytesStore.put(serializeKey(new Windowed<>(key, new 
SessionWindow(1000L, 1000L))), serializeValue(10L));
         final KeyValueIterator<Bytes, byte[]> results = 
bytesStore.fetch(Bytes.wrap(key.getBytes()), 1L, 1999L);
-        assertEquals(Collections.singletonList(KeyValue.pair(new 
Windowed<>(key, new TimeWindow(1000L, 1000L)), 10L)), toList(results));
+        assertEquals(Collections.singletonList(KeyValue.pair(new 
Windowed<>(key, new SessionWindow(1000L, 1000L)), 10L)), toList(results));
     }
 
     @Test
     public void shouldRemove() throws Exception {
-        bytesStore.put(serializeKey(new Windowed<>("a", new TimeWindow(0, 
1000))), serializeValue(30L));
-        bytesStore.put(serializeKey(new Windowed<>("a", new TimeWindow(1500, 
2500))), serializeValue(50L));
+        bytesStore.put(serializeKey(new Windowed<>("a", new SessionWindow(0, 
1000))), serializeValue(30L));
+        bytesStore.put(serializeKey(new Windowed<>("a", new 
SessionWindow(1500, 2500))), serializeValue(50L));
 
-        bytesStore.remove(serializeKey(new Windowed<>("a", new TimeWindow(0, 
1000))));
+        bytesStore.remove(serializeKey(new Windowed<>("a", new 
SessionWindow(0, 1000))));
         final KeyValueIterator<Bytes, byte[]> value = 
bytesStore.fetch(Bytes.wrap("a".getBytes()), 0, 1000L);
         assertFalse(value.hasNext());
     }
@@ -115,32 +115,32 @@ public class RocksDBSegmentedBytesStoreTest {
         // just to validate directories
         final Segments segments = new Segments(storeName, retention, 
numSegments);
         final String key = "a";
-        bytesStore.put(serializeKey(new Windowed<>(key, new TimeWindow(0L, 
0L))), serializeValue(50L));
+        bytesStore.put(serializeKey(new Windowed<>(key, new SessionWindow(0L, 
0L))), serializeValue(50L));
         assertEquals(Collections.singleton(segments.segmentName(0)), 
segmentDirs());
 
-        bytesStore.put(serializeKey(new Windowed<>(key, new TimeWindow(30000L, 
60000L))), serializeValue(100L));
+        bytesStore.put(serializeKey(new Windowed<>(key, new 
SessionWindow(30000L, 60000L))), serializeValue(100L));
         assertEquals(Utils.mkSet(segments.segmentName(0),
                                  segments.segmentName(1)), segmentDirs());
 
-        bytesStore.put(serializeKey(new Windowed<>(key, new TimeWindow(61000L, 
120000L))), serializeValue(200L));
+        bytesStore.put(serializeKey(new Windowed<>(key, new 
SessionWindow(61000L, 120000L))), serializeValue(200L));
         assertEquals(Utils.mkSet(segments.segmentName(0),
                                  segments.segmentName(1),
                                  segments.segmentName(2)), segmentDirs());
 
-        bytesStore.put(serializeKey(new Windowed<>(key, new 
TimeWindow(121000L, 180000L))), serializeValue(300L));
+        bytesStore.put(serializeKey(new Windowed<>(key, new 
SessionWindow(121000L, 180000L))), serializeValue(300L));
         assertEquals(Utils.mkSet(segments.segmentName(1),
                                  segments.segmentName(2),
                                  segments.segmentName(3)), segmentDirs());
 
-        bytesStore.put(serializeKey(new Windowed<>(key, new 
TimeWindow(181000L, 240000L))), serializeValue(400L));
+        bytesStore.put(serializeKey(new Windowed<>(key, new 
SessionWindow(181000L, 240000L))), serializeValue(400L));
         assertEquals(Utils.mkSet(segments.segmentName(2),
                                  segments.segmentName(3),
                                  segments.segmentName(4)), segmentDirs());
 
         final List<KeyValue<Windowed<String>, Long>> results = 
toList(bytesStore.fetch(Bytes.wrap(key.getBytes()), 0, 240000));
-        assertEquals(Arrays.asList(KeyValue.pair(new Windowed<>(key, new 
TimeWindow(61000L, 120000L)), 200L),
-                                   KeyValue.pair(new Windowed<>(key, new 
TimeWindow(121000L, 180000L)), 300L),
-                                   KeyValue.pair(new Windowed<>(key, new 
TimeWindow(181000L, 240000L)), 400L)
+        assertEquals(Arrays.asList(KeyValue.pair(new Windowed<>(key, new 
SessionWindow(61000L, 120000L)), 200L),
+                                   KeyValue.pair(new Windowed<>(key, new 
SessionWindow(121000L, 180000L)), 300L),
+                                   KeyValue.pair(new Windowed<>(key, new 
SessionWindow(181000L, 240000L)), 400L)
                                                  ), results);
 
     }
@@ -170,4 +170,4 @@ public class RocksDBSegmentedBytesStoreTest {
         return results;
     }
 
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/3c605116/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java
index ab4f5da..5a23a1c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java
@@ -21,7 +21,7 @@ import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.kstream.Windowed;
-import org.apache.kafka.streams.kstream.internals.TimeWindow;
+import org.apache.kafka.streams.kstream.internals.SessionWindow;
 import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.SessionStore;
@@ -70,12 +70,12 @@ public class RocksDBSessionStoreTest {
     @Test
     public void shouldPutAndFindSessionsInRange() throws Exception {
         final String key = "a";
-        final Windowed<String> a1 = new Windowed<>(key, new TimeWindow(10, 
10L));
-        final Windowed<String> a2 = new Windowed<>(key, new TimeWindow(500L, 
1000L));
+        final Windowed<String> a1 = new Windowed<>(key, new SessionWindow(10, 
10L));
+        final Windowed<String> a2 = new Windowed<>(key, new 
SessionWindow(500L, 1000L));
         sessionStore.put(a1, 1L);
         sessionStore.put(a2, 2L);
-        sessionStore.put(new Windowed<>(key, new TimeWindow(1500L, 2000L)), 
1L);
-        sessionStore.put(new Windowed<>(key, new TimeWindow(2500L, 3000L)), 
2L);
+        sessionStore.put(new Windowed<>(key, new SessionWindow(1500L, 2000L)), 
1L);
+        sessionStore.put(new Windowed<>(key, new SessionWindow(2500L, 3000L)), 
2L);
 
         final List<KeyValue<Windowed<String>, Long>> expected
                 = Arrays.asList(KeyValue.pair(a1, 1L), KeyValue.pair(a2, 2L));
@@ -87,16 +87,16 @@ public class RocksDBSessionStoreTest {
     @Test
     public void shouldFetchAllSessionsWithSameRecordKey() throws Exception {
 
-        final List<KeyValue<Windowed<String>, Long>> expected = 
Arrays.asList(KeyValue.pair(new Windowed<>("a", new TimeWindow(0, 0)), 1L),
-                                                                               
     KeyValue.pair(new Windowed<>("a", new TimeWindow(10, 10)), 2L),
-                                                                               
     KeyValue.pair(new Windowed<>("a", new TimeWindow(100, 100)), 3L),
-                                                                               
     KeyValue.pair(new Windowed<>("a", new TimeWindow(1000, 1000)), 4L));
+        final List<KeyValue<Windowed<String>, Long>> expected = 
Arrays.asList(KeyValue.pair(new Windowed<>("a", new SessionWindow(0, 0)), 1L),
+                                                                               
     KeyValue.pair(new Windowed<>("a", new SessionWindow(10, 10)), 2L),
+                                                                               
     KeyValue.pair(new Windowed<>("a", new SessionWindow(100, 100)), 3L),
+                                                                               
     KeyValue.pair(new Windowed<>("a", new SessionWindow(1000, 1000)), 4L));
         for (KeyValue<Windowed<String>, Long> kv : expected) {
             sessionStore.put(kv.key, kv.value);
         }
 
         // add one that shouldn't appear in the results
-        sessionStore.put(new Windowed<>("aa", new TimeWindow(0, 0)), 5L);
+        sessionStore.put(new Windowed<>("aa", new SessionWindow(0, 0)), 5L);
 
         final List<KeyValue<Windowed<String>, Long>> results = 
toList(sessionStore.fetch("a"));
         assertEquals(expected, results);
@@ -107,22 +107,22 @@ public class RocksDBSessionStoreTest {
     @Test
     public void shouldFindValuesWithinMergingSessionWindowRange() throws 
Exception {
         final String key = "a";
-        sessionStore.put(new Windowed<>(key, new TimeWindow(0L, 0L)), 1L);
-        sessionStore.put(new Windowed<>(key, new TimeWindow(1000L, 1000L)), 
2L);
+        sessionStore.put(new Windowed<>(key, new SessionWindow(0L, 0L)), 1L);
+        sessionStore.put(new Windowed<>(key, new SessionWindow(1000L, 1000L)), 
2L);
         final KeyValueIterator<Windowed<String>, Long> results = 
sessionStore.findSessions(key, -1, 1000L);
 
         final List<KeyValue<Windowed<String>, Long>> expected = Arrays.asList(
-                KeyValue.pair(new Windowed<>(key, new TimeWindow(0L, 0L)), 1L),
-                KeyValue.pair(new Windowed<>(key, new TimeWindow(1000L, 
1000L)), 2L));
+                KeyValue.pair(new Windowed<>(key, new SessionWindow(0L, 0L)), 
1L),
+                KeyValue.pair(new Windowed<>(key, new SessionWindow(1000L, 
1000L)), 2L));
         assertEquals(expected, toList(results));
     }
 
     @Test
     public void shouldRemove() throws Exception {
-        sessionStore.put(new Windowed<>("a", new TimeWindow(0, 1000)), 1L);
-        sessionStore.put(new Windowed<>("a", new TimeWindow(1500, 2500)), 2L);
+        sessionStore.put(new Windowed<>("a", new SessionWindow(0, 1000)), 1L);
+        sessionStore.put(new Windowed<>("a", new SessionWindow(1500, 2500)), 
2L);
 
-        sessionStore.remove(new Windowed<>("a", new TimeWindow(0, 1000)));
+        sessionStore.remove(new Windowed<>("a", new SessionWindow(0, 1000)));
         assertFalse(sessionStore.findSessions("a", 0, 1000L).hasNext());
 
         assertTrue(sessionStore.findSessions("a", 1500, 2500).hasNext());
@@ -130,11 +130,11 @@ public class RocksDBSessionStoreTest {
 
     @Test
     public void shouldFindSessionsToMerge() throws Exception {
-        final Windowed<String> session1 = new Windowed<>("a", new 
TimeWindow(0, 100));
-        final Windowed<String> session2 = new Windowed<>("a", new 
TimeWindow(101, 200));
-        final Windowed<String> session3 = new Windowed<>("a", new 
TimeWindow(201, 300));
-        final Windowed<String> session4 = new Windowed<>("a", new 
TimeWindow(301, 400));
-        final Windowed<String> session5 = new Windowed<>("a", new 
TimeWindow(401, 500));
+        final Windowed<String> session1 = new Windowed<>("a", new 
SessionWindow(0, 100));
+        final Windowed<String> session2 = new Windowed<>("a", new 
SessionWindow(101, 200));
+        final Windowed<String> session3 = new Windowed<>("a", new 
SessionWindow(201, 300));
+        final Windowed<String> session4 = new Windowed<>("a", new 
SessionWindow(301, 400));
+        final Windowed<String> session5 = new Windowed<>("a", new 
SessionWindow(401, 500));
         sessionStore.put(session1, 1L);
         sessionStore.put(session2, 2L);
         sessionStore.put(session3, 3L);
@@ -155,4 +155,4 @@ public class RocksDBSessionStoreTest {
     }
 
 
-}
\ No newline at end of file
+}

Reply via email to