KAFKA-3613: Consolidate TumblingWindows and HoppingWindows into TimeWindows
This PR includes the same code as https://github.com/apache/kafka/pull/1261 but is rebased on latest trunk. Author: Michael G. Noll <[email protected]> Reviewers: Matthias J. Sax, Guozhang Wang Closes #1277 from miguno/KAFKA-3613-v2 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/68433dcf Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/68433dcf Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/68433dcf Branch: refs/heads/0.10.0 Commit: 68433dcfdc0ae078ee4e7d278c286a9b7c1b3e76 Parents: 0ada3b1 Author: Michael G. Noll <[email protected]> Authored: Fri Apr 29 07:44:03 2016 -0700 Committer: Guozhang Wang <[email protected]> Committed: Fri Apr 29 07:44:03 2016 -0700 ---------------------------------------------------------------------- .../examples/pageview/PageViewTypedDemo.java | 4 +- .../examples/pageview/PageViewUntypedDemo.java | 4 +- .../examples/wordcount/WordCountDemo.java | 2 +- .../kafka/streams/kstream/HoppingWindows.java | 95 ---- .../kafka/streams/kstream/JoinWindows.java | 27 +- .../kafka/streams/kstream/TimeWindows.java | 125 ++++++ .../kafka/streams/kstream/TumblingWindows.java | 74 ---- .../kafka/streams/kstream/UnlimitedWindows.java | 31 +- .../apache/kafka/streams/kstream/Window.java | 14 +- .../apache/kafka/streams/kstream/Windows.java | 6 +- .../kstream/internals/HoppingWindow.java | 37 -- .../streams/kstream/internals/TimeWindow.java | 33 ++ .../kstream/internals/TumblingWindow.java | 38 -- .../kstream/internals/UnlimitedWindow.java | 8 +- .../kafka/streams/kstream/TimeWindowsTest.java | 123 ++++++ .../streams/kstream/UnlimitedWindowsTest.java | 80 ++++ .../internals/KStreamWindowAggregateTest.java | 429 ++++++++++--------- .../WindowedStreamPartitionerTest.java | 4 +- .../streams/kstream/internals/WindowsTest.java | 70 --- .../streams/smoketest/SmokeTestClient.java | 4 +- 20 files changed, 644 insertions(+), 564 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/68433dcf/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java ---------------------------------------------------------------------- diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java index 4124b32..39ec41f 100644 --- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java +++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java @@ -24,11 +24,11 @@ import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.KeyValue; -import org.apache.kafka.streams.kstream.HoppingWindows; import org.apache.kafka.streams.kstream.KStreamBuilder; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.KTable; import org.apache.kafka.streams.kstream.KeyValueMapper; +import org.apache.kafka.streams.kstream.TimeWindows; import org.apache.kafka.streams.kstream.ValueJoiner; import org.apache.kafka.streams.kstream.Windowed; import org.apache.kafka.streams.StreamsConfig; @@ -160,7 +160,7 @@ public class PageViewTypedDemo { return new KeyValue<>(viewRegion.region, viewRegion); } }) - .countByKey(HoppingWindows.of("GeoPageViewsWindow").with(7 * 24 * 60 * 60 * 1000), Serdes.String()) + .countByKey(TimeWindows.of("GeoPageViewsWindow", 7 * 24 * 60 * 60 * 1000L).advanceBy(1000), Serdes.String()) // TODO: we can merge ths toStream().map(...) with a single toStream(...) .toStream() .map(new KeyValueMapper<Windowed<String>, Long, KeyValue<WindowedPageViewByRegion, RegionCount>>() { http://git-wip-us.apache.org/repos/asf/kafka/blob/68433dcf/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedDemo.java ---------------------------------------------------------------------- diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedDemo.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedDemo.java index e61842f..9a41b9e 100644 --- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedDemo.java +++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedDemo.java @@ -30,11 +30,11 @@ import org.apache.kafka.connect.json.JsonDeserializer; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StreamsConfig; -import org.apache.kafka.streams.kstream.HoppingWindows; import org.apache.kafka.streams.kstream.KStreamBuilder; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.KTable; import org.apache.kafka.streams.kstream.KeyValueMapper; +import org.apache.kafka.streams.kstream.TimeWindows; import org.apache.kafka.streams.kstream.ValueJoiner; import org.apache.kafka.streams.kstream.ValueMapper; import org.apache.kafka.streams.kstream.Windowed; @@ -99,7 +99,7 @@ public class PageViewUntypedDemo { return new KeyValue<>(viewRegion.get("region").textValue(), viewRegion); } }) - .countByKey(HoppingWindows.of("GeoPageViewsWindow").with(7 * 24 * 60 * 60 * 1000), Serdes.String()) + .countByKey(TimeWindows.of("GeoPageViewsWindow", 7 * 24 * 60 * 60 * 1000L).advanceBy(1000), Serdes.String()) // TODO: we can merge ths toStream().map(...) with a single toStream(...) .toStream() .map(new KeyValueMapper<Windowed<String>, Long, KeyValue<JsonNode, JsonNode>>() { http://git-wip-us.apache.org/repos/asf/kafka/blob/68433dcf/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountDemo.java ---------------------------------------------------------------------- diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountDemo.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountDemo.java index 5b52803..12395f9 100644 --- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountDemo.java +++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountDemo.java @@ -69,7 +69,7 @@ public class WordCountDemo { }).map(new KeyValueMapper<String, String, KeyValue<String, String>>() { @Override public KeyValue<String, String> apply(String key, String value) { - return new KeyValue<String, String>(value, value); + return new KeyValue<>(value, value); } }) .countByKey("Counts"); http://git-wip-us.apache.org/repos/asf/kafka/blob/68433dcf/streams/src/main/java/org/apache/kafka/streams/kstream/HoppingWindows.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/HoppingWindows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/HoppingWindows.java deleted file mode 100644 index aa866e4..0000000 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/HoppingWindows.java +++ /dev/null @@ -1,95 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kafka.streams.kstream; - -import org.apache.kafka.streams.kstream.internals.HoppingWindow; - -import java.util.HashMap; -import java.util.Map; - -/** - * The hopping window specifications used for aggregations. - */ -public class HoppingWindows extends Windows<HoppingWindow> { - - private static final long DEFAULT_SIZE_MS = 1000L; - - public final long size; - - public final long period; - - private HoppingWindows(String name, long size, long period) { - super(name); - - this.size = size; - this.period = period; - } - - /** - * Returns a half-interval hopping window definition with the window size in milliseconds - * of the form [ N * default_size, N * default_size + default_size ) - */ - public static HoppingWindows of(String name) { - return new HoppingWindows(name, DEFAULT_SIZE_MS, DEFAULT_SIZE_MS); - } - - /** - * Returns a new hopping window definition with the original size but reassign the window - * period in milliseconds of the form [ N * period, N * period + size ) - */ - public HoppingWindows with(long size) { - return new HoppingWindows(this.name, size, this.period); - } - - /** - * Returns a new hopping window definition with the original size but reassign the window - * period in milliseconds of the form [ N * period, N * period + size ) - */ - public HoppingWindows every(long period) { - return new HoppingWindows(this.name, this.size, period); - } - - @Override - public Map<Long, HoppingWindow> windowsFor(long timestamp) { - long enclosed = (size - 1) / period; - - long windowStart = Math.max(0, timestamp - timestamp % period - enclosed * period); - - Map<Long, HoppingWindow> windows = new HashMap<>(); - while (windowStart <= timestamp) { - // add the window - HoppingWindow window = new HoppingWindow(windowStart, windowStart + this.size); - windows.put(windowStart, window); - - // advance the step period - windowStart += this.period; - } - - return windows; - } - - @Override - public boolean equalTo(Windows other) { - if (!other.getClass().equals(HoppingWindows.class)) - return false; - - HoppingWindows otherWindows = (HoppingWindows) other; - - return this.size == otherWindows.size && this.period == otherWindows.period; - } -} http://git-wip-us.apache.org/repos/asf/kafka/blob/68433dcf/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java index 24dbdd3..a74984a 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java @@ -17,15 +17,14 @@ package org.apache.kafka.streams.kstream; - -import org.apache.kafka.streams.kstream.internals.TumblingWindow; +import org.apache.kafka.streams.kstream.internals.TimeWindow; import java.util.Map; /** * The window specifications used for joins. */ -public class JoinWindows extends Windows<TumblingWindow> { +public class JoinWindows extends Windows<TimeWindow> { public final long before; public final long after; @@ -74,19 +73,29 @@ public class JoinWindows extends Windows<TumblingWindow> { } @Override - public Map<Long, TumblingWindow> windowsFor(long timestamp) { + public Map<Long, TimeWindow> windowsFor(long timestamp) { // this function should never be called throw new UnsupportedOperationException("windowsFor() is not supported in JoinWindows"); } @Override - public boolean equalTo(Windows other) { - if (!other.getClass().equals(JoinWindows.class)) + public final boolean equals(Object o) { + if (o == this) { + return true; + } + if (!(o instanceof JoinWindows)) { return false; + } - JoinWindows otherWindows = (JoinWindows) other; + JoinWindows other = (JoinWindows) o; + return this.before == other.before && this.after == other.after; + } - return this.before == otherWindows.before && this.after == otherWindows.after; + @Override + public int hashCode() { + int result = (int) (before ^ (before >>> 32)); + result = 31 * result + (int) (after ^ (after >>> 32)); + return result; } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kafka/blob/68433dcf/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java new file mode 100644 index 0000000..fa3a9d8 --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java @@ -0,0 +1,125 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.streams.kstream; + +import org.apache.kafka.streams.kstream.internals.TimeWindow; + +import java.util.HashMap; +import java.util.Map; + +/** + * The time-based window specifications used for aggregations. + */ +public class TimeWindows extends Windows<TimeWindow> { + + /** + * The size of the window, i.e. how long a window lasts. + * The window size's effective time unit is determined by the semantics of the topology's + * configured {@link org.apache.kafka.streams.processor.TimestampExtractor}. + */ + public final long size; + + /** + * The size of the window's advance interval, i.e. by how much a window moves forward relative + * to the previous one. The interval's effective time unit is determined by the semantics of + * the topology's configured {@link org.apache.kafka.streams.processor.TimestampExtractor}. + */ + public final long advance; + + private TimeWindows(String name, long size, long advance) { + super(name); + if (size <= 0) { + throw new IllegalArgumentException("window size must be > 0 (you provided " + size + ")"); + } + this.size = size; + if (!(0 < advance && advance <= size)) { + throw new IllegalArgumentException( + String.format("advance interval (%d) must lie within interval (0, %d]", advance, size)); + } + this.advance = advance; + } + + /** + * Returns a window definition with the given window size, and with the advance interval being + * equal to the window size. Think: [N * size, N * size + size), with N denoting the N-th + * window. + * + * This provides the semantics of tumbling windows, which are fixed-sized, gap-less, + * non-overlapping windows. Tumbling windows are a specialization of hopping windows. + * + * @param name The name of the window. Must not be null or empty. + * @param size The size of the window, with the requirement that size > 0. + * The window size's effective time unit is determined by the semantics of the + * topology's configured {@link org.apache.kafka.streams.processor.TimestampExtractor}. + * @return a new window definition + */ + public static TimeWindows of(String name, long size) { + return new TimeWindows(name, size, size); + } + + /** + * Returns a window definition with the original size, but advance ("hop") the window by the given + * interval, which specifies by how much a window moves forward relative to the previous one. + * Think: [N * advanceInterval, N * advanceInterval + size), with N denoting the N-th window. + * + * This provides the semantics of hopping windows, which are fixed-sized, overlapping windows. + * + * @param interval The advance interval ("hop") of the window, with the requirement that + * 0 < interval ≤ size. The interval's effective time unit is + * determined by the semantics of the topology's configured + * {@link org.apache.kafka.streams.processor.TimestampExtractor}. + * @return a new window definition + */ + public TimeWindows advanceBy(long interval) { + return new TimeWindows(this.name, this.size, interval); + } + + @Override + public Map<Long, TimeWindow> windowsFor(long timestamp) { + long enclosed = (size - 1) / advance; + long windowStart = Math.max(0, timestamp - timestamp % advance - enclosed * advance); + + Map<Long, TimeWindow> windows = new HashMap<>(); + while (windowStart <= timestamp) { + TimeWindow window = new TimeWindow(windowStart, windowStart + this.size); + windows.put(windowStart, window); + windowStart += this.advance; + } + return windows; + } + + @Override + public final boolean equals(Object o) { + if (o == this) { + return true; + } + if (!(o instanceof TimeWindows)) { + return false; + } + TimeWindows other = (TimeWindows) o; + return this.size == other.size && this.advance == other.advance; + } + + @Override + public int hashCode() { + int result = (int) (size ^ (size >>> 32)); + result = 31 * result + (int) (advance ^ (advance >>> 32)); + return result; + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kafka/blob/68433dcf/streams/src/main/java/org/apache/kafka/streams/kstream/TumblingWindows.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/TumblingWindows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/TumblingWindows.java deleted file mode 100644 index cadedba..0000000 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/TumblingWindows.java +++ /dev/null @@ -1,74 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kafka.streams.kstream; - -import org.apache.kafka.streams.kstream.internals.TumblingWindow; - -import java.util.HashMap; -import java.util.Map; - -/** - * The tumbling window specifications used for aggregations. - */ -public class TumblingWindows extends Windows<TumblingWindow> { - - private static final long DEFAULT_SIZE_MS = 1000L; - - public final long size; - - private TumblingWindows(String name, long size) { - super(name); - - this.size = size; - } - - /** - * Returns a half-interval sliding window definition with the default window size - */ - public static TumblingWindows of(String name) { - return new TumblingWindows(name, DEFAULT_SIZE_MS); - } - - /** - * Returns a half-interval sliding window definition with the window size in milliseconds - */ - public TumblingWindows with(long size) { - return new TumblingWindows(this.name, size); - } - - @Override - public Map<Long, TumblingWindow> windowsFor(long timestamp) { - long windowStart = timestamp - timestamp % size; - - // we cannot use Collections.singleMap since it does not support remove() call - Map<Long, TumblingWindow> windows = new HashMap<>(); - windows.put(windowStart, new TumblingWindow(windowStart, windowStart + size)); - - return windows; - } - - @Override - public boolean equalTo(Windows other) { - if (!other.getClass().equals(TumblingWindows.class)) - return false; - - TumblingWindows otherWindows = (TumblingWindows) other; - - return this.size == otherWindows.size; - } -} http://git-wip-us.apache.org/repos/asf/kafka/blob/68433dcf/streams/src/main/java/org/apache/kafka/streams/kstream/UnlimitedWindows.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/UnlimitedWindows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/UnlimitedWindows.java index 7cadfb4..bea3b57 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/UnlimitedWindows.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/UnlimitedWindows.java @@ -34,6 +34,9 @@ public class UnlimitedWindows extends Windows<UnlimitedWindow> { private UnlimitedWindows(String name, long start) { super(name); + if (start < 0) { + throw new IllegalArgumentException("start must be > 0 (you provided " + start + ")"); + } this.start = start; } @@ -52,21 +55,31 @@ public class UnlimitedWindows extends Windows<UnlimitedWindow> { public Map<Long, UnlimitedWindow> windowsFor(long timestamp) { // always return the single unlimited window - // we cannot use Collections.singleMap since it does not support remove() call + // we cannot use Collections.singleMap since it does not support remove() Map<Long, UnlimitedWindow> windows = new HashMap<>(); - windows.put(start, new UnlimitedWindow(start)); - - + if (timestamp >= start) { + windows.put(start, new UnlimitedWindow(start)); + } return windows; } @Override - public boolean equalTo(Windows other) { - if (!other.getClass().equals(UnlimitedWindows.class)) + public final boolean equals(Object o) { + if (o == this) { + return true; + } + + if (!(o instanceof UnlimitedWindows)) { return false; + } - UnlimitedWindows otherWindows = (UnlimitedWindows) other; + UnlimitedWindows other = (UnlimitedWindows) o; + return this.start == other.start; + } - return this.start == otherWindows.start; + @Override + public int hashCode() { + return (int) (start ^ (start >>> 32)); } -} + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kafka/blob/68433dcf/streams/src/main/java/org/apache/kafka/streams/kstream/Window.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Window.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Window.java index f2965dc..784d5c3 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/Window.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Window.java @@ -48,21 +48,18 @@ public abstract class Window { return this.start() < other.end() || other.start() < this.end(); } - public boolean equalsTo(Window other) { - return this.start() == other.start() && this.end() == other.end(); - } - @Override public boolean equals(Object obj) { - if (obj == this) + if (obj == this) { return true; + } - if (!(obj instanceof Window)) + if (getClass() != obj.getClass()) { return false; + } Window other = (Window) obj; - - return this.equalsTo(other) && this.start == other.start && this.end == other.end; + return this.start == other.start && this.end == other.end; } @Override @@ -70,4 +67,5 @@ public abstract class Window { long n = (this.start << 32) | this.end; return (int) (n % 0xFFFFFFFFL); } + } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kafka/blob/68433dcf/streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java index e7dc23e..1406de6 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java @@ -45,6 +45,9 @@ public abstract class Windows<W extends Window> { public int segments; protected Windows(String name) { + if (name == null || name.isEmpty()) { + throw new IllegalArgumentException("name must not be null or empty"); + } this.name = name; this.segments = DEFAULT_NUM_SEGMENTS; this.emitDurationMs = DEFAULT_EMIT_DURATION; @@ -95,7 +98,6 @@ public abstract class Windows<W extends Window> { return prefix + String.format("%010d", NAME_INDEX.getAndIncrement()); } - public abstract boolean equalTo(Windows other); - public abstract Map<Long, W> windowsFor(long timestamp); + } http://git-wip-us.apache.org/repos/asf/kafka/blob/68433dcf/streams/src/main/java/org/apache/kafka/streams/kstream/internals/HoppingWindow.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/HoppingWindow.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/HoppingWindow.java deleted file mode 100644 index 8b0b2fb..0000000 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/HoppingWindow.java +++ /dev/null @@ -1,37 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kafka.streams.kstream.internals; - -import org.apache.kafka.streams.kstream.Window; - -public class HoppingWindow extends Window { - - public HoppingWindow(long start, long end) { - super(start, end); - } - - @Override - public boolean overlap(Window other) { - return super.overlap(other) && other.getClass().equals(HoppingWindow.class); - } - - @Override - public boolean equalsTo(Window other) { - return super.equalsTo(other) && other.getClass().equals(HoppingWindow.class); - } -} http://git-wip-us.apache.org/repos/asf/kafka/blob/68433dcf/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindow.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindow.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindow.java new file mode 100644 index 0000000..5dfb9eb --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindow.java @@ -0,0 +1,33 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.streams.kstream.internals; + +import org.apache.kafka.streams.kstream.Window; + +public class TimeWindow extends Window { + + public TimeWindow(long start, long end) { + super(start, end); + } + + @Override + public boolean overlap(Window other) { + return getClass() == other.getClass() && super.overlap(other); + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kafka/blob/68433dcf/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TumblingWindow.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TumblingWindow.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TumblingWindow.java deleted file mode 100644 index a02d4b9..0000000 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TumblingWindow.java +++ /dev/null @@ -1,38 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kafka.streams.kstream.internals; - - -import org.apache.kafka.streams.kstream.Window; - -public class TumblingWindow extends Window { - - public TumblingWindow(long start, long end) { - super(start, end); - } - - @Override - public boolean overlap(Window other) { - return super.overlap(other) && other.getClass().equals(TumblingWindow.class); - } - - @Override - public boolean equalsTo(Window other) { - return super.equalsTo(other) && other.getClass().equals(TumblingWindow.class); - } -} http://git-wip-us.apache.org/repos/asf/kafka/blob/68433dcf/streams/src/main/java/org/apache/kafka/streams/kstream/internals/UnlimitedWindow.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/UnlimitedWindow.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/UnlimitedWindow.java index 8ac8f70..4b93f9b 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/UnlimitedWindow.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/UnlimitedWindow.java @@ -27,11 +27,7 @@ public class UnlimitedWindow extends Window { @Override public boolean overlap(Window other) { - return super.overlap(other) && other.getClass().equals(UnlimitedWindow.class); + return getClass() == other.getClass() && super.overlap(other); } - @Override - public boolean equalsTo(Window other) { - return super.equalsTo(other) && other.getClass().equals(UnlimitedWindow.class); - } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kafka/blob/68433dcf/streams/src/test/java/org/apache/kafka/streams/kstream/TimeWindowsTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/TimeWindowsTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/TimeWindowsTest.java new file mode 100644 index 0000000..e9ff235 --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/TimeWindowsTest.java @@ -0,0 +1,123 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.kafka.streams.kstream; + +import org.apache.kafka.streams.kstream.internals.TimeWindow; +import org.junit.Test; + +import java.util.Map; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +public class TimeWindowsTest { + + private static String anyName = "window"; + private static long anySize = 123L; + + @Test + public void shouldHaveSaneEqualsAndHashCode() { + TimeWindows w1 = TimeWindows.of("w1", anySize); + TimeWindows w2 = TimeWindows.of("w2", w1.size); + + // Reflexive + assertTrue(w1.equals(w1)); + assertTrue(w1.hashCode() == w1.hashCode()); + + // Symmetric + assertTrue(w1.equals(w2)); + assertTrue(w1.hashCode() == w2.hashCode()); + assertTrue(w2.hashCode() == w1.hashCode()); + + // Transitive + TimeWindows w3 = TimeWindows.of("w3", w2.size); + assertTrue(w2.equals(w3)); + assertTrue(w2.hashCode() == w3.hashCode()); + assertTrue(w1.equals(w3)); + assertTrue(w1.hashCode() == w3.hashCode()); + + // Inequality scenarios + assertFalse("must be false for null", w1.equals(null)); + assertFalse("must be false for different window types", w1.equals(UnlimitedWindows.of("irrelevant"))); + assertFalse("must be false for different types", w1.equals(new Object())); + + TimeWindows differentWindowSize = TimeWindows.of("differentWindowSize", w1.size + 1); + assertFalse("must be false when window sizes are different", w1.equals(differentWindowSize)); + + TimeWindows differentAdvanceInterval = w1.advanceBy(w1.advance - 1); + assertFalse("must be false when advance intervals are different", w1.equals(differentAdvanceInterval)); + } + + @Test(expected = IllegalArgumentException.class) + public void nameMustNotBeEmpty() { + TimeWindows.of("", anySize); + } + + @Test(expected = IllegalArgumentException.class) + public void nameMustNotBeNull() { + TimeWindows.of(null, anySize); + } + + @Test(expected = IllegalArgumentException.class) + public void windowSizeMustNotBeNegative() { + TimeWindows.of(anyName, -1); + } + + @Test(expected = IllegalArgumentException.class) + public void windowSizeMustNotBeZero() { + TimeWindows.of(anyName, 0); + } + + @Test(expected = IllegalArgumentException.class) + public void advanceIntervalMustNotBeNegative() { + TimeWindows.of(anyName, anySize).advanceBy(-1); + } + + @Test(expected = IllegalArgumentException.class) + public void advanceIntervalMustNotBeZero() { + TimeWindows.of(anyName, anySize).advanceBy(0); + } + + @Test(expected = IllegalArgumentException.class) + public void advanceIntervalMustNotBeLargerThanWindowSize() { + long size = anySize; + TimeWindows.of(anyName, size).advanceBy(size + 1); + } + + @Test + public void windowsForHoppingWindows() { + TimeWindows windows = TimeWindows.of(anyName, 12L).advanceBy(5L); + Map<Long, TimeWindow> matched = windows.windowsFor(21L); + assertEquals(12L / 5L + 1, matched.size()); + assertEquals(new TimeWindow(10L, 22L), matched.get(10L)); + assertEquals(new TimeWindow(15L, 27L), matched.get(15L)); + assertEquals(new TimeWindow(20L, 32L), matched.get(20L)); + } + + @Test + public void windowsForTumblingWindows() { + TimeWindows windows = TimeWindows.of(anyName, 12L); + Map<Long, TimeWindow> matched = windows.windowsFor(21L); + assertEquals(1, matched.size()); + assertEquals(new TimeWindow(12L, 24L), matched.get(12L)); + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kafka/blob/68433dcf/streams/src/test/java/org/apache/kafka/streams/kstream/UnlimitedWindowsTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/UnlimitedWindowsTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/UnlimitedWindowsTest.java new file mode 100644 index 0000000..da5f159 --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/UnlimitedWindowsTest.java @@ -0,0 +1,80 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.kafka.streams.kstream; + +import org.apache.kafka.streams.kstream.internals.UnlimitedWindow; +import org.junit.Test; + +import java.util.Map; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +public class UnlimitedWindowsTest { + + private static String anyName = "window"; + private static long anyStartTime = 10L; + + @Test(expected = IllegalArgumentException.class) + public void nameMustNotBeEmpty() { + UnlimitedWindows.of(""); + } + + @Test(expected = IllegalArgumentException.class) + public void nameMustNotBeNull() { + UnlimitedWindows.of(null); + } + + @Test(expected = IllegalArgumentException.class) + public void startTimeMustNotBeNegative() { + UnlimitedWindows.of(anyName).startOn(-1); + } + + @Test + public void startTimeCanBeZero() { + UnlimitedWindows.of(anyName).startOn(0); + } + + @Test + public void shouldIncludeRecordsThatHappenedOnWindowStart() { + UnlimitedWindows w = UnlimitedWindows.of(anyName).startOn(anyStartTime); + Map<Long, UnlimitedWindow> matchedWindows = w.windowsFor(w.start); + assertEquals(1, matchedWindows.size()); + assertEquals(new UnlimitedWindow(anyStartTime), matchedWindows.get(anyStartTime)); + } + + @Test + public void shouldIncludeRecordsThatHappenedAfterWindowStart() { + UnlimitedWindows w = UnlimitedWindows.of(anyName).startOn(anyStartTime); + long timestamp = w.start + 1; + Map<Long, UnlimitedWindow> matchedWindows = w.windowsFor(timestamp); + assertEquals(1, matchedWindows.size()); + assertEquals(new UnlimitedWindow(anyStartTime), matchedWindows.get(anyStartTime)); + } + + @Test + public void shouldExcludeRecordsThatHappenedBeforeWindowStart() { + UnlimitedWindows w = UnlimitedWindows.of(anyName).startOn(anyStartTime); + long timestamp = w.start - 1; + Map<Long, UnlimitedWindow> matchedWindows = w.windowsFor(timestamp); + assertTrue(matchedWindows.isEmpty()); + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kafka/blob/68433dcf/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java index 828103a..f4fe3a6 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java @@ -20,10 +20,10 @@ package org.apache.kafka.streams.kstream.internals; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.utils.Utils; -import org.apache.kafka.streams.kstream.HoppingWindows; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.KStreamBuilder; import org.apache.kafka.streams.kstream.KTable; +import org.apache.kafka.streams.kstream.TimeWindows; import org.apache.kafka.streams.kstream.ValueJoiner; import org.apache.kafka.streams.kstream.Windowed; import org.apache.kafka.test.KStreamTestDriver; @@ -37,6 +37,7 @@ import org.junit.Test; import java.io.File; import java.io.IOException; +import java.nio.file.Files; import static org.junit.Assert.assertEquals; @@ -62,215 +63,229 @@ public class KStreamWindowAggregateTest { @Test public void testAggBasic() throws Exception { - final KStreamBuilder builder = new KStreamBuilder(); - - String topic1 = "topic1"; - - KStream<String, String> stream1 = builder.stream(strSerde, strSerde, topic1); - KTable<Windowed<String>, String> table2 = stream1.aggregateByKey(MockInitializer.STRING_INIT, MockAggregator.STRING_ADDER, - HoppingWindows.of("topic1-Canonized").with(10L).every(5L), - strSerde, - strSerde); - - MockProcessorSupplier<Windowed<String>, String> proc2 = new MockProcessorSupplier<>(); - table2.toStream().process(proc2); - - driver = new KStreamTestDriver(builder, stateDir); - - driver.setTime(0L); - driver.process(topic1, "A", "1"); - driver.setTime(1L); - driver.process(topic1, "B", "2"); - driver.setTime(2L); - driver.process(topic1, "C", "3"); - driver.setTime(3L); - driver.process(topic1, "D", "4"); - driver.setTime(4L); - driver.process(topic1, "A", "1"); - - driver.setTime(5L); - driver.process(topic1, "A", "1"); - driver.setTime(6L); - driver.process(topic1, "B", "2"); - driver.setTime(7L); - driver.process(topic1, "D", "4"); - driver.setTime(8L); - driver.process(topic1, "B", "2"); - driver.setTime(9L); - driver.process(topic1, "C", "3"); - - driver.setTime(10L); - driver.process(topic1, "A", "1"); - driver.setTime(11L); - driver.process(topic1, "B", "2"); - driver.setTime(12L); - driver.process(topic1, "D", "4"); - driver.setTime(13L); - driver.process(topic1, "B", "2"); - driver.setTime(14L); - driver.process(topic1, "C", "3"); - - assertEquals(Utils.mkList( - "[A@0]:0+1", - "[B@0]:0+2", - "[C@0]:0+3", - "[D@0]:0+4", - "[A@0]:0+1+1", - - "[A@0]:0+1+1+1", "[A@5]:0+1", - "[B@0]:0+2+2", "[B@5]:0+2", - "[D@0]:0+4+4", "[D@5]:0+4", - "[B@0]:0+2+2+2", "[B@5]:0+2+2", - "[C@0]:0+3+3", "[C@5]:0+3", - - "[A@5]:0+1+1", "[A@10]:0+1", - "[B@5]:0+2+2+2", "[B@10]:0+2", - "[D@5]:0+4+4", "[D@10]:0+4", - "[B@5]:0+2+2+2+2", "[B@10]:0+2+2", - "[C@5]:0+3+3", "[C@10]:0+3"), proc2.processed); + final File baseDir = Files.createTempDirectory("test").toFile(); + + try { + final KStreamBuilder builder = new KStreamBuilder(); + String topic1 = "topic1"; + + KStream<String, String> stream1 = builder.stream(strSerde, strSerde, topic1); + KTable<Windowed<String>, String> table2 = + stream1.aggregateByKey(MockInitializer.STRING_INIT, MockAggregator.STRING_ADDER, + TimeWindows.of("topic1-Canonized", 10).advanceBy(5), + strSerde, + strSerde); + + MockProcessorSupplier<Windowed<String>, String> proc2 = new MockProcessorSupplier<>(); + table2.toStream().process(proc2); + + KStreamTestDriver driver = new KStreamTestDriver(builder, baseDir); + + driver.setTime(0L); + driver.process(topic1, "A", "1"); + driver.setTime(1L); + driver.process(topic1, "B", "2"); + driver.setTime(2L); + driver.process(topic1, "C", "3"); + driver.setTime(3L); + driver.process(topic1, "D", "4"); + driver.setTime(4L); + driver.process(topic1, "A", "1"); + + driver.setTime(5L); + driver.process(topic1, "A", "1"); + driver.setTime(6L); + driver.process(topic1, "B", "2"); + driver.setTime(7L); + driver.process(topic1, "D", "4"); + driver.setTime(8L); + driver.process(topic1, "B", "2"); + driver.setTime(9L); + driver.process(topic1, "C", "3"); + + driver.setTime(10L); + driver.process(topic1, "A", "1"); + driver.setTime(11L); + driver.process(topic1, "B", "2"); + driver.setTime(12L); + driver.process(topic1, "D", "4"); + driver.setTime(13L); + driver.process(topic1, "B", "2"); + driver.setTime(14L); + driver.process(topic1, "C", "3"); + + assertEquals(Utils.mkList( + "[A@0]:0+1", + "[B@0]:0+2", + "[C@0]:0+3", + "[D@0]:0+4", + "[A@0]:0+1+1", + + "[A@0]:0+1+1+1", "[A@5]:0+1", + "[B@0]:0+2+2", "[B@5]:0+2", + "[D@0]:0+4+4", "[D@5]:0+4", + "[B@0]:0+2+2+2", "[B@5]:0+2+2", + "[C@0]:0+3+3", "[C@5]:0+3", + + "[A@5]:0+1+1", "[A@10]:0+1", + "[B@5]:0+2+2+2", "[B@10]:0+2", + "[D@5]:0+4+4", "[D@10]:0+4", + "[B@5]:0+2+2+2+2", "[B@10]:0+2+2", + "[C@5]:0+3+3", "[C@10]:0+3"), proc2.processed); + + } finally { + Utils.delete(baseDir); + } } @Test public void testJoin() throws Exception { - final KStreamBuilder builder = new KStreamBuilder(); - - String topic1 = "topic1"; - String topic2 = "topic2"; - - KStream<String, String> stream1 = builder.stream(strSerde, strSerde, topic1); - KTable<Windowed<String>, String> table1 = stream1.aggregateByKey(MockInitializer.STRING_INIT, MockAggregator.STRING_ADDER, - HoppingWindows.of("topic1-Canonized").with(10L).every(5L), - strSerde, - strSerde); - - MockProcessorSupplier<Windowed<String>, String> proc1 = new MockProcessorSupplier<>(); - table1.toStream().process(proc1); - - KStream<String, String> stream2 = builder.stream(strSerde, strSerde, topic2); - KTable<Windowed<String>, String> table2 = stream2.aggregateByKey(MockInitializer.STRING_INIT, MockAggregator.STRING_ADDER, - HoppingWindows.of("topic2-Canonized").with(10L).every(5L), - strSerde, - strSerde); - - MockProcessorSupplier<Windowed<String>, String> proc2 = new MockProcessorSupplier<>(); - table2.toStream().process(proc2); - - - MockProcessorSupplier<Windowed<String>, String> proc3 = new MockProcessorSupplier<>(); - table1.join(table2, new ValueJoiner<String, String, String>() { - @Override - public String apply(String p1, String p2) { - return p1 + "%" + p2; - } - }).toStream().process(proc3); - - driver = new KStreamTestDriver(builder, stateDir); - - driver.setTime(0L); - driver.process(topic1, "A", "1"); - driver.setTime(1L); - driver.process(topic1, "B", "2"); - driver.setTime(2L); - driver.process(topic1, "C", "3"); - driver.setTime(3L); - driver.process(topic1, "D", "4"); - driver.setTime(4L); - driver.process(topic1, "A", "1"); - - proc1.checkAndClearProcessResult( - "[A@0]:0+1", - "[B@0]:0+2", - "[C@0]:0+3", - "[D@0]:0+4", - "[A@0]:0+1+1" - ); - proc2.checkAndClearProcessResult(); - proc3.checkAndClearProcessResult( - "[A@0]:null", - "[B@0]:null", - "[C@0]:null", - "[D@0]:null", - "[A@0]:null" - ); - - driver.setTime(5L); - driver.process(topic1, "A", "1"); - driver.setTime(6L); - driver.process(topic1, "B", "2"); - driver.setTime(7L); - driver.process(topic1, "D", "4"); - driver.setTime(8L); - driver.process(topic1, "B", "2"); - driver.setTime(9L); - driver.process(topic1, "C", "3"); - - proc1.checkAndClearProcessResult( - "[A@0]:0+1+1+1", "[A@5]:0+1", - "[B@0]:0+2+2", "[B@5]:0+2", - "[D@0]:0+4+4", "[D@5]:0+4", - "[B@0]:0+2+2+2", "[B@5]:0+2+2", - "[C@0]:0+3+3", "[C@5]:0+3" - ); - proc2.checkAndClearProcessResult(); - proc3.checkAndClearProcessResult( - "[A@0]:null", "[A@5]:null", - "[B@0]:null", "[B@5]:null", - "[D@0]:null", "[D@5]:null", - "[B@0]:null", "[B@5]:null", - "[C@0]:null", "[C@5]:null" - ); - - driver.setTime(0L); - driver.process(topic2, "A", "a"); - driver.setTime(1L); - driver.process(topic2, "B", "b"); - driver.setTime(2L); - driver.process(topic2, "C", "c"); - driver.setTime(3L); - driver.process(topic2, "D", "d"); - driver.setTime(4L); - driver.process(topic2, "A", "a"); - - proc1.checkAndClearProcessResult(); - proc2.checkAndClearProcessResult( - "[A@0]:0+a", - "[B@0]:0+b", - "[C@0]:0+c", - "[D@0]:0+d", - "[A@0]:0+a+a" - ); - proc3.checkAndClearProcessResult( - "[A@0]:0+1+1+1%0+a", - "[B@0]:0+2+2+2%0+b", - "[C@0]:0+3+3%0+c", - "[D@0]:0+4+4%0+d", - "[A@0]:0+1+1+1%0+a+a"); - - driver.setTime(5L); - driver.process(topic2, "A", "a"); - driver.setTime(6L); - driver.process(topic2, "B", "b"); - driver.setTime(7L); - driver.process(topic2, "D", "d"); - driver.setTime(8L); - driver.process(topic2, "B", "b"); - driver.setTime(9L); - driver.process(topic2, "C", "c"); - - proc1.checkAndClearProcessResult(); - proc2.checkAndClearProcessResult( - "[A@0]:0+a+a+a", "[A@5]:0+a", - "[B@0]:0+b+b", "[B@5]:0+b", - "[D@0]:0+d+d", "[D@5]:0+d", - "[B@0]:0+b+b+b", "[B@5]:0+b+b", - "[C@0]:0+c+c", "[C@5]:0+c" - ); - proc3.checkAndClearProcessResult( - "[A@0]:0+1+1+1%0+a+a+a", "[A@5]:0+1%0+a", - "[B@0]:0+2+2+2%0+b+b", "[B@5]:0+2+2%0+b", - "[D@0]:0+4+4%0+d+d", "[D@5]:0+4%0+d", - "[B@0]:0+2+2+2%0+b+b+b", "[B@5]:0+2+2%0+b+b", - "[C@0]:0+3+3%0+c+c", "[C@5]:0+3%0+c" - ); + final File baseDir = Files.createTempDirectory("test").toFile(); + + try { + final KStreamBuilder builder = new KStreamBuilder(); + String topic1 = "topic1"; + String topic2 = "topic2"; + + KStream<String, String> stream1 = builder.stream(strSerde, strSerde, topic1); + KTable<Windowed<String>, String> table1 = + stream1.aggregateByKey(MockInitializer.STRING_INIT, MockAggregator.STRING_ADDER, + TimeWindows.of("topic1-Canonized", 10).advanceBy(5), + strSerde, + strSerde); + + MockProcessorSupplier<Windowed<String>, String> proc1 = new MockProcessorSupplier<>(); + table1.toStream().process(proc1); + + KStream<String, String> stream2 = builder.stream(strSerde, strSerde, topic2); + KTable<Windowed<String>, String> table2 = + stream2.aggregateByKey(MockInitializer.STRING_INIT, MockAggregator.STRING_ADDER, + TimeWindows.of("topic2-Canonized", 10).advanceBy(5), + strSerde, + strSerde); + + MockProcessorSupplier<Windowed<String>, String> proc2 = new MockProcessorSupplier<>(); + table2.toStream().process(proc2); + + + MockProcessorSupplier<Windowed<String>, String> proc3 = new MockProcessorSupplier<>(); + table1.join(table2, new ValueJoiner<String, String, String>() { + @Override + public String apply(String p1, String p2) { + return p1 + "%" + p2; + } + }).toStream().process(proc3); + + KStreamTestDriver driver = new KStreamTestDriver(builder, baseDir); + + driver.setTime(0L); + driver.process(topic1, "A", "1"); + driver.setTime(1L); + driver.process(topic1, "B", "2"); + driver.setTime(2L); + driver.process(topic1, "C", "3"); + driver.setTime(3L); + driver.process(topic1, "D", "4"); + driver.setTime(4L); + driver.process(topic1, "A", "1"); + + proc1.checkAndClearProcessResult( + "[A@0]:0+1", + "[B@0]:0+2", + "[C@0]:0+3", + "[D@0]:0+4", + "[A@0]:0+1+1" + ); + proc2.checkAndClearProcessResult(); + proc3.checkAndClearProcessResult( + "[A@0]:null", + "[B@0]:null", + "[C@0]:null", + "[D@0]:null", + "[A@0]:null" + ); + + driver.setTime(5L); + driver.process(topic1, "A", "1"); + driver.setTime(6L); + driver.process(topic1, "B", "2"); + driver.setTime(7L); + driver.process(topic1, "D", "4"); + driver.setTime(8L); + driver.process(topic1, "B", "2"); + driver.setTime(9L); + driver.process(topic1, "C", "3"); + + proc1.checkAndClearProcessResult( + "[A@0]:0+1+1+1", "[A@5]:0+1", + "[B@0]:0+2+2", "[B@5]:0+2", + "[D@0]:0+4+4", "[D@5]:0+4", + "[B@0]:0+2+2+2", "[B@5]:0+2+2", + "[C@0]:0+3+3", "[C@5]:0+3" + ); + proc2.checkAndClearProcessResult(); + proc3.checkAndClearProcessResult( + "[A@0]:null", "[A@5]:null", + "[B@0]:null", "[B@5]:null", + "[D@0]:null", "[D@5]:null", + "[B@0]:null", "[B@5]:null", + "[C@0]:null", "[C@5]:null" + ); + + driver.setTime(0L); + driver.process(topic2, "A", "a"); + driver.setTime(1L); + driver.process(topic2, "B", "b"); + driver.setTime(2L); + driver.process(topic2, "C", "c"); + driver.setTime(3L); + driver.process(topic2, "D", "d"); + driver.setTime(4L); + driver.process(topic2, "A", "a"); + + proc1.checkAndClearProcessResult(); + proc2.checkAndClearProcessResult( + "[A@0]:0+a", + "[B@0]:0+b", + "[C@0]:0+c", + "[D@0]:0+d", + "[A@0]:0+a+a" + ); + proc3.checkAndClearProcessResult( + "[A@0]:0+1+1+1%0+a", + "[B@0]:0+2+2+2%0+b", + "[C@0]:0+3+3%0+c", + "[D@0]:0+4+4%0+d", + "[A@0]:0+1+1+1%0+a+a"); + + driver.setTime(5L); + driver.process(topic2, "A", "a"); + driver.setTime(6L); + driver.process(topic2, "B", "b"); + driver.setTime(7L); + driver.process(topic2, "D", "d"); + driver.setTime(8L); + driver.process(topic2, "B", "b"); + driver.setTime(9L); + driver.process(topic2, "C", "c"); + + proc1.checkAndClearProcessResult(); + proc2.checkAndClearProcessResult( + "[A@0]:0+a+a+a", "[A@5]:0+a", + "[B@0]:0+b+b", "[B@5]:0+b", + "[D@0]:0+d+d", "[D@5]:0+d", + "[B@0]:0+b+b+b", "[B@5]:0+b+b", + "[C@0]:0+c+c", "[C@5]:0+c" + ); + proc3.checkAndClearProcessResult( + "[A@0]:0+1+1+1%0+a+a+a", "[A@5]:0+1%0+a", + "[B@0]:0+2+2+2%0+b+b", "[B@5]:0+2+2%0+b", + "[D@0]:0+4+4%0+d+d", "[D@5]:0+4%0+d", + "[B@0]:0+2+2+2%0+b+b+b", "[B@5]:0+2+2%0+b+b", + "[C@0]:0+3+3%0+c+c", "[C@5]:0+3%0+c" + ); + } finally { + Utils.delete(baseDir); + } } } http://git-wip-us.apache.org/repos/asf/kafka/blob/68433dcf/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitionerTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitionerTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitionerTest.java index 7c6d5ec..b31b20d 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitionerTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitionerTest.java @@ -49,7 +49,7 @@ public class WindowedStreamPartitionerTest { new PartitionInfo(topicName, 5, Node.noNode(), new Node[0], new Node[0]) ); - private Cluster cluster = new Cluster(Arrays.asList(Node.noNode()), infos, Collections.<String>emptySet()); + private Cluster cluster = new Cluster(Collections.singletonList(Node.noNode()), infos, Collections.<String>emptySet()); @Test public void testCopartitioning() { @@ -71,7 +71,7 @@ public class WindowedStreamPartitionerTest { Integer expected = defaultPartitioner.partition("topic", key, keyBytes, value, valueBytes, cluster); for (int w = 0; w < 10; w++) { - HoppingWindow window = new HoppingWindow(10 * w, 20 * w); + TimeWindow window = new TimeWindow(10 * w, 20 * w); Windowed<Integer> windowedKey = new Windowed<>(key, window); Integer actual = streamPartitioner.partition(windowedKey, value, infos.size()); http://git-wip-us.apache.org/repos/asf/kafka/blob/68433dcf/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowsTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowsTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowsTest.java deleted file mode 100644 index f9b6ba5..0000000 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowsTest.java +++ /dev/null @@ -1,70 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.kafka.streams.kstream.internals; - -import org.apache.kafka.streams.kstream.HoppingWindows; -import org.apache.kafka.streams.kstream.TumblingWindows; -import org.apache.kafka.streams.kstream.UnlimitedWindows; -import org.junit.Test; - -import java.util.Map; - -import static org.junit.Assert.assertEquals; - -public class WindowsTest { - - @Test - public void hoppingWindows() { - - HoppingWindows windows = HoppingWindows.of("test").with(12L).every(5L); - - Map<Long, HoppingWindow> matched = windows.windowsFor(21L); - - assertEquals(3, matched.size()); - - assertEquals(new HoppingWindow(10L, 22L), matched.get(10L)); - assertEquals(new HoppingWindow(15L, 27L), matched.get(15L)); - assertEquals(new HoppingWindow(20L, 32L), matched.get(20L)); - } - - @Test - public void tumblineWindows() { - - TumblingWindows windows = TumblingWindows.of("test").with(12L); - - Map<Long, TumblingWindow> matched = windows.windowsFor(21L); - - assertEquals(1, matched.size()); - - assertEquals(new TumblingWindow(12L, 24L), matched.get(12L)); - } - - @Test - public void unlimitedWindows() { - - UnlimitedWindows windows = UnlimitedWindows.of("test").startOn(10L); - - Map<Long, UnlimitedWindow> matched = windows.windowsFor(21L); - - assertEquals(1, matched.size()); - - assertEquals(new UnlimitedWindow(10L), matched.get(10L)); - } -} http://git-wip-us.apache.org/repos/asf/kafka/blob/68433dcf/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestClient.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestClient.java b/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestClient.java index 95e0fbf..733c1ea 100644 --- a/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestClient.java +++ b/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestClient.java @@ -28,7 +28,7 @@ import org.apache.kafka.streams.kstream.KStreamBuilder; import org.apache.kafka.streams.kstream.KTable; import org.apache.kafka.streams.kstream.KeyValueMapper; import org.apache.kafka.streams.kstream.Predicate; -import org.apache.kafka.streams.kstream.TumblingWindows; +import org.apache.kafka.streams.kstream.TimeWindows; import org.apache.kafka.streams.kstream.UnlimitedWindows; import org.apache.kafka.streams.kstream.ValueJoiner; import org.apache.kafka.streams.kstream.Windowed; @@ -207,7 +207,7 @@ public class SmokeTestClient extends SmokeTestUtil { // windowed count data.countByKey( - TumblingWindows.of("tumbling-win-cnt").with(WINDOW_SIZE), + TimeWindows.of("tumbling-win-cnt", WINDOW_SIZE), stringSerde ).toStream().map( new KeyValueMapper<Windowed<String>, Long, KeyValue<String, Long>>() {
