Repository: kafka Updated Branches: refs/heads/trunk 23dff4b04 -> 3c6051165
KAFKA-3452; Follow-up: Add SessionWindows - TimeWindows represent half-open time intervals while SessionWindows represent closed time intervals Author: Matthias J. Sax <matth...@confluent.io> Reviewers: Damian Guy <damian....@gmail.com>, Guozhang Wang <wangg...@gmail.com> Closes #2342 from mjsax/kafka-3452-session-window-follow-up Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/3c605116 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/3c605116 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/3c605116 Branch: refs/heads/trunk Commit: 3c60511655df3349323e394bdc3836300991352f Parents: 23dff4b Author: Matthias J. Sax <matth...@confluent.io> Authored: Wed Jan 11 20:33:42 2017 -0800 Committer: Guozhang Wang <wangg...@gmail.com> Committed: Wed Jan 11 20:33:42 2017 -0800 ---------------------------------------------------------------------- .../apache/kafka/streams/kstream/Window.java | 21 +++- .../KStreamSessionWindowAggregate.java | 12 +- .../kstream/internals/SessionKeySerde.java | 2 +- .../kstream/internals/SessionWindow.java | 58 +++++++++ .../streams/kstream/internals/TimeWindow.java | 7 +- .../kstream/internals/UnlimitedWindow.java | 6 +- .../state/internals/SessionKeySchema.java | 6 +- .../KStreamAggregationIntegrationTest.java | 34 ++--- .../internals/KGroupedStreamImplTest.java | 18 +-- ...reamSessionWindowAggregateProcessorTest.java | 40 +++--- .../kstream/internals/SessionKeySerdeTest.java | 10 +- .../kstream/internals/SessionWindowTest.java | 124 +++++++++++++++++++ .../internals/CachingSessionStoreTest.java | 42 +++---- .../RocksDBSegmentedBytesStoreTest.java | 44 +++---- .../internals/RocksDBSessionStoreTest.java | 46 +++---- 15 files changed, 335 insertions(+), 135 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/3c605116/streams/src/main/java/org/apache/kafka/streams/kstream/Window.java 
---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Window.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Window.java index 7d78d74..13a9529 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/Window.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Window.java @@ -22,16 +22,27 @@ package org.apache.kafka.streams.kstream; */ public abstract class Window { - private long start; - private long end; + protected final long start; + protected final long end; /** * Create a new window for the given start time (inclusive) and end time (exclusive). * * @param start the start timestamp of the window (inclusive) * @param end the end timestamp of the window (exclusive) + * @throws IllegalArgumentException if {@code start} or {@code end} is negative or if {@code end} is smaller than + * {@code start} */ - public Window(long start, long end) { + public Window(long start, long end) throws IllegalArgumentException { + if (start < 0) { + throw new IllegalArgumentException("Window start time cannot be negative."); + } + if (end < 0) { + throw new IllegalArgumentException("Window end time cannot be negative."); + } + if (end < start) { + throw new IllegalArgumentException("Window end time cannot be smaller than window start time."); + } this.start = start; this.end = end; } @@ -56,9 +67,7 @@ public abstract class Window { * @param other another window * @return {@code true} if {@code other} overlaps with this window—{@code false} otherwise */ - public boolean overlap(Window other) { - return this.start() < other.end() || other.start() < this.end(); - } + public abstract boolean overlap(Window other); @Override public boolean equals(Object obj) { http://git-wip-us.apache.org/repos/asf/kafka/blob/3c605116/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java 
---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java index bb86f52..70b2b90 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java @@ -88,8 +88,8 @@ class KStreamSessionWindowAggregate<K, V, T> implements KStreamAggProcessorSuppl final long timestamp = context().timestamp(); final List<KeyValue<Windowed<K>, T>> merged = new ArrayList<>(); - final TimeWindow newTimeWindow = new TimeWindow(timestamp, timestamp); - TimeWindow mergedWindow = newTimeWindow; + final SessionWindow newSessionWindow = new SessionWindow(timestamp, timestamp); + SessionWindow mergedWindow = newSessionWindow; T agg = initializer.apply(); try (final KeyValueIterator<Windowed<K>, T> iterator = store.findSessions(key, timestamp - windows.inactivityGap(), @@ -98,13 +98,13 @@ class KStreamSessionWindowAggregate<K, V, T> implements KStreamAggProcessorSuppl final KeyValue<Windowed<K>, T> next = iterator.next(); merged.add(next); agg = sessionMerger.apply(key, agg, next.value); - mergedWindow = mergeTimeWindow(mergedWindow, (TimeWindow) next.key.window()); + mergedWindow = mergeSessionWindow(mergedWindow, (SessionWindow) next.key.window()); } } agg = aggregator.apply(key, value, agg); final Windowed<K> sessionKey = new Windowed<>(key, mergedWindow); - if (!mergedWindow.equals(newTimeWindow)) { + if (!mergedWindow.equals(newSessionWindow)) { for (final KeyValue<Windowed<K>, T> session : merged) { store.remove(session.key); tupleForwarder.maybeForward(session.key, null, session.value); @@ -117,10 +117,10 @@ class KStreamSessionWindowAggregate<K, V, T> implements KStreamAggProcessorSuppl } - private TimeWindow 
mergeTimeWindow(final TimeWindow one, final TimeWindow two) { + private SessionWindow mergeSessionWindow(final SessionWindow one, final SessionWindow two) { final long start = one.start() < two.start() ? one.start() : two.start(); final long end = one.end() > two.end() ? one.end() : two.end(); - return new TimeWindow(start, end); + return new SessionWindow(start, end); } @Override http://git-wip-us.apache.org/repos/asf/kafka/blob/3c605116/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionKeySerde.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionKeySerde.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionKeySerde.java index 165d5c6..48213d6 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionKeySerde.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionKeySerde.java @@ -131,7 +131,7 @@ public class SessionKeySerde<K> implements Serde<Windowed<K>> { final ByteBuffer buffer = ByteBuffer.wrap(binaryKey); final long start = buffer.getLong(binaryKey.length - TIMESTAMP_SIZE); final long end = buffer.getLong(binaryKey.length - 2 * TIMESTAMP_SIZE); - return new Windowed<>(key, new TimeWindow(start, end)); + return new Windowed<>(key, new SessionWindow(start, end)); } private static <K> K extractKey(final byte[] binaryKey, Deserializer<K> deserializer) { http://git-wip-us.apache.org/repos/asf/kafka/blob/3c605116/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindow.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindow.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindow.java new file mode 100644 index 0000000..db63029 --- /dev/null +++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindow.java @@ -0,0 +1,58 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.kafka.streams.kstream.internals; + +import org.apache.kafka.streams.kstream.Window; + +/** + * A session window covers a closed time interval with its start and end timestamp both being an inclusive boundary. + * + * @see TimeWindow + * @see UnlimitedWindow + * @see org.apache.kafka.streams.kstream.SessionWindows + */ +public final class SessionWindow extends Window { + + /** + * Create a new window for the given start time and end time (both inclusive). + * + * @param start the start timestamp of the window + * @param end the end timestamp of the window + */ + public SessionWindow(final long start, final long end) { + super(start, end); + } + + /** + * Check if the given window overlaps with this window. 
+ * + * @param other another window + * @return {@code true} if {@code other} overlaps with this window—{@code false} otherwise + * @throws IllegalArgumentException if the {@code other} window has a different type than {@code this} window + */ + public boolean overlap(final Window other) throws IllegalArgumentException { + if (getClass() != other.getClass()) { + throw new IllegalArgumentException("Cannot compare windows of different type. Other window has type " + + other.getClass()); + } + final SessionWindow otherWindow = (SessionWindow) other; + return !(otherWindow.end < start || end < otherWindow.start); + } + +} http://git-wip-us.apache.org/repos/asf/kafka/blob/3c605116/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindow.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindow.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindow.java index 5dfb9eb..630821f 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindow.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindow.java @@ -27,7 +27,12 @@ public class TimeWindow extends Window { @Override public boolean overlap(Window other) { - return getClass() == other.getClass() && super.overlap(other); + if (getClass() != other.getClass()) { + throw new IllegalArgumentException("Cannot compare windows of different type. 
Other window has type " + + other.getClass()); + } + final TimeWindow otherWindow = (TimeWindow) other; + return start < otherWindow.end && otherWindow.start < end; } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kafka/blob/3c605116/streams/src/main/java/org/apache/kafka/streams/kstream/internals/UnlimitedWindow.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/UnlimitedWindow.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/UnlimitedWindow.java index 4b93f9b..e9ec040 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/UnlimitedWindow.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/UnlimitedWindow.java @@ -27,7 +27,11 @@ public class UnlimitedWindow extends Window { @Override public boolean overlap(Window other) { - return getClass() == other.getClass() && super.overlap(other); + if (getClass() != other.getClass()) { + throw new IllegalArgumentException("Cannot compare windows of different type. 
Other window has type " + + other.getClass()); + } + return true; } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kafka/blob/3c605116/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionKeySchema.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionKeySchema.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionKeySchema.java index b15eec9..604abb3 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionKeySchema.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionKeySchema.java @@ -20,7 +20,7 @@ import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.kstream.Windowed; import org.apache.kafka.streams.kstream.internals.SessionKeySerde; -import org.apache.kafka.streams.kstream.internals.TimeWindow; +import org.apache.kafka.streams.kstream.internals.SessionWindow; import org.apache.kafka.streams.state.KeyValueIterator; import java.util.List; @@ -30,13 +30,13 @@ class SessionKeySchema implements SegmentedBytesStore.KeySchema { @Override public Bytes upperRange(final Bytes key, final long to) { - final Windowed<Bytes> sessionKey = new Windowed<>(key, new TimeWindow(to, Long.MAX_VALUE)); + final Windowed<Bytes> sessionKey = new Windowed<>(key, new SessionWindow(to, Long.MAX_VALUE)); return SessionKeySerde.toBinary(sessionKey, Serdes.Bytes().serializer()); } @Override public Bytes lowerRange(final Bytes key, final long from) { - final Windowed<Bytes> sessionKey = new Windowed<>(key, new TimeWindow(0, Math.max(0, from))); + final Windowed<Bytes> sessionKey = new Windowed<>(key, new SessionWindow(0, Math.max(0, from))); return SessionKeySerde.toBinary(sessionKey, Serdes.Bytes().serializer()); } 
http://git-wip-us.apache.org/repos/asf/kafka/blob/3c605116/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java index 5cc2a59..0833f3c 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java @@ -39,7 +39,7 @@ import org.apache.kafka.streams.kstream.Reducer; import org.apache.kafka.streams.kstream.SessionWindows; import org.apache.kafka.streams.kstream.TimeWindows; import org.apache.kafka.streams.kstream.Windowed; -import org.apache.kafka.streams.kstream.internals.TimeWindow; +import org.apache.kafka.streams.kstream.internals.SessionWindow; import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.QueryableStoreTypes; import org.apache.kafka.streams.state.ReadOnlySessionStore; @@ -506,13 +506,13 @@ public class KStreamAggregationIntegrationTest { startStreams(); latch.await(30, TimeUnit.SECONDS); - assertThat(results.get(new Windowed<>("bob", new TimeWindow(t1, t1))), equalTo(1L)); - assertThat(results.get(new Windowed<>("penny", new TimeWindow(t1, t1))), equalTo(1L)); - assertThat(results.get(new Windowed<>("jo", new TimeWindow(t1, t1))), equalTo(1L)); - assertThat(results.get(new Windowed<>("jo", new TimeWindow(t4, t4))), equalTo(1L)); - assertThat(results.get(new Windowed<>("emily", new TimeWindow(t1, t2))), equalTo(2L)); - assertThat(results.get(new Windowed<>("bob", new TimeWindow(t3, t4))), equalTo(2L)); - assertThat(results.get(new Windowed<>("penny", new TimeWindow(t3, t3))), equalTo(1L)); + assertThat(results.get(new Windowed<>("bob", new 
SessionWindow(t1, t1))), equalTo(1L)); + assertThat(results.get(new Windowed<>("penny", new SessionWindow(t1, t1))), equalTo(1L)); + assertThat(results.get(new Windowed<>("jo", new SessionWindow(t1, t1))), equalTo(1L)); + assertThat(results.get(new Windowed<>("jo", new SessionWindow(t4, t4))), equalTo(1L)); + assertThat(results.get(new Windowed<>("emily", new SessionWindow(t1, t2))), equalTo(2L)); + assertThat(results.get(new Windowed<>("bob", new SessionWindow(t3, t4))), equalTo(2L)); + assertThat(results.get(new Windowed<>("penny", new SessionWindow(t3, t3))), equalTo(1L)); } @Test @@ -601,18 +601,18 @@ public class KStreamAggregationIntegrationTest { = kafkaStreams.store(userSessionsStore, QueryableStoreTypes.<String, String>sessionStore()); // verify correct data received - assertThat(results.get(new Windowed<>("bob", new TimeWindow(t1, t1))), equalTo("start")); - assertThat(results.get(new Windowed<>("penny", new TimeWindow(t1, t1))), equalTo("start")); - assertThat(results.get(new Windowed<>("jo", new TimeWindow(t1, t1))), equalTo("pause")); - assertThat(results.get(new Windowed<>("jo", new TimeWindow(t4, t4))), equalTo("resume")); - assertThat(results.get(new Windowed<>("emily", new TimeWindow(t1, t2))), equalTo("pause:resume")); - assertThat(results.get(new Windowed<>("bob", new TimeWindow(t3, t4))), equalTo("pause:resume")); - assertThat(results.get(new Windowed<>("penny", new TimeWindow(t3, t3))), equalTo("stop")); + assertThat(results.get(new Windowed<>("bob", new SessionWindow(t1, t1))), equalTo("start")); + assertThat(results.get(new Windowed<>("penny", new SessionWindow(t1, t1))), equalTo("start")); + assertThat(results.get(new Windowed<>("jo", new SessionWindow(t1, t1))), equalTo("pause")); + assertThat(results.get(new Windowed<>("jo", new SessionWindow(t4, t4))), equalTo("resume")); + assertThat(results.get(new Windowed<>("emily", new SessionWindow(t1, t2))), equalTo("pause:resume")); + assertThat(results.get(new Windowed<>("bob", new 
SessionWindow(t3, t4))), equalTo("pause:resume")); + assertThat(results.get(new Windowed<>("penny", new SessionWindow(t3, t3))), equalTo("stop")); // verify can query data via IQ final KeyValueIterator<Windowed<String>, String> bob = sessionStore.fetch("bob"); - assertThat(bob.next(), equalTo(KeyValue.pair(new Windowed<>("bob", new TimeWindow(t1, t1)), "start"))); - assertThat(bob.next(), equalTo(KeyValue.pair(new Windowed<>("bob", new TimeWindow(t3, t4)), "pause:resume"))); + assertThat(bob.next(), equalTo(KeyValue.pair(new Windowed<>("bob", new SessionWindow(t1, t1)), "start"))); + assertThat(bob.next(), equalTo(KeyValue.pair(new Windowed<>("bob", new SessionWindow(t3, t4)), "pause:resume"))); assertFalse(bob.hasNext()); } http://git-wip-us.apache.org/repos/asf/kafka/blob/3c605116/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java index 62dd1d5..729e190 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java @@ -173,9 +173,9 @@ public class KGroupedStreamImplTest { driver.setTime(100); driver.process(TOPIC, "1", "1"); driver.flushState(); - assertEquals(Integer.valueOf(2), results.get(new Windowed<>("1", new TimeWindow(10, 30)))); - assertEquals(Integer.valueOf(1), results.get(new Windowed<>("2", new TimeWindow(15, 15)))); - assertEquals(Integer.valueOf(3), results.get(new Windowed<>("1", new TimeWindow(70, 100)))); + assertEquals(Integer.valueOf(2), results.get(new Windowed<>("1", new SessionWindow(10, 30)))); + assertEquals(Integer.valueOf(1), results.get(new Windowed<>("2", new SessionWindow(15, 15)))); + 
assertEquals(Integer.valueOf(3), results.get(new Windowed<>("1", new SessionWindow(70, 100)))); } @Test @@ -202,9 +202,9 @@ public class KGroupedStreamImplTest { driver.setTime(100); driver.process(TOPIC, "1", "1"); driver.flushState(); - assertEquals(Long.valueOf(2), results.get(new Windowed<>("1", new TimeWindow(10, 30)))); - assertEquals(Long.valueOf(1), results.get(new Windowed<>("2", new TimeWindow(15, 15)))); - assertEquals(Long.valueOf(3), results.get(new Windowed<>("1", new TimeWindow(70, 100)))); + assertEquals(Long.valueOf(2), results.get(new Windowed<>("1", new SessionWindow(10, 30)))); + assertEquals(Long.valueOf(1), results.get(new Windowed<>("2", new SessionWindow(15, 15)))); + assertEquals(Long.valueOf(3), results.get(new Windowed<>("1", new SessionWindow(70, 100)))); } @Test @@ -238,9 +238,9 @@ public class KGroupedStreamImplTest { driver.setTime(100); driver.process(TOPIC, "1", "C"); driver.flushState(); - assertEquals("A:B", results.get(new Windowed<>("1", new TimeWindow(10, 30)))); - assertEquals("Z", results.get(new Windowed<>("2", new TimeWindow(15, 15)))); - assertEquals("A:B:C", results.get(new Windowed<>("1", new TimeWindow(70, 100)))); + assertEquals("A:B", results.get(new Windowed<>("1", new SessionWindow(10, 30)))); + assertEquals("Z", results.get(new Windowed<>("2", new SessionWindow(15, 15)))); + assertEquals("A:B:C", results.get(new Windowed<>("1", new SessionWindow(70, 100)))); } @Test(expected = NullPointerException.class) http://git-wip-us.apache.org/repos/asf/kafka/blob/3c605116/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java index 0ecaf3a..c3368a1 100644 --- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java @@ -184,9 +184,9 @@ public class KStreamSessionWindowAggregateProcessorTest { sessionStore.flush(); assertEquals(Arrays.asList( - KeyValue.pair(new Windowed<>(sessionId, new TimeWindow(0, 0)), new Change<>(1L, null)), - KeyValue.pair(new Windowed<>(sessionId, new TimeWindow(GAP_MS + 1, GAP_MS + 1)), new Change<>(2L, null)), - KeyValue.pair(new Windowed<>(sessionId, new TimeWindow(time, time)), new Change<>(3L, null)) + KeyValue.pair(new Windowed<>(sessionId, new SessionWindow(0, 0)), new Change<>(1L, null)), + KeyValue.pair(new Windowed<>(sessionId, new SessionWindow(GAP_MS + 1, GAP_MS + 1)), new Change<>(2L, null)), + KeyValue.pair(new Windowed<>(sessionId, new SessionWindow(time, time)), new Change<>(3L, null)) ), results); @@ -200,14 +200,14 @@ public class KStreamSessionWindowAggregateProcessorTest { // first ensure it is in the store final KeyValueIterator<Windowed<String>, Long> a1 = sessionStore.findSessions("a", 0, 0); - assertEquals(KeyValue.pair(new Windowed<>("a", new TimeWindow(0, 0)), 1L), a1.next()); + assertEquals(KeyValue.pair(new Windowed<>("a", new SessionWindow(0, 0)), 1L), a1.next()); context.setTime(100); processor.process("a", "2"); // a1 from above should have been removed // should have merged session in store final KeyValueIterator<Windowed<String>, Long> a2 = sessionStore.findSessions("a", 0, 100); - assertEquals(KeyValue.pair(new Windowed<>("a", new TimeWindow(0, 100)), 2L), a2.next()); + assertEquals(KeyValue.pair(new Windowed<>("a", new SessionWindow(0, 100)), 2L), a2.next()); assertFalse(a2.hasNext()); } @@ -229,13 +229,13 @@ public class KStreamSessionWindowAggregateProcessorTest { sessionStore.flush(); - assertEquals(Arrays.asList(KeyValue.pair(new Windowed<>("a", new TimeWindow(0, 0)), new Change<>(1L, 
null)), - KeyValue.pair(new Windowed<>("b", new TimeWindow(0, 0)), new Change<>(1L, null)), - KeyValue.pair(new Windowed<>("c", new TimeWindow(0, 0)), new Change<>(1L, null)), - KeyValue.pair(new Windowed<>("d", new TimeWindow(0, GAP_MS / 2)), new Change<>(2L, null)), - KeyValue.pair(new Windowed<>("b", new TimeWindow(GAP_MS + 1, GAP_MS + 1)), new Change<>(1L, null)), - KeyValue.pair(new Windowed<>("a", new TimeWindow(GAP_MS + 1, GAP_MS + 1 + GAP_MS / 2)), new Change<>(2L, null)), - KeyValue.pair(new Windowed<>("c", new TimeWindow(GAP_MS + 1 + GAP_MS / 2, GAP_MS + 1 + GAP_MS / 2)), new Change<>(1L, null)) + assertEquals(Arrays.asList(KeyValue.pair(new Windowed<>("a", new SessionWindow(0, 0)), new Change<>(1L, null)), + KeyValue.pair(new Windowed<>("b", new SessionWindow(0, 0)), new Change<>(1L, null)), + KeyValue.pair(new Windowed<>("c", new SessionWindow(0, 0)), new Change<>(1L, null)), + KeyValue.pair(new Windowed<>("d", new SessionWindow(0, GAP_MS / 2)), new Change<>(2L, null)), + KeyValue.pair(new Windowed<>("b", new SessionWindow(GAP_MS + 1, GAP_MS + 1)), new Change<>(1L, null)), + KeyValue.pair(new Windowed<>("a", new SessionWindow(GAP_MS + 1, GAP_MS + 1 + GAP_MS / 2)), new Change<>(2L, null)), + KeyValue.pair(new Windowed<>("c", new SessionWindow(GAP_MS + 1 + GAP_MS / 2, GAP_MS + 1 + GAP_MS / 2)), new Change<>(1L, null)) ), results); } @@ -250,8 +250,8 @@ public class KStreamSessionWindowAggregateProcessorTest { context.setTime(GAP_MS + 1); processor.process("a", "1"); processor.process("a", "2"); - final long t0 = getter.get(new Windowed<>("a", new TimeWindow(0, 0))); - final long t1 = getter.get(new Windowed<>("a", new TimeWindow(GAP_MS + 1, GAP_MS + 1))); + final long t0 = getter.get(new Windowed<>("a", new SessionWindow(0, 0))); + final long t1 = getter.get(new Windowed<>("a", new SessionWindow(GAP_MS + 1, GAP_MS + 1))); assertEquals(1L, t0); assertEquals(2L, t1); } @@ -266,9 +266,9 @@ public class KStreamSessionWindowAggregateProcessorTest { 
processor.process("b", "1"); processor.process("c", "1"); - assertEquals(Arrays.asList(KeyValue.pair(new Windowed<>("a", new TimeWindow(0, 0)), new Change<>(1L, null)), - KeyValue.pair(new Windowed<>("b", new TimeWindow(0, 0)), new Change<>(1L, null)), - KeyValue.pair(new Windowed<>("c", new TimeWindow(0, 0)), new Change<>(1L, null))), results); + assertEquals(Arrays.asList(KeyValue.pair(new Windowed<>("a", new SessionWindow(0, 0)), new Change<>(1L, null)), + KeyValue.pair(new Windowed<>("b", new SessionWindow(0, 0)), new Change<>(1L, null)), + KeyValue.pair(new Windowed<>("c", new SessionWindow(0, 0)), new Change<>(1L, null))), results); } @Test @@ -280,9 +280,9 @@ public class KStreamSessionWindowAggregateProcessorTest { processor.process("a", "1"); context.setTime(5); processor.process("a", "1"); - assertEquals(Arrays.asList(KeyValue.pair(new Windowed<>("a", new TimeWindow(0, 0)), new Change<>(1L, null)), - KeyValue.pair(new Windowed<>("a", new TimeWindow(0, 0)), new Change<>(null, null)), - KeyValue.pair(new Windowed<>("a", new TimeWindow(0, 5)), new Change<>(2L, null))), results); + assertEquals(Arrays.asList(KeyValue.pair(new Windowed<>("a", new SessionWindow(0, 0)), new Change<>(1L, null)), + KeyValue.pair(new Windowed<>("a", new SessionWindow(0, 0)), new Change<>(null, null)), + KeyValue.pair(new Windowed<>("a", new SessionWindow(0, 5)), new Change<>(2L, null))), results); } http://git-wip-us.apache.org/repos/asf/kafka/blob/3c605116/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionKeySerdeTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionKeySerdeTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionKeySerdeTest.java index 2f5972c..3a0f490 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionKeySerdeTest.java +++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionKeySerdeTest.java @@ -30,7 +30,7 @@ public class SessionKeySerdeTest { @Test public void shouldSerializeDeserialize() throws Exception { - final Windowed<Long> key = new Windowed<>(1L, new TimeWindow(10, 100)); + final Windowed<Long> key = new Windowed<>(1L, new SessionWindow(10, 100)); final SessionKeySerde<Long> serde = new SessionKeySerde<>(Serdes.Long()); final byte[] bytes = serde.serializer().serialize("t", key); final Windowed<Long> result = serde.deserializer().deserialize("t", bytes); @@ -57,7 +57,7 @@ public class SessionKeySerdeTest { @Test public void shouldConvertToBinaryAndBack() throws Exception { - final Windowed<String> key = new Windowed<>("key", new TimeWindow(10, 20)); + final Windowed<String> key = new Windowed<>("key", new SessionWindow(10, 20)); final Bytes serialized = SessionKeySerde.toBinary(key, Serdes.String().serializer()); final Windowed<String> result = SessionKeySerde.from(serialized.get(), Serdes.String().deserializer()); assertEquals(key, result); @@ -65,21 +65,21 @@ public class SessionKeySerdeTest { @Test public void shouldExtractEndTimeFromBinary() throws Exception { - final Windowed<String> key = new Windowed<>("key", new TimeWindow(10, 100)); + final Windowed<String> key = new Windowed<>("key", new SessionWindow(10, 100)); final Bytes serialized = SessionKeySerde.toBinary(key, Serdes.String().serializer()); assertEquals(100, SessionKeySerde.extractEnd(serialized.get())); } @Test public void shouldExtractStartTimeFromBinary() throws Exception { - final Windowed<String> key = new Windowed<>("key", new TimeWindow(50, 100)); + final Windowed<String> key = new Windowed<>("key", new SessionWindow(50, 100)); final Bytes serialized = SessionKeySerde.toBinary(key, Serdes.String().serializer()); assertEquals(50, SessionKeySerde.extractStart(serialized.get())); } @Test public void shouldExtractKeyBytesFromBinary() throws Exception { - final Windowed<String> key = 
new Windowed<>("blah", new TimeWindow(50, 100)); + final Windowed<String> key = new Windowed<>("blah", new SessionWindow(50, 100)); final Bytes serialized = SessionKeySerde.toBinary(key, Serdes.String().serializer()); assertArrayEquals("blah".getBytes(), SessionKeySerde.extractKeyBytes(serialized.get())); } http://git-wip-us.apache.org/repos/asf/kafka/blob/3c605116/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionWindowTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionWindowTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionWindowTest.java new file mode 100644 index 0000000..2df7741 --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionWindowTest.java @@ -0,0 +1,124 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.kafka.streams.kstream.internals; + +import org.junit.Test; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +public class SessionWindowTest { + + private long start = 50; + private long end = 100; + private final SessionWindow window = new SessionWindow(start, end); + private final TimeWindow timeWindow = new TimeWindow(start, end); + + @Test + public void shouldNotOverlapIfOtherWindowIsBeforeThisWindow() { + /* + * This: [-------] + * Other: [---] + */ + assertFalse(window.overlap(new SessionWindow(0, 25))); + assertFalse(window.overlap(new SessionWindow(0, start - 1))); + assertFalse(window.overlap(new SessionWindow(start - 1, start - 1))); + } + + @Test + public void shouldOverlapIfOtherWindowEndIsWithinThisWindow() { + /* + * This: [-------] + * Other: [---------] + */ + assertTrue(window.overlap(new SessionWindow(0, start))); + assertTrue(window.overlap(new SessionWindow(0, start + 1))); + assertTrue(window.overlap(new SessionWindow(0, 75))); + assertTrue(window.overlap(new SessionWindow(0, end - 1))); + assertTrue(window.overlap(new SessionWindow(0, end))); + + assertTrue(window.overlap(new SessionWindow(start - 1, start))); + assertTrue(window.overlap(new SessionWindow(start - 1, start + 1))); + assertTrue(window.overlap(new SessionWindow(start - 1, 75))); + assertTrue(window.overlap(new SessionWindow(start - 1, end - 1))); + assertTrue(window.overlap(new SessionWindow(start - 1, end))); + } + + @Test + public void shouldOverlapIfOtherWindowContainsThisWindow() { + /* + * This: [-------] + * Other: [------------------] + */ + assertTrue(window.overlap(new SessionWindow(0, end))); + assertTrue(window.overlap(new SessionWindow(0, end + 1))); + assertTrue(window.overlap(new SessionWindow(0, 150))); + + assertTrue(window.overlap(new SessionWindow(start - 1, end))); + assertTrue(window.overlap(new SessionWindow(start - 1, end + 1))); + assertTrue(window.overlap(new SessionWindow(start - 1, 
150))); + + assertTrue(window.overlap(new SessionWindow(start, end))); + assertTrue(window.overlap(new SessionWindow(start, end + 1))); + assertTrue(window.overlap(new SessionWindow(start, 150))); + } + + @Test + public void shouldOverlapIfOtherWindowIsWithinThisWindow() { + /* + * This: [-------] + * Other: [---] + */ + assertTrue(window.overlap(new SessionWindow(start, start))); + assertTrue(window.overlap(new SessionWindow(start, 75))); + assertTrue(window.overlap(new SessionWindow(start, end))); + assertTrue(window.overlap(new SessionWindow(75, end))); + assertTrue(window.overlap(new SessionWindow(end, end))); + } + + @Test + public void shouldOverlapIfOtherWindowStartIsWithinThisWindow() { + /* + * This: [-------] + * Other: [-------] + */ + assertTrue(window.overlap(new SessionWindow(start, end + 1))); + assertTrue(window.overlap(new SessionWindow(start, 150))); + assertTrue(window.overlap(new SessionWindow(75, end + 1))); + assertTrue(window.overlap(new SessionWindow(75, 150))); + assertTrue(window.overlap(new SessionWindow(end, end + 1))); + assertTrue(window.overlap(new SessionWindow(end, 150))); + } + + @Test + public void shouldNotOverlapIsOtherWindowIsAfterThisWindow() { + /* + * This: [-------] + * Other: [---] + */ + assertFalse(window.overlap(new SessionWindow(end + 1, end + 1))); + assertFalse(window.overlap(new SessionWindow(end + 1, 150))); + assertFalse(window.overlap(new SessionWindow(125, 150))); + } + + @Test(expected = IllegalArgumentException.class) + public void cannotCompareSessionWindowWithDifferentWindowType() { + window.overlap(timeWindow); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kafka/blob/3c605116/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java index a4e8df3..c603aa0 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java @@ -24,7 +24,7 @@ import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.errors.InvalidStateStoreException; import org.apache.kafka.streams.kstream.Windowed; import org.apache.kafka.streams.kstream.internals.SessionKeySerde; -import org.apache.kafka.streams.kstream.internals.TimeWindow; +import org.apache.kafka.streams.kstream.internals.SessionWindow; import org.apache.kafka.streams.processor.internals.MockStreamsMetrics; import org.apache.kafka.streams.processor.internals.ProcessorRecordContext; import org.apache.kafka.streams.processor.internals.RecordCollector; @@ -70,15 +70,15 @@ public class CachingSessionStoreTest { @Test public void shouldPutFetchFromCache() throws Exception { - cachingStore.put(new Windowed<>("a", new TimeWindow(0, 0)), 1L); - cachingStore.put(new Windowed<>("aa", new TimeWindow(0, 0)), 1L); - cachingStore.put(new Windowed<>("b", new TimeWindow(0, 0)), 1L); + cachingStore.put(new Windowed<>("a", new SessionWindow(0, 0)), 1L); + cachingStore.put(new Windowed<>("aa", new SessionWindow(0, 0)), 1L); + cachingStore.put(new Windowed<>("b", new SessionWindow(0, 0)), 1L); final KeyValueIterator<Windowed<String>, Long> a = cachingStore.findSessions("a", 0, 0); final KeyValueIterator<Windowed<String>, Long> b = cachingStore.findSessions("b", 0, 0); - assertEquals(KeyValue.pair(new Windowed<>("a", new TimeWindow(0, 0)), 1L), a.next()); - assertEquals(KeyValue.pair(new Windowed<>("b", new TimeWindow(0, 0)), 1L), b.next()); + assertEquals(KeyValue.pair(new Windowed<>("a", new SessionWindow(0, 0)), 1L), a.next()); + assertEquals(KeyValue.pair(new Windowed<>("b", new SessionWindow(0, 0)), 1L), b.next()); 
assertFalse(a.hasNext()); assertFalse(b.hasNext()); assertEquals(3, cache.size()); @@ -87,16 +87,16 @@ public class CachingSessionStoreTest { @Test public void shouldFetchAllSessionsWithSameRecordKey() throws Exception { - final List<KeyValue<Windowed<String>, Long>> expected = Arrays.asList(KeyValue.pair(new Windowed<>("a", new TimeWindow(0, 0)), 1L), - KeyValue.pair(new Windowed<>("a", new TimeWindow(10, 10)), 2L), - KeyValue.pair(new Windowed<>("a", new TimeWindow(100, 100)), 3L), - KeyValue.pair(new Windowed<>("a", new TimeWindow(1000, 1000)), 4L)); + final List<KeyValue<Windowed<String>, Long>> expected = Arrays.asList(KeyValue.pair(new Windowed<>("a", new SessionWindow(0, 0)), 1L), + KeyValue.pair(new Windowed<>("a", new SessionWindow(10, 10)), 2L), + KeyValue.pair(new Windowed<>("a", new SessionWindow(100, 100)), 3L), + KeyValue.pair(new Windowed<>("a", new SessionWindow(1000, 1000)), 4L)); for (KeyValue<Windowed<String>, Long> kv : expected) { cachingStore.put(kv.key, kv.value); } // add one that shouldn't appear in the results - cachingStore.put(new Windowed<>("aa", new TimeWindow(0, 0)), 5L); + cachingStore.put(new Windowed<>("aa", new SessionWindow(0, 0)), 5L); final List<KeyValue<Windowed<String>, Long>> results = toList(cachingStore.fetch("a")); @@ -124,8 +124,8 @@ public class CachingSessionStoreTest { @Test public void shouldRemove() throws Exception { - final Windowed<String> a = new Windowed<>("a", new TimeWindow(0, 0)); - final Windowed<String> b = new Windowed<>("b", new TimeWindow(0, 0)); + final Windowed<String> a = new Windowed<>("a", new SessionWindow(0, 0)); + final Windowed<String> b = new Windowed<>("b", new SessionWindow(0, 0)); cachingStore.put(a, 2L); cachingStore.put(b, 2L); cachingStore.flush(); @@ -137,9 +137,9 @@ public class CachingSessionStoreTest { @Test public void shouldFetchCorrectlyAcrossSegments() throws Exception { - final Windowed<String> a1 = new Windowed<>("a", new TimeWindow(0, 0)); - final Windowed<String> a2 = new 
Windowed<>("a", new TimeWindow(Segments.MIN_SEGMENT_INTERVAL, Segments.MIN_SEGMENT_INTERVAL)); - final Windowed<String> a3 = new Windowed<>("a", new TimeWindow(Segments.MIN_SEGMENT_INTERVAL * 2, Segments.MIN_SEGMENT_INTERVAL * 2)); + final Windowed<String> a1 = new Windowed<>("a", new SessionWindow(0, 0)); + final Windowed<String> a2 = new Windowed<>("a", new SessionWindow(Segments.MIN_SEGMENT_INTERVAL, Segments.MIN_SEGMENT_INTERVAL)); + final Windowed<String> a3 = new Windowed<>("a", new SessionWindow(Segments.MIN_SEGMENT_INTERVAL * 2, Segments.MIN_SEGMENT_INTERVAL * 2)); cachingStore.put(a1, 1L); cachingStore.put(a2, 2L); cachingStore.put(a3, 3L); @@ -153,7 +153,7 @@ public class CachingSessionStoreTest { @Test public void shouldClearNamespaceCacheOnClose() throws Exception { - final Windowed<String> a1 = new Windowed<>("a", new TimeWindow(0, 0)); + final Windowed<String> a1 = new Windowed<>("a", new SessionWindow(0, 0)); cachingStore.put(a1, 1L); assertEquals(1, cache.size()); cachingStore.close(); @@ -175,13 +175,13 @@ public class CachingSessionStoreTest { @Test(expected = InvalidStateStoreException.class) public void shouldThrowIfTryingToRemoveFromClosedCachingStore() throws Exception { cachingStore.close(); - cachingStore.remove(new Windowed<>("a", new TimeWindow(0, 0))); + cachingStore.remove(new Windowed<>("a", new SessionWindow(0, 0))); } @Test(expected = InvalidStateStoreException.class) public void shouldThrowIfTryingToPutIntoClosedCachingStore() throws Exception { cachingStore.close(); - cachingStore.put(new Windowed<>("a", new TimeWindow(0, 0)), 1L); + cachingStore.put(new Windowed<>("a", new SessionWindow(0, 0)), 1L); } private List<KeyValue<Windowed<String>, Long>> addSessionsUntilOverflow(final String...sessionIds) { @@ -196,11 +196,11 @@ public class CachingSessionStoreTest { private void addSingleSession(final String sessionId, final List<KeyValue<Windowed<String>, Long>> allSessions) { final int timestamp = allSessions.size() * 10; - final 
Windowed<String> key = new Windowed<>(sessionId, new TimeWindow(timestamp, timestamp)); + final Windowed<String> key = new Windowed<>(sessionId, new SessionWindow(timestamp, timestamp)); final Long value = 1L; cachingStore.put(key, value); allSessions.add(KeyValue.pair(key, value)); } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/kafka/blob/3c605116/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java index 9ff2762..7fe490c 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java @@ -24,7 +24,7 @@ import org.apache.kafka.common.utils.Utils; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.kstream.Windowed; import org.apache.kafka.streams.kstream.internals.SessionKeySerde; -import org.apache.kafka.streams.kstream.internals.TimeWindow; +import org.apache.kafka.streams.kstream.internals.SessionWindow; import org.apache.kafka.streams.processor.internals.MockStreamsMetrics; import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.test.MockProcessorContext; @@ -79,13 +79,13 @@ public class RocksDBSegmentedBytesStoreTest { @Test public void shouldPutAndFetch() throws Exception { final String key = "a"; - bytesStore.put(serializeKey(new Windowed<>(key, new TimeWindow(10, 10L))), serializeValue(10L)); - bytesStore.put(serializeKey(new Windowed<>(key, new TimeWindow(500L, 1000L))), serializeValue(50L)); - bytesStore.put(serializeKey(new Windowed<>(key, new TimeWindow(1500L, 2000L))), serializeValue(100L)); - 
bytesStore.put(serializeKey(new Windowed<>(key, new TimeWindow(2500L, 3000L))), serializeValue(200L)); + bytesStore.put(serializeKey(new Windowed<>(key, new SessionWindow(10, 10L))), serializeValue(10L)); + bytesStore.put(serializeKey(new Windowed<>(key, new SessionWindow(500L, 1000L))), serializeValue(50L)); + bytesStore.put(serializeKey(new Windowed<>(key, new SessionWindow(1500L, 2000L))), serializeValue(100L)); + bytesStore.put(serializeKey(new Windowed<>(key, new SessionWindow(2500L, 3000L))), serializeValue(200L)); - final List<KeyValue<Windowed<String>, Long>> expected = Arrays.asList(KeyValue.pair(new Windowed<>(key, new TimeWindow(10, 10)), 10L), - KeyValue.pair(new Windowed<>(key, new TimeWindow(500, 1000)), 50L)); + final List<KeyValue<Windowed<String>, Long>> expected = Arrays.asList(KeyValue.pair(new Windowed<>(key, new SessionWindow(10, 10)), 10L), + KeyValue.pair(new Windowed<>(key, new SessionWindow(500, 1000)), 50L)); final KeyValueIterator<Bytes, byte[]> values = bytesStore.fetch(Bytes.wrap(key.getBytes()), 0, 1000L); assertEquals(expected, toList(values)); @@ -94,18 +94,18 @@ public class RocksDBSegmentedBytesStoreTest { @Test public void shouldFindValuesWithinRange() throws Exception { final String key = "a"; - bytesStore.put(serializeKey(new Windowed<>(key, new TimeWindow(0L, 0L))), serializeValue(50L)); - bytesStore.put(serializeKey(new Windowed<>(key, new TimeWindow(1000L, 1000L))), serializeValue(10L)); + bytesStore.put(serializeKey(new Windowed<>(key, new SessionWindow(0L, 0L))), serializeValue(50L)); + bytesStore.put(serializeKey(new Windowed<>(key, new SessionWindow(1000L, 1000L))), serializeValue(10L)); final KeyValueIterator<Bytes, byte[]> results = bytesStore.fetch(Bytes.wrap(key.getBytes()), 1L, 1999L); - assertEquals(Collections.singletonList(KeyValue.pair(new Windowed<>(key, new TimeWindow(1000L, 1000L)), 10L)), toList(results)); + assertEquals(Collections.singletonList(KeyValue.pair(new Windowed<>(key, new SessionWindow(1000L, 
1000L)), 10L)), toList(results)); } @Test public void shouldRemove() throws Exception { - bytesStore.put(serializeKey(new Windowed<>("a", new TimeWindow(0, 1000))), serializeValue(30L)); - bytesStore.put(serializeKey(new Windowed<>("a", new TimeWindow(1500, 2500))), serializeValue(50L)); + bytesStore.put(serializeKey(new Windowed<>("a", new SessionWindow(0, 1000))), serializeValue(30L)); + bytesStore.put(serializeKey(new Windowed<>("a", new SessionWindow(1500, 2500))), serializeValue(50L)); - bytesStore.remove(serializeKey(new Windowed<>("a", new TimeWindow(0, 1000)))); + bytesStore.remove(serializeKey(new Windowed<>("a", new SessionWindow(0, 1000)))); final KeyValueIterator<Bytes, byte[]> value = bytesStore.fetch(Bytes.wrap("a".getBytes()), 0, 1000L); assertFalse(value.hasNext()); } @@ -115,32 +115,32 @@ public class RocksDBSegmentedBytesStoreTest { // just to validate directories final Segments segments = new Segments(storeName, retention, numSegments); final String key = "a"; - bytesStore.put(serializeKey(new Windowed<>(key, new TimeWindow(0L, 0L))), serializeValue(50L)); + bytesStore.put(serializeKey(new Windowed<>(key, new SessionWindow(0L, 0L))), serializeValue(50L)); assertEquals(Collections.singleton(segments.segmentName(0)), segmentDirs()); - bytesStore.put(serializeKey(new Windowed<>(key, new TimeWindow(30000L, 60000L))), serializeValue(100L)); + bytesStore.put(serializeKey(new Windowed<>(key, new SessionWindow(30000L, 60000L))), serializeValue(100L)); assertEquals(Utils.mkSet(segments.segmentName(0), segments.segmentName(1)), segmentDirs()); - bytesStore.put(serializeKey(new Windowed<>(key, new TimeWindow(61000L, 120000L))), serializeValue(200L)); + bytesStore.put(serializeKey(new Windowed<>(key, new SessionWindow(61000L, 120000L))), serializeValue(200L)); assertEquals(Utils.mkSet(segments.segmentName(0), segments.segmentName(1), segments.segmentName(2)), segmentDirs()); - bytesStore.put(serializeKey(new Windowed<>(key, new TimeWindow(121000L, 
180000L))), serializeValue(300L)); + bytesStore.put(serializeKey(new Windowed<>(key, new SessionWindow(121000L, 180000L))), serializeValue(300L)); assertEquals(Utils.mkSet(segments.segmentName(1), segments.segmentName(2), segments.segmentName(3)), segmentDirs()); - bytesStore.put(serializeKey(new Windowed<>(key, new TimeWindow(181000L, 240000L))), serializeValue(400L)); + bytesStore.put(serializeKey(new Windowed<>(key, new SessionWindow(181000L, 240000L))), serializeValue(400L)); assertEquals(Utils.mkSet(segments.segmentName(2), segments.segmentName(3), segments.segmentName(4)), segmentDirs()); final List<KeyValue<Windowed<String>, Long>> results = toList(bytesStore.fetch(Bytes.wrap(key.getBytes()), 0, 240000)); - assertEquals(Arrays.asList(KeyValue.pair(new Windowed<>(key, new TimeWindow(61000L, 120000L)), 200L), - KeyValue.pair(new Windowed<>(key, new TimeWindow(121000L, 180000L)), 300L), - KeyValue.pair(new Windowed<>(key, new TimeWindow(181000L, 240000L)), 400L) + assertEquals(Arrays.asList(KeyValue.pair(new Windowed<>(key, new SessionWindow(61000L, 120000L)), 200L), + KeyValue.pair(new Windowed<>(key, new SessionWindow(121000L, 180000L)), 300L), + KeyValue.pair(new Windowed<>(key, new SessionWindow(181000L, 240000L)), 400L) ), results); } @@ -170,4 +170,4 @@ public class RocksDBSegmentedBytesStoreTest { return results; } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/kafka/blob/3c605116/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java index ab4f5da..5a23a1c 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java +++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java @@ -21,7 +21,7 @@ import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.kstream.Windowed; -import org.apache.kafka.streams.kstream.internals.TimeWindow; +import org.apache.kafka.streams.kstream.internals.SessionWindow; import org.apache.kafka.streams.processor.internals.MockStreamsMetrics; import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.SessionStore; @@ -70,12 +70,12 @@ public class RocksDBSessionStoreTest { @Test public void shouldPutAndFindSessionsInRange() throws Exception { final String key = "a"; - final Windowed<String> a1 = new Windowed<>(key, new TimeWindow(10, 10L)); - final Windowed<String> a2 = new Windowed<>(key, new TimeWindow(500L, 1000L)); + final Windowed<String> a1 = new Windowed<>(key, new SessionWindow(10, 10L)); + final Windowed<String> a2 = new Windowed<>(key, new SessionWindow(500L, 1000L)); sessionStore.put(a1, 1L); sessionStore.put(a2, 2L); - sessionStore.put(new Windowed<>(key, new TimeWindow(1500L, 2000L)), 1L); - sessionStore.put(new Windowed<>(key, new TimeWindow(2500L, 3000L)), 2L); + sessionStore.put(new Windowed<>(key, new SessionWindow(1500L, 2000L)), 1L); + sessionStore.put(new Windowed<>(key, new SessionWindow(2500L, 3000L)), 2L); final List<KeyValue<Windowed<String>, Long>> expected = Arrays.asList(KeyValue.pair(a1, 1L), KeyValue.pair(a2, 2L)); @@ -87,16 +87,16 @@ public class RocksDBSessionStoreTest { @Test public void shouldFetchAllSessionsWithSameRecordKey() throws Exception { - final List<KeyValue<Windowed<String>, Long>> expected = Arrays.asList(KeyValue.pair(new Windowed<>("a", new TimeWindow(0, 0)), 1L), - KeyValue.pair(new Windowed<>("a", new TimeWindow(10, 10)), 2L), - KeyValue.pair(new Windowed<>("a", new TimeWindow(100, 100)), 3L), - KeyValue.pair(new 
Windowed<>("a", new TimeWindow(1000, 1000)), 4L)); + final List<KeyValue<Windowed<String>, Long>> expected = Arrays.asList(KeyValue.pair(new Windowed<>("a", new SessionWindow(0, 0)), 1L), + KeyValue.pair(new Windowed<>("a", new SessionWindow(10, 10)), 2L), + KeyValue.pair(new Windowed<>("a", new SessionWindow(100, 100)), 3L), + KeyValue.pair(new Windowed<>("a", new SessionWindow(1000, 1000)), 4L)); for (KeyValue<Windowed<String>, Long> kv : expected) { sessionStore.put(kv.key, kv.value); } // add one that shouldn't appear in the results - sessionStore.put(new Windowed<>("aa", new TimeWindow(0, 0)), 5L); + sessionStore.put(new Windowed<>("aa", new SessionWindow(0, 0)), 5L); final List<KeyValue<Windowed<String>, Long>> results = toList(sessionStore.fetch("a")); assertEquals(expected, results); @@ -107,22 +107,22 @@ public class RocksDBSessionStoreTest { @Test public void shouldFindValuesWithinMergingSessionWindowRange() throws Exception { final String key = "a"; - sessionStore.put(new Windowed<>(key, new TimeWindow(0L, 0L)), 1L); - sessionStore.put(new Windowed<>(key, new TimeWindow(1000L, 1000L)), 2L); + sessionStore.put(new Windowed<>(key, new SessionWindow(0L, 0L)), 1L); + sessionStore.put(new Windowed<>(key, new SessionWindow(1000L, 1000L)), 2L); final KeyValueIterator<Windowed<String>, Long> results = sessionStore.findSessions(key, -1, 1000L); final List<KeyValue<Windowed<String>, Long>> expected = Arrays.asList( - KeyValue.pair(new Windowed<>(key, new TimeWindow(0L, 0L)), 1L), - KeyValue.pair(new Windowed<>(key, new TimeWindow(1000L, 1000L)), 2L)); + KeyValue.pair(new Windowed<>(key, new SessionWindow(0L, 0L)), 1L), + KeyValue.pair(new Windowed<>(key, new SessionWindow(1000L, 1000L)), 2L)); assertEquals(expected, toList(results)); } @Test public void shouldRemove() throws Exception { - sessionStore.put(new Windowed<>("a", new TimeWindow(0, 1000)), 1L); - sessionStore.put(new Windowed<>("a", new TimeWindow(1500, 2500)), 2L); + sessionStore.put(new 
Windowed<>("a", new SessionWindow(0, 1000)), 1L); + sessionStore.put(new Windowed<>("a", new SessionWindow(1500, 2500)), 2L); - sessionStore.remove(new Windowed<>("a", new TimeWindow(0, 1000))); + sessionStore.remove(new Windowed<>("a", new SessionWindow(0, 1000))); assertFalse(sessionStore.findSessions("a", 0, 1000L).hasNext()); assertTrue(sessionStore.findSessions("a", 1500, 2500).hasNext()); @@ -130,11 +130,11 @@ public class RocksDBSessionStoreTest { @Test public void shouldFindSessionsToMerge() throws Exception { - final Windowed<String> session1 = new Windowed<>("a", new TimeWindow(0, 100)); - final Windowed<String> session2 = new Windowed<>("a", new TimeWindow(101, 200)); - final Windowed<String> session3 = new Windowed<>("a", new TimeWindow(201, 300)); - final Windowed<String> session4 = new Windowed<>("a", new TimeWindow(301, 400)); - final Windowed<String> session5 = new Windowed<>("a", new TimeWindow(401, 500)); + final Windowed<String> session1 = new Windowed<>("a", new SessionWindow(0, 100)); + final Windowed<String> session2 = new Windowed<>("a", new SessionWindow(101, 200)); + final Windowed<String> session3 = new Windowed<>("a", new SessionWindow(201, 300)); + final Windowed<String> session4 = new Windowed<>("a", new SessionWindow(301, 400)); + final Windowed<String> session5 = new Windowed<>("a", new SessionWindow(401, 500)); sessionStore.put(session1, 1L); sessionStore.put(session2, 2L); sessionStore.put(session3, 3L); @@ -155,4 +155,4 @@ public class RocksDBSessionStoreTest { } -} \ No newline at end of file +}