KAFKA-3104: add windowed aggregation to KStream Author: Guozhang Wang <[email protected]>
Reviewers: Yasuhiro Mastuda Closes #781 from guozhangwang/K3104 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/a62eb599 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/a62eb599 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/a62eb599 Branch: refs/heads/trunk Commit: a62eb5993f5517a64dd1020b0a9bbd1012f7ee67 Parents: cc3570d Author: Guozhang Wang <[email protected]> Authored: Mon Jan 18 12:14:43 2016 -0800 Committer: Guozhang Wang <[email protected]> Committed: Mon Jan 18 12:14:43 2016 -0800 ---------------------------------------------------------------------- .../kafka/streams/kstream/HoppingWindows.java | 23 +- .../kafka/streams/kstream/JoinWindows.java | 33 +-- .../apache/kafka/streams/kstream/KStream.java | 18 +- .../apache/kafka/streams/kstream/KTable.java | 18 +- .../kafka/streams/kstream/SlidingWindows.java | 67 ------ .../kafka/streams/kstream/TumblingWindows.java | 68 ++++++ .../kafka/streams/kstream/UnlimitedWindows.java | 8 +- .../apache/kafka/streams/kstream/Window.java | 19 ++ .../apache/kafka/streams/kstream/Windowed.java | 5 + .../apache/kafka/streams/kstream/Windows.java | 27 ++- .../kstream/internals/KStreamAggWindow.java | 51 ++++ .../kstream/internals/KStreamAggregate.java | 171 +++++++++++++ .../kstream/internals/KStreamFlatMap.java | 14 +- .../kstream/internals/KStreamFlatMapValues.java | 16 +- .../streams/kstream/internals/KStreamImpl.java | 49 ++-- .../kstream/internals/KStreamJoinWindow.java | 11 +- .../kstream/internals/KStreamKStreamJoin.java | 15 +- .../internals/KStreamKTableLeftJoin.java | 6 +- .../streams/kstream/internals/KStreamMap.java | 14 +- .../kstream/internals/KStreamMapValues.java | 14 +- .../kstream/internals/KStreamTransform.java | 8 +- .../internals/KTableKTableAbstractJoin.java | 6 +- .../kstream/internals/KTableKTableJoin.java | 20 +- .../kstream/internals/KTableKTableLeftJoin.java | 18 +- .../internals/KTableKTableOuterJoin.java | 20 +- .../internals/KTableKTableRightJoin.java | 18 +- .../kstream/internals/KTableMapValues.java | 38 +-- .../kstream/internals/KTableRepartitionMap.java | 38 +-- .../kstream/internals/SlidingWindow.java | 38 --- .../kstream/internals/TumblingWindow.java | 38 +++ .../kafka/streams/state/MeteredWindowStore.java | 18 +- .../kafka/streams/state/RocksDBStore.java | 1 + .../kafka/streams/state/RocksDBWindowStore.java | 44 ++-- .../state/RocksDBWindowStoreSupplier.java | 10 +- .../apache/kafka/streams/state/WindowStore.java | 6 +- .../streams/state/WindowStoreIterator.java | 4 +- .../kstream/internals/KStreamAggregateTest.java | 154 ++++++++++++ .../internals/KStreamKStreamJoinTest.java | 6 +- .../kstream/internals/KTableAggregateTest.java | 18 +- .../streams/kstream/internals/WindowsTest.java | 70 ++++++ .../streams/state/RocksDBWindowStoreTest.java | 239 +++++++++---------- 41 files changed, 1000 insertions(+), 459 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/a62eb599/streams/src/main/java/org/apache/kafka/streams/kstream/HoppingWindows.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/HoppingWindows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/HoppingWindows.java index d7141eb..f354ef9 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/HoppingWindows.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/HoppingWindows.java @@ -19,8 +19,8 @@ package org.apache.kafka.streams.kstream; import org.apache.kafka.streams.kstream.internals.HoppingWindow; -import java.util.Collection; -import java.util.Collections; +import java.util.HashMap; +import java.util.Map; public class HoppingWindows extends Windows<HoppingWindow> { @@ -62,9 +62,22 @@ public class HoppingWindows extends Windows<HoppingWindow> { } @Override - public Collection<HoppingWindow> windowsFor(long timestamp) { - // TODO - return Collections.<HoppingWindow>emptyList(); + public Map<Long, HoppingWindow> windowsFor(long timestamp) { + long enclosed = (size - 1) / period; + + long windowStart = Math.max(0, timestamp - timestamp % period - enclosed * period); + + Map<Long, HoppingWindow> windows = new HashMap<>(); + while (windowStart <= timestamp) { + // add the window + HoppingWindow window = new HoppingWindow(windowStart, windowStart + this.size); + windows.put(windowStart, window); + + // advance the step period + windowStart += this.period; + } + + return windows; } @Override http://git-wip-us.apache.org/repos/asf/kafka/blob/a62eb599/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java index 50aff9d..ffc1c1c 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java @@ -18,31 +18,27 @@ package org.apache.kafka.streams.kstream; -import org.apache.kafka.streams.kstream.internals.SlidingWindow; +import org.apache.kafka.streams.kstream.internals.TumblingWindow; -import java.util.Collection; +import java.util.Map; /** * This class is used to specify the behaviour of windowed joins. */ -public class JoinWindows extends Windows<SlidingWindow> { - - private static final int DEFAULT_NUM_SEGMENTS = 3; +public class JoinWindows extends Windows<TumblingWindow> { public final long before; public final long after; - public final int segments; - private JoinWindows(String name, long before, long after, int segments) { + private JoinWindows(String name, long before, long after) { super(name); this.after = after; this.before = before; - this.segments = segments; } public static JoinWindows of(String name) { - return new JoinWindows(name, 0L, 0L, DEFAULT_NUM_SEGMENTS); + return new JoinWindows(name, 0L, 0L); } /** @@ -53,7 +49,7 @@ public class JoinWindows extends Windows<SlidingWindow> { * @return */ public JoinWindows within(long timeDifference) { - return new JoinWindows(this.name, timeDifference, timeDifference, this.segments); + return new JoinWindows(this.name, timeDifference, timeDifference); } /** @@ -65,7 +61,7 @@ public class JoinWindows extends Windows<SlidingWindow> { * @return */ public JoinWindows before(long timeDifference) { - return new JoinWindows(this.name, timeDifference, this.after, this.segments); + return new JoinWindows(this.name, timeDifference, this.after); } /** @@ -77,22 +73,11 @@ public class JoinWindows extends Windows<SlidingWindow> { * @return */ public JoinWindows after(long timeDifference) { - return new JoinWindows(this.name, this.before, timeDifference, this.segments); - } - - /** - * Specifies the number of segments to be used for rolling the window store, - * this function is not exposed to users but can be called by developers that extend this JoinWindows specs - * - * @param segments - * @return - */ - protected JoinWindows segments(int segments) { - return new JoinWindows(name, before, after, segments); + return new JoinWindows(this.name, this.before, timeDifference); } @Override - public Collection<SlidingWindow> windowsFor(long timestamp) { + public Map<Long, TumblingWindow> windowsFor(long timestamp) { // this function should never be called throw new UnsupportedOperationException("windowsFor() is not supported in JoinWindows"); } http://git-wip-us.apache.org/repos/asf/kafka/blob/a62eb599/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java index dace7e0..85d51e9 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java @@ -185,11 +185,11 @@ public interface KStream<K, V> { * @param otherValueDeserializer value deserializer for other stream, * if not specified the default serializer defined in the configs will be used * @param <V1> the value type of the other stream - * @param <V2> the value type of the new stream + * @param <R> the value type of the new stream */ - <V1, V2> KStream<K, V2> join( + <V1, R> KStream<K, R> join( KStream<K, V1> otherStream, - ValueJoiner<V, V1, V2> joiner, + ValueJoiner<V, V1, R> joiner, JoinWindows windows, Serializer<K> keySerializer, Serializer<V> thisValueSerializer, @@ -217,11 +217,11 @@ public interface KStream<K, V> { * @param otherValueDeserializer value deserializer for other stream, * if not specified the default serializer defined in the configs will be used * @param <V1> the value type of the other stream - * @param <V2> the value type of the new stream + * @param <R> the value type of the new stream */ - <V1, V2> KStream<K, V2> outerJoin( + <V1, R> KStream<K, R> outerJoin( KStream<K, V1> otherStream, - ValueJoiner<V, V1, V2> joiner, + ValueJoiner<V, V1, R> joiner, JoinWindows windows, Serializer<K> keySerializer, Serializer<V> thisValueSerializer, @@ -245,11 +245,11 @@ public interface KStream<K, V> { * @param otherValueDeserializer value deserializer for other stream, * if not specified the default serializer defined in the configs will be used * @param <V1> the value type of the other stream - * @param <V2> the value type of the new stream + * @param <R> the value type of the new stream */ - <V1, V2> KStream<K, V2> leftJoin( + <V1, R> KStream<K, R> leftJoin( KStream<K, V1> otherStream, - ValueJoiner<V, V1, V2> joiner, + ValueJoiner<V, V1, R> joiner, JoinWindows windows, Serializer<K> keySerializer, Serializer<V1> otherValueSerializer, http://git-wip-us.apache.org/repos/asf/kafka/blob/a62eb599/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java index 9837dae..93eceec 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java @@ -111,10 +111,10 @@ public interface KTable<K, V> { * @param other the instance of KTable joined with this stream * @param joiner ValueJoiner * @param <V1> the value type of the other stream - * @param <V2> the value type of the new stream + * @param <R> the value type of the new stream * @return the instance of KTable */ - <V1, V2> KTable<K, V2> join(KTable<K, V1> other, ValueJoiner<V, V1, V2> joiner); + <V1, R> KTable<K, R> join(KTable<K, V1> other, ValueJoiner<V, V1, R> joiner); /** * Combines values of this KTable with another KTable using Outer Join. @@ -122,10 +122,10 @@ public interface KTable<K, V> { * @param other the instance of KTable joined with this stream * @param joiner ValueJoiner * @param <V1> the value type of the other stream - * @param <V2> the value type of the new stream + * @param <R> the value type of the new stream * @return the instance of KTable */ - <V1, V2> KTable<K, V2> outerJoin(KTable<K, V1> other, ValueJoiner<V, V1, V2> joiner); + <V1, R> KTable<K, R> outerJoin(KTable<K, V1> other, ValueJoiner<V, V1, R> joiner); /** * Combines values of this KTable with another KTable using Left Join. @@ -133,10 +133,10 @@ public interface KTable<K, V> { * @param other the instance of KTable joined with this stream * @param joiner ValueJoiner * @param <V1> the value type of the other stream - * @param <V2> the value type of the new stream + * @param <R> the value type of the new stream * @return the instance of KTable */ - <V1, V2> KTable<K, V2> leftJoin(KTable<K, V1> other, ValueJoiner<V, V1, V2> joiner); + <V1, R> KTable<K, R> leftJoin(KTable<K, V1> other, ValueJoiner<V, V1, R> joiner); /** * Aggregate values of this table by the selected key. @@ -148,14 +148,14 @@ public interface KTable<K, V> { * @param <V1> the value type of the aggregated table * @return the instance of KTable */ - <K1, V1, V2> KTable<K1, V2> aggregate(AggregatorSupplier<K1, V1, V2> aggregatorSupplier, + <K1, V1, T> KTable<K1, T> aggregate(AggregatorSupplier<K1, V1, T> aggregatorSupplier, KeyValueMapper<K, V, KeyValue<K1, V1>> selector, Serializer<K1> keySerializer, Serializer<V1> valueSerializer, - Serializer<V2> aggValueSerializer, + Serializer<T> aggValueSerializer, Deserializer<K1> keyDeserializer, Deserializer<V1> valueDeserializer, - Deserializer<V2> aggValueDeserializer, + Deserializer<T> aggValueDeserializer, String name); /** http://git-wip-us.apache.org/repos/asf/kafka/blob/a62eb599/streams/src/main/java/org/apache/kafka/streams/kstream/SlidingWindows.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/SlidingWindows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/SlidingWindows.java deleted file mode 100644 index ffdb4ad..0000000 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/SlidingWindows.java +++ /dev/null @@ -1,67 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kafka.streams.kstream; - - -import org.apache.kafka.streams.kstream.internals.SlidingWindow; - -import java.util.Collection; -import java.util.Collections; - -public class SlidingWindows extends Windows<SlidingWindow> { - - private static final long DEFAULT_SIZE_MS = 1000L; - - public final long size; - - private SlidingWindows(String name, long size) { - super(name); - - this.size = size; - } - - /** - * Returns a half-interval sliding window definition with the default window size - */ - public static SlidingWindows of(String name) { - return new SlidingWindows(name, DEFAULT_SIZE_MS); - } - - /** - * Returns a half-interval sliding window definition with the window size in milliseconds - */ - public SlidingWindows with(long size) { - return new SlidingWindows(this.name, size); - } - - @Override - public Collection<SlidingWindow> windowsFor(long timestamp) { - // TODO - return Collections.<SlidingWindow>emptyList(); - } - - @Override - public boolean equalTo(Windows other) { - if (!other.getClass().equals(SlidingWindows.class)) - return false; - - SlidingWindows otherWindows = (SlidingWindows) other; - - return this.size == otherWindows.size; - } -} http://git-wip-us.apache.org/repos/asf/kafka/blob/a62eb599/streams/src/main/java/org/apache/kafka/streams/kstream/TumblingWindows.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/TumblingWindows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/TumblingWindows.java new file mode 100644 index 0000000..02ece3a --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/TumblingWindows.java @@ -0,0 +1,68 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.streams.kstream; + + +import org.apache.kafka.streams.kstream.internals.TumblingWindow; + +import java.util.Collections; +import java.util.Map; + +public class TumblingWindows extends Windows<TumblingWindow> { + + private static final long DEFAULT_SIZE_MS = 1000L; + + public final long size; + + private TumblingWindows(String name, long size) { + super(name); + + this.size = size; + } + + /** + * Returns a half-interval sliding window definition with the default window size + */ + public static TumblingWindows of(String name) { + return new TumblingWindows(name, DEFAULT_SIZE_MS); + } + + /** + * Returns a half-interval sliding window definition with the window size in milliseconds + */ + public TumblingWindows with(long size) { + return new TumblingWindows(this.name, size); + } + + @Override + public Map<Long, TumblingWindow> windowsFor(long timestamp) { + long windowStart = timestamp - timestamp % size; + + return Collections.singletonMap(windowStart, new TumblingWindow(windowStart, windowStart + size)); + } + + @Override + public boolean equalTo(Windows other) { + if (!other.getClass().equals(TumblingWindows.class)) + return false; + + TumblingWindows otherWindows = (TumblingWindows) other; + + return this.size == otherWindows.size; + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/a62eb599/streams/src/main/java/org/apache/kafka/streams/kstream/UnlimitedWindows.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/UnlimitedWindows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/UnlimitedWindows.java index 89cb0a8..6f47253 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/UnlimitedWindows.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/UnlimitedWindows.java @@ -19,8 +19,8 @@ package org.apache.kafka.streams.kstream; import org.apache.kafka.streams.kstream.internals.UnlimitedWindow; -import java.util.Collection; import java.util.Collections; +import java.util.Map; public class UnlimitedWindows extends Windows<UnlimitedWindow> { @@ -46,9 +46,9 @@ public class UnlimitedWindows extends Windows<UnlimitedWindow> { } @Override - public Collection<UnlimitedWindow> windowsFor(long timestamp) { - // TODO - return Collections.<UnlimitedWindow>emptyList(); + public Map<Long, UnlimitedWindow> windowsFor(long timestamp) { + // always return the single unlimited window + return Collections.singletonMap(start, new UnlimitedWindow(start)); } @Override http://git-wip-us.apache.org/repos/asf/kafka/blob/a62eb599/streams/src/main/java/org/apache/kafka/streams/kstream/Window.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Window.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Window.java index 63e0a35..b9401b0 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/Window.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Window.java @@ -48,4 +48,23 @@ public abstract class Window { public boolean equalsTo(Window other) { return this.start() == other.start() && this.end() == other.end(); } + + @Override + public boolean equals(Object obj) { + if (obj == this) + return true; + + if (!(obj instanceof Window)) + return false; + + Window other = (Window) obj; + + return this.equalsTo(other) && this.start == other.start && this.end == other.end; + } + + @Override + public int hashCode() { + long n = (this.start << 32) | this.end; + return (int) (n % 0xFFFFFFFFL); + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kafka/blob/a62eb599/streams/src/main/java/org/apache/kafka/streams/kstream/Windowed.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Windowed.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Windowed.java index 03fb656..10afc73 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/Windowed.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Windowed.java @@ -35,4 +35,9 @@ public class Windowed<T> { public Window window() { return window; } + + @Override + public String toString() { + return "[" + value + "@" + window.start() + "]"; + } } http://git-wip-us.apache.org/repos/asf/kafka/blob/a62eb599/streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java index ab8d822..e4d7d9d 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java @@ -17,25 +17,31 @@ package org.apache.kafka.streams.kstream; -import java.util.Collection; + +import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; public abstract class Windows<W extends Window> { + private static final int DEFAULT_NUM_SEGMENTS = 3; + private static final long DEFAULT_EMIT_DURATION = 1000L; private static final long DEFAULT_MAINTAIN_DURATION = 24 * 60 * 60 * 1000L; // one day private static final AtomicInteger NAME_INDEX = new AtomicInteger(0); + protected String name; + private long emitDuration; private long maintainDuration; - protected String name; + public int segments; protected Windows(String name) { this.name = name; + this.segments = DEFAULT_NUM_SEGMENTS; this.emitDuration = DEFAULT_EMIT_DURATION; this.maintainDuration = DEFAULT_MAINTAIN_DURATION; } @@ -62,6 +68,19 @@ public abstract class Windows<W extends Window> { return this; } + /** + * Specifies the number of segments to be used for rolling the window store, + * this function is not exposed to users but can be called by developers that extend this JoinWindows specs + * + * @param segments + * @return + */ + protected Windows segments(int segments) { + this.segments = segments; + + return this; + } + public long emitEveryMs() { return this.emitDuration; } @@ -74,7 +93,7 @@ public abstract class Windows<W extends Window> { return prefix + String.format("%010d", NAME_INDEX.getAndIncrement()); } - abstract boolean equalTo(Windows other); + public abstract boolean equalTo(Windows other); - abstract Collection<W> windowsFor(long timestamp); + public abstract Map<Long, W> windowsFor(long timestamp); } http://git-wip-us.apache.org/repos/asf/kafka/blob/a62eb599/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggWindow.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggWindow.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggWindow.java new file mode 100644 index 0000000..f02f53a --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggWindow.java @@ -0,0 +1,51 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.streams.kstream.internals; + +import org.apache.kafka.streams.kstream.Windowed; +import org.apache.kafka.streams.processor.AbstractProcessor; +import org.apache.kafka.streams.processor.Processor; +import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.ProcessorSupplier; + +public class KStreamAggWindow<K, V> implements ProcessorSupplier<K, V> { + + @Override + public Processor<K, V> get() { + return new KStreamAggWindowProcessor(); + } + + private class KStreamAggWindowProcessor extends AbstractProcessor<K, V> { + + @SuppressWarnings("unchecked") + @Override + public void init(ProcessorContext context) { + super.init(context); + } + + @Override + public void process(K key, V value) { + // create a dummy window just for wrapping the timestamp + long timestamp = context().timestamp(); + + // send the new aggregate value + context().forward(new Windowed<>(key, new UnlimitedWindow(timestamp)), new Change<>(value, null)); + } + } + +} http://git-wip-us.apache.org/repos/asf/kafka/blob/a62eb599/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java new file mode 100644 index 0000000..5745a03 --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java @@ -0,0 +1,171 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.streams.kstream.internals; + +import org.apache.kafka.streams.kstream.Aggregator; +import org.apache.kafka.streams.kstream.KeyValue; +import org.apache.kafka.streams.kstream.Window; +import org.apache.kafka.streams.kstream.Windowed; +import org.apache.kafka.streams.kstream.Windows; +import org.apache.kafka.streams.processor.AbstractProcessor; +import org.apache.kafka.streams.processor.Processor; +import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.state.WindowStore; +import org.apache.kafka.streams.state.WindowStoreIterator; + +import java.util.Iterator; +import java.util.Map; + +public class KStreamAggregate<K, V, T, W extends Window> implements KTableProcessorSupplier<Windowed<K>, V, T> { + + private final String storeName; + private final Windows<W> windows; + private final Aggregator<K, V, T> aggregator; + + private boolean sendOldValues = false; + + public KStreamAggregate(Windows<W> windows, String storeName, Aggregator<K, V, T> aggregator) { + this.windows = windows; + this.storeName = storeName; + this.aggregator = aggregator; + } + + @Override + public Processor<Windowed<K>, Change<V>> get() { + return new KStreamAggregateProcessor(); + } + + @Override + public void enableSendingOldValues() { + sendOldValues = true; + } + + private class KStreamAggregateProcessor extends AbstractProcessor<Windowed<K>, Change<V>> { + + private WindowStore<K, T> windowStore; + + @SuppressWarnings("unchecked") + @Override + public void init(ProcessorContext context) { + super.init(context); + + windowStore = (WindowStore<K, T>) context.getStateStore(storeName); + } + + @Override + public void process(Windowed<K> windowedKey, Change<V> change) { + // first get the matching windows + long timestamp = windowedKey.window().start(); + K key = windowedKey.value(); + V value = change.newValue; + + Map<Long, W> matchedWindows = windows.windowsFor(timestamp); + + long timeFrom = Long.MAX_VALUE; + long timeTo = Long.MIN_VALUE; + + // use range query on window store for efficient reads + for (long windowStartMs : matchedWindows.keySet()) { + timeFrom = windowStartMs < timeFrom ? windowStartMs : timeFrom; + timeTo = windowStartMs > timeTo ? windowStartMs : timeTo; + } + + WindowStoreIterator<T> iter = windowStore.fetch(key, timeFrom, timeTo); + + // for each matching window, try to update the corresponding key and send to the downstream + while (iter.hasNext()) { + KeyValue<Long, T> entry = iter.next(); + W window = matchedWindows.get(entry.key); + + if (window != null) { + + T oldAgg = entry.value; + + if (oldAgg == null) + oldAgg = aggregator.initialValue(); + + // try to add the new new value (there will never be old value) + T newAgg = aggregator.add(key, value, oldAgg); + + // update the store with the new value + windowStore.put(key, newAgg, window.start()); + + // forward the aggregated change pair + if (sendOldValues) + context().forward(new Windowed<>(key, window), new Change<>(newAgg, oldAgg)); + else + context().forward(new Windowed<>(key, window), new Change<>(newAgg, null)); + + matchedWindows.remove(entry.key); + } + } + + iter.close(); + + // create the new window for the rest of unmatched window that do not exist yet + for (long windowStartMs : matchedWindows.keySet()) { + T oldAgg = aggregator.initialValue(); + T newAgg = aggregator.add(key, value, oldAgg); + + windowStore.put(key, newAgg, windowStartMs); + + // send the new aggregate pair + if (sendOldValues) + context().forward(new Windowed<>(key, matchedWindows.get(windowStartMs)), new Change<>(newAgg, oldAgg)); + else + context().forward(new Windowed<>(key, matchedWindows.get(windowStartMs)), new Change<>(newAgg, null)); + } + } + } + + @Override + public KTableValueGetterSupplier<Windowed<K>, T> view() { + + return new KTableValueGetterSupplier<Windowed<K>, T>() { + + public KTableValueGetter<Windowed<K>, T> get() { + return new KStreamAggregateValueGetter(); + } + + }; + } + + private class KStreamAggregateValueGetter implements KTableValueGetter<Windowed<K>, T> { + + private WindowStore<K, T> windowStore; + + @SuppressWarnings("unchecked") + @Override + public void init(ProcessorContext context) { + windowStore = (WindowStore<K, T>) context.getStateStore(storeName); + } + + @SuppressWarnings("unchecked") + @Override + public T get(Windowed<K> windowedKey) { + K key = windowedKey.value(); + W window = (W) windowedKey.window(); + + // this iterator should only contain one element + Iterator<KeyValue<Long, T>> iter = windowStore.fetch(key, window.start(), window.start()); + + return iter.next().value; + } + + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/a62eb599/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMap.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMap.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMap.java index 175a002..daef8b1 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMap.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMap.java @@ -23,23 +23,23 @@ import org.apache.kafka.streams.processor.AbstractProcessor; import org.apache.kafka.streams.processor.Processor; import org.apache.kafka.streams.processor.ProcessorSupplier; -class KStreamFlatMap<K1, V1, K2, V2> implements ProcessorSupplier<K1, V1> { +class KStreamFlatMap<K, V, K1, V1> implements ProcessorSupplier<K, V> { - private final KeyValueMapper<K1, V1, Iterable<KeyValue<K2, V2>>> mapper; + private final KeyValueMapper<K, V, Iterable<KeyValue<K1, V1>>> mapper; - KStreamFlatMap(KeyValueMapper<K1, V1, Iterable<KeyValue<K2, V2>>> mapper) { + KStreamFlatMap(KeyValueMapper<K, V, Iterable<KeyValue<K1, V1>>> mapper) { this.mapper = mapper; } @Override - public Processor<K1, V1> get() { + public Processor<K, V> get() { return new KStreamFlatMapProcessor(); } - private class KStreamFlatMapProcessor extends AbstractProcessor<K1, V1> { + private class KStreamFlatMapProcessor extends AbstractProcessor<K, V> { @Override - public void process(K1 key, V1 value) { - for (KeyValue<K2, V2> newPair : mapper.apply(key, value)) { + public void process(K key, V value) { + for (KeyValue<K1, V1> newPair : mapper.apply(key, value)) { context().forward(newPair.key, newPair.value); } } http://git-wip-us.apache.org/repos/asf/kafka/blob/a62eb599/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapValues.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapValues.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapValues.java index 9b4559b..97d6b7a 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapValues.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapValues.java @@ -22,24 +22,24 @@ import org.apache.kafka.streams.processor.AbstractProcessor; import org.apache.kafka.streams.processor.Processor; import org.apache.kafka.streams.processor.ProcessorSupplier; -class KStreamFlatMapValues<K1, V1, V2> implements ProcessorSupplier<K1, V1> { +class KStreamFlatMapValues<K, V, V1> implements ProcessorSupplier<K, V> { - private final ValueMapper<V1, ? extends Iterable<V2>> mapper; + private final ValueMapper<V, ? extends Iterable<V1>> mapper; - KStreamFlatMapValues(ValueMapper<V1, ? extends Iterable<V2>> mapper) { + KStreamFlatMapValues(ValueMapper<V, ? extends Iterable<V1>> mapper) { this.mapper = mapper; } @Override - public Processor<K1, V1> get() { + public Processor<K, V> get() { return new KStreamFlatMapValuesProcessor(); } - private class KStreamFlatMapValuesProcessor extends AbstractProcessor<K1, V1> { + private class KStreamFlatMapValuesProcessor extends AbstractProcessor<K, V> { @Override - public void process(K1 key, V1 value) { - Iterable<V2> newValues = mapper.apply(value); - for (V2 v : newValues) { + public void process(K key, V value) { + Iterable<V1> newValues = mapper.apply(value); + for (V1 v : newValues) { context().forward(key, v); } } http://git-wip-us.apache.org/repos/asf/kafka/blob/a62eb599/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java index 2459f0d..7b634dc 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java @@ -305,28 +305,27 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V RocksDBWindowStoreSupplier<K, V> thisWindow = new RocksDBWindowStoreSupplier<>( windows.name() + "-this", - windows.before, - windows.after, windows.maintainMs(), windows.segments, + true, new Serdes<>("", keySerializer, keyDeserializer, thisValueSerializer, thisValueDeserializer), null); RocksDBWindowStoreSupplier<K, V1> otherWindow = new RocksDBWindowStoreSupplier<>( windows.name() + "-other", - windows.before, - windows.after, windows.maintainMs(), windows.segments, + true, new Serdes<>("", keySerializer, keyDeserializer, otherValueSerializer, otherValueDeserializer), null); - KStreamJoinWindow<K, V> thisWindowedStream = new KStreamJoinWindow<>(thisWindow.name()); - KStreamJoinWindow<K, V1> otherWindowedStream = new KStreamJoinWindow<>(otherWindow.name()); + KStreamJoinWindow<K, V> thisWindowedStream = new KStreamJoinWindow<>(thisWindow.name(), windows.before + windows.after + 1, windows.maintainMs()); + KStreamJoinWindow<K, V1> otherWindowedStream = new KStreamJoinWindow<>(otherWindow.name(), windows.before + windows.after + 1, windows.maintainMs()); + + KStreamKStreamJoin<K, R, V, V1> joinThis = new KStreamKStreamJoin<>(otherWindow.name(), windows.before, windows.after, joiner, outer); + KStreamKStreamJoin<K, R, V1, V> joinOther = new KStreamKStreamJoin<>(thisWindow.name(), windows.before, windows.after, reverseJoiner(joiner), outer); - KStreamKStreamJoin<K, R, V, V1> joinThis = new KStreamKStreamJoin<>(otherWindow.name(), joiner, outer); - KStreamKStreamJoin<K, R, V1, V> joinOther = new KStreamKStreamJoin<>(thisWindow.name(), reverseJoiner(joiner), outer); KStreamPassThrough<K, R> joinMerge = new KStreamPassThrough<>(); String thisWindowStreamName = topology.newName(WINDOWED_NAME); @@ -362,15 +361,14 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V RocksDBWindowStoreSupplier<K, V1> otherWindow = new RocksDBWindowStoreSupplier<>( windows.name() + "-this", - windows.before, - windows.after, windows.maintainMs(), windows.segments, + true, new Serdes<>("", keySerializer, keyDeserializer, otherValueSerializer, otherValueDeserializer), null); - KStreamJoinWindow<K, V1> otherWindowedStream = new KStreamJoinWindow<>(otherWindow.name()); - KStreamKStreamJoin<K, R, V, V1> joinThis = new KStreamKStreamJoin<>(otherWindow.name(), joiner, true); + KStreamJoinWindow<K, V1> otherWindowedStream = new KStreamJoinWindow<>(otherWindow.name(), windows.before + windows.after + 1, windows.maintainMs()); + KStreamKStreamJoin<K, R, V, V1> joinThis = new KStreamKStreamJoin<>(otherWindow.name(), windows.before, windows.after, joiner, true); String otherWindowStreamName = topology.newName(WINDOWED_NAME); String joinThisName = topology.newName(LEFTJOIN_NAME); @@ -401,8 +399,31 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V Serializer<T> aggValueSerializer, Deserializer<K> keyDeserializer, Deserializer<T> aggValueDeserializer) { - // TODO - return null; + + // TODO: this agg window operator is only used for casting K to Windowed<K> for + // KTableProcessorSupplier, which is a bit awkward and better be removed in the future + String aggregateName = topology.newName(AGGREGATE_NAME); + String aggWindowName = topology.newName(WINDOWED_NAME); + + ProcessorSupplier<K, V> aggWindowSupplier = new KStreamAggWindow<>(); + ProcessorSupplier<Windowed<K>, Change<V>> aggregateSupplier = new KStreamAggregate<>(windows, windows.name(), aggregatorSupplier.get()); + + RocksDBWindowStoreSupplier<K, T> aggregateStore = + new RocksDBWindowStoreSupplier<>( + windows.name(), + windows.maintainMs(), + windows.segments, + false, + new Serdes<>("", keySerializer, keyDeserializer, aggValueSerializer, aggValueDeserializer), + null); + + // aggregate the values with the aggregator and local store + topology.addProcessor(aggWindowName, aggWindowSupplier, this.name); + topology.addProcessor(aggregateName, aggregateSupplier, aggWindowName); + topology.addStateStore(aggregateStore, aggregateName); + + // return the KTable representation with the intermediate topic as the sources + return new KTableImpl<>(topology, aggregateName, aggregateSupplier, sourceNodes); } @Override http://git-wip-us.apache.org/repos/asf/kafka/blob/a62eb599/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamJoinWindow.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamJoinWindow.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamJoinWindow.java index b122aa1..4f427d4 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamJoinWindow.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamJoinWindow.java @@ -17,6 +17,7 @@ package org.apache.kafka.streams.kstream.internals; +import org.apache.kafka.common.KafkaException; import org.apache.kafka.streams.processor.AbstractProcessor; import org.apache.kafka.streams.processor.Processor; import org.apache.kafka.streams.processor.ProcessorContext; @@ -26,9 +27,14 @@ import org.apache.kafka.streams.state.WindowStore; class KStreamJoinWindow<K, V> implements ProcessorSupplier<K, V> { private final String windowName; + private final long windowSizeMs; + private final long retentionPeriodMs; - KStreamJoinWindow(String windowName) { + + KStreamJoinWindow(String windowName, long windowSizeMs, long retentionPeriodMs) { this.windowName = windowName; + this.windowSizeMs = windowSizeMs; + this.retentionPeriodMs = retentionPeriodMs; } @Override @@ -46,6 +52,9 @@ class KStreamJoinWindow<K, V> implements ProcessorSupplier<K, V> { super.init(context); window = (WindowStore<K, V>) context.getStateStore(windowName); + + if (windowSizeMs * 2 > retentionPeriodMs) + throw new KafkaException("The retention period must be at least two times the join window size."); } @Override http://git-wip-us.apache.org/repos/asf/kafka/blob/a62eb599/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java index 8a9bf6c..01e3325 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java @@ -17,6 +17,7 @@ package org.apache.kafka.streams.kstream.internals; +import org.apache.kafka.streams.kstream.KeyValue; import org.apache.kafka.streams.kstream.ValueJoiner; import org.apache.kafka.streams.processor.AbstractProcessor; import org.apache.kafka.streams.processor.Processor; @@ -29,11 +30,16 @@ import java.util.Iterator; class KStreamKStreamJoin<K, R, V1, V2> implements ProcessorSupplier<K, V1> { private final String otherWindowName; + private final long joinBeforeMs; + private final long joinAfterMs; + private final ValueJoiner<V1, V2, R> joiner; private final boolean outer; - KStreamKStreamJoin(String otherWindowName, ValueJoiner<V1, V2, R> joiner, boolean outer) { + KStreamKStreamJoin(String otherWindowName, long joinBeforeMs, long joinAfterMs, ValueJoiner<V1, V2, R> joiner, boolean outer) { this.otherWindowName = otherWindowName; + this.joinBeforeMs = joinBeforeMs; + this.joinAfterMs = joinAfterMs; this.joiner = joiner; this.outer = outer; } @@ -59,10 +65,13 @@ class KStreamKStreamJoin<K, R, V1, V2> implements ProcessorSupplier<K, V1> { public void process(K key, V1 value) { boolean needOuterJoin = KStreamKStreamJoin.this.outer; - Iterator<V2> iter = otherWindow.fetch(key, context().timestamp()); + long timeFrom = Math.max(0L, context().timestamp() - joinBeforeMs); + long timeTo = Math.max(0L, context().timestamp() + joinAfterMs); + + Iterator<KeyValue<Long, V2>> iter = otherWindow.fetch(key, timeFrom, timeTo); while (iter.hasNext()) { needOuterJoin = false; - context().forward(key, joiner.apply(value, iter.next())); + context().forward(key, joiner.apply(value, iter.next().value)); } if (needOuterJoin) http://git-wip-us.apache.org/repos/asf/kafka/blob/a62eb599/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoin.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoin.java index 51a6277..dfca019 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoin.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoin.java @@ -23,12 +23,12 @@ import org.apache.kafka.streams.processor.Processor; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.ProcessorSupplier; -class KStreamKTableLeftJoin<K, V, V1, V2> implements ProcessorSupplier<K, V1> { +class KStreamKTableLeftJoin<K, R, V1, V2> implements ProcessorSupplier<K, V1> { private final KTableValueGetterSupplier<K, V2> valueGetterSupplier; - private final ValueJoiner<V1, V2, V> joiner; + private final ValueJoiner<V1, V2, R> joiner; - KStreamKTableLeftJoin(KTableImpl<K, ?, V2> table, ValueJoiner<V1, V2, V> joiner) { + KStreamKTableLeftJoin(KTableImpl<K, ?, V2> table, ValueJoiner<V1, V2, R> joiner) { this.valueGetterSupplier = table.valueGetterSupplier(); this.joiner = joiner; } http://git-wip-us.apache.org/repos/asf/kafka/blob/a62eb599/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamMap.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamMap.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamMap.java index 3868318..57f1431 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamMap.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamMap.java @@ -23,23 +23,23 @@ import org.apache.kafka.streams.kstream.KeyValue; import org.apache.kafka.streams.kstream.KeyValueMapper; import org.apache.kafka.streams.processor.ProcessorSupplier; -class KStreamMap<K1, V1, K2, V2> implements ProcessorSupplier<K1, V1> { +class KStreamMap<K, V, K1, V1> implements ProcessorSupplier<K, V> { - private final KeyValueMapper<K1, V1, KeyValue<K2, V2>> mapper; + private final KeyValueMapper<K, V, KeyValue<K1, V1>> mapper; - public KStreamMap(KeyValueMapper<K1, V1, KeyValue<K2, V2>> mapper) { + public KStreamMap(KeyValueMapper<K, V, KeyValue<K1, V1>> mapper) { this.mapper = mapper; } @Override - public Processor<K1, V1> get() { + public Processor<K, V> get() { return new KStreamMapProcessor(); } - private class KStreamMapProcessor extends AbstractProcessor<K1, V1> { + private class KStreamMapProcessor extends AbstractProcessor<K, V> { @Override - public void process(K1 key, V1 value) { - KeyValue<K2, V2> newPair = mapper.apply(key, value); + public void process(K key, V value) { + KeyValue<K1, V1> newPair = mapper.apply(key, value); context().forward(newPair.key, newPair.value); } } http://git-wip-us.apache.org/repos/asf/kafka/blob/a62eb599/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamMapValues.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamMapValues.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamMapValues.java index 692b421..06667e8 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamMapValues.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamMapValues.java @@ -22,23 +22,23 @@ import org.apache.kafka.streams.processor.AbstractProcessor; import org.apache.kafka.streams.processor.Processor; import org.apache.kafka.streams.processor.ProcessorSupplier; -class KStreamMapValues<K1, V1, V2> implements ProcessorSupplier<K1, V1> { +class KStreamMapValues<K, V, V1> implements ProcessorSupplier<K, V> { - private final ValueMapper<V1, V2> mapper; + private final ValueMapper<V, V1> mapper; - public KStreamMapValues(ValueMapper<V1, V2> mapper) { + public KStreamMapValues(ValueMapper<V, V1> mapper) { this.mapper = mapper; } @Override - public Processor<K1, V1> get() { + public Processor<K, V> get() { return new KStreamMapProcessor(); } - private class KStreamMapProcessor extends AbstractProcessor<K1, V1> { + private class KStreamMapProcessor extends AbstractProcessor<K, V> { @Override - public void process(K1 key, V1 value) { - V2 newValue = mapper.apply(value); + public void process(K key, V value) { + V1 newValue = mapper.apply(value); context().forward(key, newValue); } } http://git-wip-us.apache.org/repos/asf/kafka/blob/a62eb599/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransform.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransform.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransform.java index 7ebab0e..a9d8f97 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransform.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransform.java @@ -24,16 +24,16 @@ import org.apache.kafka.streams.processor.Processor; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.ProcessorSupplier; -public class KStreamTransform<K1, V1, K2, V2> implements ProcessorSupplier<K1, V1> { +public class KStreamTransform<K, V, K1, V1> implements ProcessorSupplier<K, V> { - private final TransformerSupplier<K1, V1, KeyValue<K2, V2>> transformerSupplier; + private final TransformerSupplier<K, V, KeyValue<K1, V1>> transformerSupplier; - public KStreamTransform(TransformerSupplier<K1, V1, KeyValue<K2, V2>> transformerSupplier) { + public KStreamTransform(TransformerSupplier<K, V, KeyValue<K1, V1>> transformerSupplier) { this.transformerSupplier = transformerSupplier; } @Override - public Processor<K1, V1> get() { + public Processor<K, V> get() { return new KStreamTransformProcessor(transformerSupplier.get()); } http://git-wip-us.apache.org/repos/asf/kafka/blob/a62eb599/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableAbstractJoin.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableAbstractJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableAbstractJoin.java index ad987dd..5e441aa 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableAbstractJoin.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableAbstractJoin.java @@ -19,19 +19,19 @@ package org.apache.kafka.streams.kstream.internals; import org.apache.kafka.streams.kstream.ValueJoiner; -abstract class KTableKTableAbstractJoin<K, V, V1, V2> implements KTableProcessorSupplier<K, V1, V> { +abstract class KTableKTableAbstractJoin<K, R, V1, V2> implements KTableProcessorSupplier<K, V1, R> { protected final KTableImpl<K, ?, V1> table1; protected final KTableImpl<K, ?, V2> table2; protected final KTableValueGetterSupplier<K, V1> valueGetterSupplier1; protected final KTableValueGetterSupplier<K, V2> valueGetterSupplier2; - protected final ValueJoiner<V1, V2, V> joiner; + protected final ValueJoiner<V1, V2, R> joiner; protected boolean sendOldValues = false; KTableKTableAbstractJoin(KTableImpl<K, ?, V1> table1, KTableImpl<K, ?, V2> table2, - ValueJoiner<V1, V2, V> joiner) { + ValueJoiner<V1, V2, R> joiner) { this.table1 = table1; this.table2 = table2; this.valueGetterSupplier1 = table1.valueGetterSupplier(); http://git-wip-us.apache.org/repos/asf/kafka/blob/a62eb599/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoin.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoin.java index 9716edd..6eb27b6 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoin.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoin.java @@ -22,9 +22,9 @@ import org.apache.kafka.streams.processor.AbstractProcessor; import org.apache.kafka.streams.processor.Processor; import org.apache.kafka.streams.processor.ProcessorContext; -class KTableKTableJoin<K, V, V1, V2> extends KTableKTableAbstractJoin<K, V, V1, V2> { +class KTableKTableJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K, R, V1, V2> { - KTableKTableJoin(KTableImpl<K, ?, V1> table1, KTableImpl<K, ?, V2> table2, ValueJoiner<V1, V2, V> joiner) { + KTableKTableJoin(KTableImpl<K, ?, V1> table1, KTableImpl<K, ?, V2> table2, ValueJoiner<V1, V2, R> joiner) { super(table1, table2, joiner); } @@ -34,10 +34,10 @@ class KTableKTableJoin<K, V, V1, V2> extends KTableKTableAbstractJoin<K, V, V1, } @Override - public KTableValueGetterSupplier<K, V> view() { - return new KTableValueGetterSupplier<K, V>() { + public KTableValueGetterSupplier<K, R> view() { + return new KTableValueGetterSupplier<K, R>() { - public KTableValueGetter<K, V> get() { + public KTableValueGetter<K, R> get() { return new KTableKTableJoinValueGetter(valueGetterSupplier1.get(), valueGetterSupplier2.get()); } @@ -61,8 +61,8 @@ class KTableKTableJoin<K, V, V1, V2> extends KTableKTableAbstractJoin<K, V, V1, @Override public void process(K key, Change<V1> change) { - V newValue = null; - V oldValue = null; + R newValue = null; + R oldValue = null; V2 value2 = null; if (change.newValue != null || change.oldValue != null) @@ -78,7 +78,7 @@ class KTableKTableJoin<K, V, V1, V2> extends KTableKTableAbstractJoin<K, V, V1, } } - private class KTableKTableJoinValueGetter implements KTableValueGetter<K, V> { + private class KTableKTableJoinValueGetter implements KTableValueGetter<K, R> { private final KTableValueGetter<K, V1> valueGetter1; private final KTableValueGetter<K, V2> valueGetter2; @@ -95,8 +95,8 @@ class KTableKTableJoin<K, V, V1, V2> extends KTableKTableAbstractJoin<K, V, V1, } @Override - public V get(K key) { - V newValue = null; + public R get(K key) { + R newValue = null; V1 value1 = valueGetter1.get(key); if (value1 != null) { http://git-wip-us.apache.org/repos/asf/kafka/blob/a62eb599/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoin.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoin.java index b10bdb5..00e872e 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoin.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoin.java @@ -22,9 +22,9 @@ import org.apache.kafka.streams.processor.AbstractProcessor; import org.apache.kafka.streams.processor.Processor; import org.apache.kafka.streams.processor.ProcessorContext; -class KTableKTableLeftJoin<K, V, V1, V2> extends KTableKTableAbstractJoin<K, V, V1, V2> { +class KTableKTableLeftJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K, R, V1, V2> { - KTableKTableLeftJoin(KTableImpl<K, ?, V1> table1, KTableImpl<K, ?, V2> table2, ValueJoiner<V1, V2, V> joiner) { + KTableKTableLeftJoin(KTableImpl<K, ?, V1> table1, KTableImpl<K, ?, V2> table2, ValueJoiner<V1, V2, R> joiner) { super(table1, table2, joiner); } @@ -34,10 +34,10 @@ class KTableKTableLeftJoin<K, V, V1, V2> extends KTableKTableAbstractJoin<K, V, } @Override - public KTableValueGetterSupplier<K, V> view() { - return new KTableValueGetterSupplier<K, V>() { + public KTableValueGetterSupplier<K, R> view() { + return new KTableValueGetterSupplier<K, R>() { - public KTableValueGetter<K, V> get() { + public KTableValueGetter<K, R> get() { return new KTableKTableLeftJoinValueGetter(valueGetterSupplier1.get(), valueGetterSupplier2.get()); } @@ -61,8 +61,8 @@ class KTableKTableLeftJoin<K, V, V1, V2> extends KTableKTableAbstractJoin<K, V, @Override public void process(K key, Change<V1> change) { - V newValue = null; - V oldValue = null; + R newValue = null; + R oldValue = null; V2 value2 = null; if (change.newValue != null || change.oldValue != null) @@ -79,7 +79,7 @@ class KTableKTableLeftJoin<K, V, V1, V2> extends KTableKTableAbstractJoin<K, V, } - private class KTableKTableLeftJoinValueGetter implements KTableValueGetter<K, V> { + private class KTableKTableLeftJoinValueGetter implements KTableValueGetter<K, R> { private final KTableValueGetter<K, V1> valueGetter1; private final KTableValueGetter<K, V2> valueGetter2; @@ -96,7 +96,7 @@ class KTableKTableLeftJoin<K, V, V1, V2> extends KTableKTableAbstractJoin<K, V, } @Override - public V get(K key) { + public R get(K key) { V1 value1 = valueGetter1.get(key); if (value1 != null) { http://git-wip-us.apache.org/repos/asf/kafka/blob/a62eb599/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoin.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoin.java index b859b34..6ab0ae9 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoin.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoin.java @@ -22,9 +22,9 @@ import org.apache.kafka.streams.processor.AbstractProcessor; import org.apache.kafka.streams.processor.Processor; import org.apache.kafka.streams.processor.ProcessorContext; -class KTableKTableOuterJoin<K, V, V1, V2> extends KTableKTableAbstractJoin<K, V, V1, V2> { +class KTableKTableOuterJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K, R, V1, V2> { - KTableKTableOuterJoin(KTableImpl<K, ?, V1> table1, KTableImpl<K, ?, V2> table2, ValueJoiner<V1, V2, V> joiner) { + KTableKTableOuterJoin(KTableImpl<K, ?, V1> table1, KTableImpl<K, ?, V2> table2, ValueJoiner<V1, V2, R> joiner) { super(table1, table2, joiner); } @@ -34,10 +34,10 @@ class KTableKTableOuterJoin<K, V, V1, V2> extends KTableKTableAbstractJoin<K, V, } @Override - public KTableValueGetterSupplier<K, V> view() { - return new KTableValueGetterSupplier<K, V>() { + public KTableValueGetterSupplier<K, R> view() { + return new KTableValueGetterSupplier<K, R>() { - public KTableValueGetter<K, V> get() { + public KTableValueGetter<K, R> get() { return new KTableKTableOuterJoinValueGetter(valueGetterSupplier1.get(), valueGetterSupplier2.get()); } @@ -61,8 +61,8 @@ class KTableKTableOuterJoin<K, V, V1, V2> extends KTableKTableAbstractJoin<K, V, @Override public void process(K key, Change<V1> change) { - V newValue = null; - V oldValue = null; + R newValue = null; + R oldValue = null; V2 value2 = valueGetter.get(key); if (change.newValue != null || value2 != null) @@ -77,7 +77,7 @@ class KTableKTableOuterJoin<K, V, V1, V2> extends KTableKTableAbstractJoin<K, V, } } - private class KTableKTableOuterJoinValueGetter implements KTableValueGetter<K, V> { + private class KTableKTableOuterJoinValueGetter implements KTableValueGetter<K, R> { private final KTableValueGetter<K, V1> valueGetter1; private final KTableValueGetter<K, V2> valueGetter2; @@ -94,8 +94,8 @@ class KTableKTableOuterJoin<K, V, V1, V2> extends KTableKTableAbstractJoin<K, V, } @Override - public V get(K key) { - V newValue = null; + public R get(K key) { + R newValue = null; V1 value1 = valueGetter1.get(key); V2 value2 = valueGetter2.get(key); http://git-wip-us.apache.org/repos/asf/kafka/blob/a62eb599/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableRightJoin.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableRightJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableRightJoin.java index f20e987..a6a13fc 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableRightJoin.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableRightJoin.java @@ -22,10 +22,10 @@ import org.apache.kafka.streams.processor.AbstractProcessor; import org.apache.kafka.streams.processor.Processor; import org.apache.kafka.streams.processor.ProcessorContext; -class KTableKTableRightJoin<K, V, V1, V2> extends KTableKTableAbstractJoin<K, V, V1, V2> { +class KTableKTableRightJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K, R, V1, V2> { - KTableKTableRightJoin(KTableImpl<K, ?, V1> table1, KTableImpl<K, ?, V2> table2, ValueJoiner<V1, V2, V> joiner) { + KTableKTableRightJoin(KTableImpl<K, ?, V1> table1, KTableImpl<K, ?, V2> table2, ValueJoiner<V1, V2, R> joiner) { super(table1, table2, joiner); } @@ -35,10 +35,10 @@ class KTableKTableRightJoin<K, V, V1, V2> extends KTableKTableAbstractJoin<K, V, } @Override - public KTableValueGetterSupplier<K, V> view() { - return new KTableValueGetterSupplier<K, V>() { + public KTableValueGetterSupplier<K, R> view() { + return new KTableValueGetterSupplier<K, R>() { - public KTableValueGetter<K, V> get() { + public KTableValueGetter<K, R> get() { return new KTableKTableRightJoinValueGetter(valueGetterSupplier1.get(), valueGetterSupplier2.get()); } @@ -62,8 +62,8 @@ class KTableKTableRightJoin<K, V, V1, V2> extends KTableKTableAbstractJoin<K, V, @Override public void process(K key, Change<V1> change) { - V newValue = null; - V oldValue = null; + R newValue = null; + R oldValue = null; V2 value2 = valueGetter.get(key); if (value2 != null) { @@ -77,7 +77,7 @@ class KTableKTableRightJoin<K, V, V1, V2> extends KTableKTableAbstractJoin<K, V, } - private class KTableKTableRightJoinValueGetter implements KTableValueGetter<K, V> { + private class KTableKTableRightJoinValueGetter implements KTableValueGetter<K, R> { private final KTableValueGetter<K, V1> valueGetter1; private final KTableValueGetter<K, V2> valueGetter2; @@ -94,7 +94,7 @@ class KTableKTableRightJoin<K, V, V1, V2> extends KTableKTableAbstractJoin<K, V, } @Override - public V get(K key) { + public R get(K key) { V2 value2 = valueGetter2.get(key); if (value2 != null) { http://git-wip-us.apache.org/repos/asf/kafka/blob/a62eb599/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java index c664906..244d8ba 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java @@ -23,30 +23,30 @@ import org.apache.kafka.streams.processor.Processor; import org.apache.kafka.streams.processor.ProcessorContext; -class KTableMapValues<K1, V1, V2> implements KTableProcessorSupplier<K1, V1, V2> { +class KTableMapValues<K, V, V1> implements KTableProcessorSupplier<K, V, V1> { - private final KTableImpl<K1, ?, V1> parent; - private final ValueMapper<V1, V2> mapper; + private final KTableImpl<K, ?, V> parent; + private final ValueMapper<V, V1> mapper; private boolean sendOldValues = false; - public KTableMapValues(KTableImpl<K1, ?, V1> parent, ValueMapper<V1, V2> mapper) { + public KTableMapValues(KTableImpl<K, ?, V> parent, ValueMapper<V, V1> mapper) { this.parent = parent; this.mapper = mapper; } @Override - public Processor<K1, Change<V1>> get() { + public Processor<K, Change<V>> get() { return new KTableMapValuesProcessor(); } @Override - public KTableValueGetterSupplier<K1, V2> view() { - final KTableValueGetterSupplier<K1, V1> parentValueGetterSupplier = parent.valueGetterSupplier(); + public KTableValueGetterSupplier<K, V1> view() { + final KTableValueGetterSupplier<K, V> parentValueGetterSupplier = parent.valueGetterSupplier(); - return new KTableValueGetterSupplier<K1, V2>() { + return new KTableValueGetterSupplier<K, V1>() { - public KTableValueGetter<K1, V2> get() { + public KTableValueGetter<K, V1> get() { return new KTableMapValuesValueGetter(parentValueGetterSupplier.get()); } @@ -59,8 +59,8 @@ class KTableMapValues<K1, V1, V2> implements KTableProcessorSupplier<K1, V1, V2> sendOldValues = true; } - private V2 computeValue(V1 value) { - V2 newValue = null; + private V1 computeValue(V value) { + V1 newValue = null; if (value != null) newValue = mapper.apply(value); @@ -68,22 +68,22 @@ class KTableMapValues<K1, V1, V2> implements KTableProcessorSupplier<K1, V1, V2> return newValue; } - private class KTableMapValuesProcessor extends AbstractProcessor<K1, Change<V1>> { + private class KTableMapValuesProcessor extends AbstractProcessor<K, Change<V>> { @Override - public void process(K1 key, Change<V1> change) { - V2 newValue = computeValue(change.newValue); - V2 oldValue = sendOldValues ? computeValue(change.oldValue) : null; + public void process(K key, Change<V> change) { + V1 newValue = computeValue(change.newValue); + V1 oldValue = sendOldValues ? computeValue(change.oldValue) : null; context().forward(key, new Change<>(newValue, oldValue)); } } - private class KTableMapValuesValueGetter implements KTableValueGetter<K1, V2> { + private class KTableMapValuesValueGetter implements KTableValueGetter<K, V1> { - private final KTableValueGetter<K1, V1> parentGetter; + private final KTableValueGetter<K, V> parentGetter; - public KTableMapValuesValueGetter(KTableValueGetter<K1, V1> parentGetter) { + public KTableMapValuesValueGetter(KTableValueGetter<K, V> parentGetter) { this.parentGetter = parentGetter; } @@ -93,7 +93,7 @@ class KTableMapValues<K1, V1, V2> implements KTableProcessorSupplier<K1, V1, V2> } @Override - public V2 get(K1 key) { + public V1 get(K key) { return computeValue(parentGetter.get(key)); } http://git-wip-us.apache.org/repos/asf/kafka/blob/a62eb599/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java index bbef7fb..12fcc17 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java @@ -29,28 +29,28 @@ import org.apache.kafka.streams.processor.ProcessorContext; * * Given the input, it can output at most two records (one mapped from old value and one mapped from new value). */ -public class KTableRepartitionMap<K1, V1, K2, V2> implements KTableProcessorSupplier<K1, V1, KeyValue<K2, V2>> { +public class KTableRepartitionMap<K, V, K1, V1> implements KTableProcessorSupplier<K, V, KeyValue<K1, V1>> { - private final KTableImpl<K1, ?, V1> parent; - private final KeyValueMapper<K1, V1, KeyValue<K2, V2>> mapper; + private final KTableImpl<K, ?, V> parent; + private final KeyValueMapper<K, V, KeyValue<K1, V1>> mapper; - public KTableRepartitionMap(KTableImpl<K1, ?, V1> parent, KeyValueMapper<K1, V1, KeyValue<K2, V2>> mapper) { + public KTableRepartitionMap(KTableImpl<K, ?, V> parent, KeyValueMapper<K, V, KeyValue<K1, V1>> mapper) { this.parent = parent; this.mapper = mapper; } @Override - public Processor<K1, Change<V1>> get() { + public Processor<K, Change<V>> get() { return new KTableMapProcessor(); } @Override - public KTableValueGetterSupplier<K1, KeyValue<K2, V2>> view() { - final KTableValueGetterSupplier<K1, V1> parentValueGetterSupplier = parent.valueGetterSupplier(); + public KTableValueGetterSupplier<K, KeyValue<K1, V1>> view() { + final KTableValueGetterSupplier<K, V> parentValueGetterSupplier = parent.valueGetterSupplier(); - return new KTableValueGetterSupplier<K1, KeyValue<K2, V2>>() { + return new KTableValueGetterSupplier<K, KeyValue<K1, V1>>() { - public KTableValueGetter<K1, KeyValue<K2, V2>> get() { + public KTableValueGetter<K, KeyValue<K1, V1>> get() { return new KTableMapValueGetter(parentValueGetterSupplier.get()); } @@ -63,8 +63,8 @@ public class KTableRepartitionMap<K1, V1, K2, V2> implements KTableProcessorSupp throw new KafkaException("KTableRepartitionMap should always require sending old values."); } - private KeyValue<K2, V2> computeValue(K1 key, V1 value) { - KeyValue<K2, V2> newValue = null; + private KeyValue<K1, V1> computeValue(K key, V value) { + KeyValue<K1, V1> newValue = null; if (key != null || value != null) newValue = mapper.apply(key, value); @@ -72,26 +72,26 @@ public class KTableRepartitionMap<K1, V1, K2, V2> implements KTableProcessorSupp return newValue; } - private class KTableMapProcessor extends AbstractProcessor<K1, Change<V1>> { + private class KTableMapProcessor extends AbstractProcessor<K, Change<V>> { @Override - public void process(K1 key, Change<V1> change) { - KeyValue<K2, V2> newPair = computeValue(key, change.newValue); + public void process(K key, Change<V> change) { + KeyValue<K1, V1> newPair = computeValue(key, change.newValue); context().forward(newPair.key, new Change<>(newPair.value, null)); if (change.oldValue != null) { - KeyValue<K2, V2> oldPair = computeValue(key, change.oldValue); + KeyValue<K1, V1> oldPair = computeValue(key, change.oldValue); context().forward(oldPair.key, new Change<>(null, oldPair.value)); } } } - private class KTableMapValueGetter implements KTableValueGetter<K1, KeyValue<K2, V2>> { + private class KTableMapValueGetter implements KTableValueGetter<K, KeyValue<K1, V1>> { - private final KTableValueGetter<K1, V1> parentGetter; + private final KTableValueGetter<K, V> parentGetter; - public KTableMapValueGetter(KTableValueGetter<K1, V1> parentGetter) { + public KTableMapValueGetter(KTableValueGetter<K, V> parentGetter) { this.parentGetter = parentGetter; } @@ -101,7 +101,7 @@ public class KTableRepartitionMap<K1, V1, K2, V2> implements KTableProcessorSupp } @Override - public KeyValue<K2, V2> get(K1 key) { + public KeyValue<K1, V1> get(K key) { return computeValue(key, parentGetter.get(key)); } http://git-wip-us.apache.org/repos/asf/kafka/blob/a62eb599/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SlidingWindow.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SlidingWindow.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SlidingWindow.java deleted file mode 100644 index a6b5149..0000000 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SlidingWindow.java +++ /dev/null @@ -1,38 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kafka.streams.kstream.internals; - - -import org.apache.kafka.streams.kstream.Window; - -public class SlidingWindow extends Window { - - public SlidingWindow(long start, long end) { - super(start, end); - } - - @Override - public boolean overlap(Window other) { - return super.overlap(other) && other.getClass().equals(SlidingWindow.class); - } - - @Override - public boolean equalsTo(Window other) { - return super.equalsTo(other) && other.getClass().equals(SlidingWindow.class); - } -} http://git-wip-us.apache.org/repos/asf/kafka/blob/a62eb599/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TumblingWindow.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TumblingWindow.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TumblingWindow.java new file mode 100644 index 0000000..a02d4b9 --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TumblingWindow.java @@ -0,0 +1,38 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.streams.kstream.internals; + + +import org.apache.kafka.streams.kstream.Window; + +public class TumblingWindow extends Window { + + public TumblingWindow(long start, long end) { + super(start, end); + } + + @Override + public boolean overlap(Window other) { + return super.overlap(other) && other.getClass().equals(TumblingWindow.class); + } + + @Override + public boolean equalsTo(Window other) { + return super.equalsTo(other) && other.getClass().equals(TumblingWindow.class); + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/a62eb599/streams/src/main/java/org/apache/kafka/streams/state/MeteredWindowStore.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/MeteredWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/MeteredWindowStore.java index d4ed0e7..cfcfb00 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/MeteredWindowStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/MeteredWindowStore.java @@ -21,6 +21,7 @@ import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.common.utils.SystemTime; import org.apache.kafka.common.utils.Time; import org.apache.kafka.streams.StreamingMetrics; +import org.apache.kafka.streams.kstream.KeyValue; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.StateRestoreCallback; @@ -97,20 +98,25 @@ public class MeteredWindowStore<K, V> implements WindowStore<K, V> { } @Override - public WindowStoreIterator<V> fetch(K key, long timestamp) { - return new MeteredWindowStoreIterator<>(this.inner.fetch(key, timestamp), this.rangeTime); + public WindowStoreIterator<V> fetch(K key, long timeFrom, long timeTo) { + return new MeteredWindowStoreIterator<>(this.inner.fetch(key, timeFrom, timeTo), this.rangeTime); } @Override public void put(K key, V value) { - putAndReturnInternalKey(key, value); + putAndReturnInternalKey(key, value, -1L); } @Override - public byte[] putAndReturnInternalKey(K key, V value) { + public void put(K key, V value, long timestamp) { + putAndReturnInternalKey(key, value, timestamp); + } + + @Override + public byte[] putAndReturnInternalKey(K key, V value, long timestamp) { long startNs = time.nanoseconds(); try { - byte[] binKey = this.inner.putAndReturnInternalKey(key, value); + byte[] binKey = this.inner.putAndReturnInternalKey(key, value, timestamp); if (loggingEnabled) { changeLogger.add(binKey); @@ -174,7 +180,7 @@ public class MeteredWindowStore<K, V> implements WindowStore<K, V> { } @Override - public E next() { + public KeyValue<Long, E> next() { return iter.next(); } http://git-wip-us.apache.org/repos/asf/kafka/blob/a62eb599/streams/src/main/java/org/apache/kafka/streams/state/RocksDBStore.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/RocksDBStore.java b/streams/src/main/java/org/apache/kafka/streams/state/RocksDBStore.java index a32faf4..62b9f2c 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/RocksDBStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/RocksDBStore.java @@ -222,6 +222,7 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> { @Override public void close() { + iter.dispose(); } }
