KAFKA-3016: phase-2. stream join implementations guozhangwang
Author: Yasuhiro Matsuda <[email protected]> Reviewers: Guozhang Wang Closes #737 from ymatsuda/windowed_join2 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/5aad4999 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/5aad4999 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/5aad4999 Branch: refs/heads/trunk Commit: 5aad4999d1a1d35d61365ff57a9b79a6af3e70d2 Parents: a788c65 Author: Yasuhiro Matsuda <[email protected]> Authored: Wed Jan 6 14:34:40 2016 -0800 Committer: Guozhang Wang <[email protected]> Committed: Wed Jan 6 14:34:40 2016 -0800 ---------------------------------------------------------------------- .../kafka/streams/kstream/JoinWindowSpec.java | 91 ++++ .../apache/kafka/streams/kstream/KStream.java | 97 +++- .../streams/kstream/SlidingWindowSupplier.java | 266 ---------- .../apache/kafka/streams/kstream/Window.java | 36 -- .../kafka/streams/kstream/WindowSupplier.java | 25 - .../streams/kstream/internals/KStreamImpl.java | 147 +++++- .../streams/kstream/internals/KStreamJoin.java | 84 --- .../kstream/internals/KStreamJoinWindow.java | 58 +++ .../kstream/internals/KStreamKStreamJoin.java | 73 +++ .../kstream/internals/KStreamWindow.java | 68 --- .../kstream/internals/KStreamWindowedImpl.java | 67 --- .../state/RocksDBWindowStoreSupplier.java | 2 +- .../kstream/internals/KStreamImplTest.java | 68 +-- .../kstream/internals/KStreamJoinTest.java | 195 ------- .../internals/KStreamKStreamJoinTest.java | 505 +++++++++++++++++++ .../internals/KStreamKStreamLeftJoinTest.java | 289 +++++++++++ .../kstream/internals/KStreamWindowedTest.java | 91 ---- .../apache/kafka/test/UnlimitedWindowDef.java | 104 ---- 18 files changed, 1266 insertions(+), 1000 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/5aad4999/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindowSpec.java 
---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindowSpec.java b/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindowSpec.java new file mode 100644 index 0000000..8f0f839 --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindowSpec.java @@ -0,0 +1,91 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.streams.kstream; + +/** + * This class is used to specify the behaviour of windowed joins. + */ +public class JoinWindowSpec { + + public final String name; + public final long before; + public final long after; + public final long retention; + public final int segments; + + private JoinWindowSpec(String name, long before, long after, long retention, int segments) { + this.name = name; + this.after = after; + this.before = before; + this.retention = retention; + this.segments = segments; + } + + public static JoinWindowSpec of(String name) { + return new JoinWindowSpec(name, 0L, 0L, 0L, 3); + } + + /** + * Specifies that records of the same key are joinable if their timestamps are within + * timeDifference.
+ * + * @param timeDifference the maximum allowed difference between the timestamps of joinable records + * @return a new JoinWindowSpec; this instance is left unchanged + */ + public JoinWindowSpec within(long timeDifference) { + return new JoinWindowSpec(name, timeDifference, timeDifference, retention, segments); + } + + /** + * Specifies that records of the same key are joinable if their timestamps are within + * timeDifference, and if the timestamp of a record from the secondary stream is + * earlier than or equal to the timestamp of a record from the first stream. + * + * @param timeDifference the maximum allowed difference between the timestamps of joinable records + * @return a new JoinWindowSpec; this instance is left unchanged + */ + public JoinWindowSpec before(long timeDifference) { + return new JoinWindowSpec(name, timeDifference, 0L, retention, segments); + } + + /** + * Specifies that records of the same key are joinable if their timestamps are within + * timeDifference, and if the timestamp of a record from the secondary stream is + * later than or equal to the timestamp of a record from the first stream. + * + * @param timeDifference the maximum allowed difference between the timestamps of joinable records + * @return a new JoinWindowSpec; this instance is left unchanged + */ + public JoinWindowSpec after(long timeDifference) { + return new JoinWindowSpec(name, 0L, timeDifference, retention, segments); + } + + /** + * Specifies the retention period of the join windows. + * @param retentionPeriod the retention period of the windows + * @return a new JoinWindowSpec; this instance is left unchanged + */ + public JoinWindowSpec retentionPeriod(long retentionPeriod) { + return new JoinWindowSpec(name, before, after, retentionPeriod, segments); + } + + public JoinWindowSpec segments(int segments) { + return new JoinWindowSpec(name, before, after, retention, segments); + } + +} http://git-wip-us.apache.org/repos/asf/kafka/blob/5aad4999/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java index d3931ef..29115c7 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java @@ -84,14 +84,6 @@
public interface KStream<K, V> { <V1> KStream<K, V1> flatMapValues(ValueMapper<V, Iterable<V1>> processor); /** - * Creates a new windowed stream using a specified window instance. - * - * @param windowDef the instance of Window - * @return the windowed stream - */ - KStreamWindowed<K, V> with(WindowSupplier<K, V> windowDef); - - /** * Creates an array of streams from this stream. Each stream in the array corresponds to a predicate in * supplied predicates in the same order. Predicates are evaluated in order. An element is streamed to * a corresponding stream for the first predicate is evaluated true. @@ -173,6 +165,95 @@ public interface KStream<K, V> { void process(ProcessorSupplier<K, V> processorSupplier, String... stateStoreNames); /** + * Combines values of this stream with another KStream using Windowed Inner Join. + * + * @param otherStream the instance of KStream joined with this stream + * @param joiner ValueJoiner + * @param joinWindowSpec the specification of the join window + * @param keySerializer key serializer, + * if not specified the default serializer defined in the configs will be used + * @param thisValueSerializer value serializer for this stream, + * if not specified the default serializer defined in the configs will be used + * @param otherValueSerializer value serializer for other stream, + * if not specified the default serializer defined in the configs will be used + * @param keyDeserializer key deserializer, + * if not specified the default deserializer defined in the configs will be used + * @param thisValueDeserializer value deserializer for this stream, + * if not specified the default deserializer defined in the configs will be used + * @param otherValueDeserializer value deserializer for other stream, + * if not specified the default deserializer defined in the configs will be used + * @param <V1> the value type of the other stream + * @param <V2> the value type of the new stream + */ + <V1, V2> KStream<K, V2> join( + KStream<K, V1>
otherStream, + ValueJoiner<V, V1, V2> joiner, + JoinWindowSpec joinWindowSpec, + Serializer<K> keySerializer, + Serializer<V> thisValueSerializer, + Serializer<V1> otherValueSerializer, + Deserializer<K> keyDeserializer, + Deserializer<V> thisValueDeserializer, + Deserializer<V1> otherValueDeserializer); + + /** + * Combines values of this stream with another KStream using Windowed Outer Join. + * + * @param otherStream the instance of KStream joined with this stream + * @param joiner ValueJoiner + * @param joinWindowSpec the specification of the join window + * @param keySerializer key serializer, + * if not specified the default serializer defined in the configs will be used + * @param thisValueSerializer value serializer for this stream, + * if not specified the default serializer defined in the configs will be used + * @param otherValueSerializer value serializer for other stream, + * if not specified the default serializer defined in the configs will be used + * @param keyDeserializer key deserializer, + * if not specified the default deserializer defined in the configs will be used + * @param thisValueDeserializer value deserializer for this stream, + * if not specified the default deserializer defined in the configs will be used + * @param otherValueDeserializer value deserializer for other stream, + * if not specified the default deserializer defined in the configs will be used + * @param <V1> the value type of the other stream + * @param <V2> the value type of the new stream + */ + <V1, V2> KStream<K, V2> outerJoin( + KStream<K, V1> otherStream, + ValueJoiner<V, V1, V2> joiner, + JoinWindowSpec joinWindowSpec, + Serializer<K> keySerializer, + Serializer<V> thisValueSerializer, + Serializer<V1> otherValueSerializer, + Deserializer<K> keyDeserializer, + Deserializer<V> thisValueDeserializer, + Deserializer<V1> otherValueDeserializer); + + /** + * Combines values of this stream with another KStream using Windowed Left Join.
+ * + * @param otherStream the instance of KStream joined with this stream + * @param joiner ValueJoiner + * @param keySerializer key serializer, + * if not specified the default serializer defined in the configs will be used + * @param otherValueSerializer value serializer for other stream, + * if not specified the default serializer defined in the configs will be used + * @param keyDeserializer key deserializer, + * if not specified the default deserializer defined in the configs will be used + * @param otherValueDeserializer value deserializer for other stream, + * if not specified the default deserializer defined in the configs will be used + * @param <V1> the value type of the other stream + * @param <V2> the value type of the new stream + */ + <V1, V2> KStream<K, V2> leftJoin( + KStream<K, V1> otherStream, + ValueJoiner<V, V1, V2> joiner, + JoinWindowSpec joinWindowSpec, + Serializer<K> keySerializer, + Serializer<V1> otherValueSerializer, + Deserializer<K> keyDeserializer, + Deserializer<V1> otherValueDeserializer); + + /** * Combines values of this stream with KTable using Left Join. * * @param ktable the instance of KTable joined with this stream http://git-wip-us.apache.org/repos/asf/kafka/blob/5aad4999/streams/src/main/java/org/apache/kafka/streams/kstream/SlidingWindowSupplier.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/SlidingWindowSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/SlidingWindowSupplier.java deleted file mode 100644 index 80e548f..0000000 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/SlidingWindowSupplier.java +++ /dev/null @@ -1,266 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kafka.streams.kstream; - -import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.kafka.common.serialization.ByteArraySerializer; -import org.apache.kafka.common.serialization.Deserializer; -import org.apache.kafka.common.serialization.IntegerDeserializer; -import org.apache.kafka.common.serialization.IntegerSerializer; -import org.apache.kafka.common.serialization.Serializer; -import org.apache.kafka.streams.kstream.internals.FilteredIterator; -import org.apache.kafka.streams.kstream.internals.WindowSupport; -import org.apache.kafka.streams.processor.StateRestoreCallback; -import org.apache.kafka.streams.processor.internals.RecordCollector; -import org.apache.kafka.streams.processor.ProcessorContext; -import org.apache.kafka.streams.processor.internals.Stamped; - -import java.util.Collections; -import java.util.HashMap; -import java.util.Iterator; -import java.util.LinkedList; -import java.util.Map; - -public class SlidingWindowSupplier<K, V> implements WindowSupplier<K, V> { - private final String name; - private final long duration; - private final int maxCount; - private final Serializer<K> keySerializer; - private final Serializer<V> valueSerializer; - private final Deserializer<K> keyDeserializer; - private final Deserializer<V> valueDeserializer; - - public SlidingWindowSupplier( - String name, - long duration, - int maxCount, - 
Serializer<K> keySerializer, - Serializer<V> valueSerializer, - Deserializer<K> keyDeseriaizer, - Deserializer<V> valueDeserializer) { - this.name = name; - this.duration = duration; - this.maxCount = maxCount; - this.keySerializer = keySerializer; - this.valueSerializer = valueSerializer; - this.keyDeserializer = keyDeseriaizer; - this.valueDeserializer = valueDeserializer; - } - - @Override - public String name() { - return name; - } - - @Override - public Window<K, V> get() { - return new SlidingWindow(); - } - - public class SlidingWindow extends WindowSupport implements Window<K, V> { - private final Object lock = new Object(); - private ProcessorContext context; - private int partition; - private int slotNum; // used as a key for Kafka log compaction - private LinkedList<K> list = new LinkedList<K>(); - private HashMap<K, ValueList<V>> map = new HashMap<>(); - - @Override - public void init(ProcessorContext context) { - this.context = context; - this.partition = context.id().partition; - SlidingWindowRegistryCallback restoreFunc = new SlidingWindowRegistryCallback(); - context.register(this, true, restoreFunc); - - for (ValueList<V> valueList : map.values()) { - valueList.clearDirtyValues(); - } - this.slotNum = restoreFunc.slotNum; - } - - @Override - public Iterator<V> findAfter(K key, final long timestamp) { - return find(key, timestamp, timestamp + duration); - } - - @Override - public Iterator<V> findBefore(K key, final long timestamp) { - return find(key, timestamp - duration, timestamp); - } - - @Override - public Iterator<V> find(K key, final long timestamp) { - return find(key, timestamp - duration, timestamp + duration); - } - - /* - * finds items in the window between startTime and endTime (both inclusive) - */ - private Iterator<V> find(K key, final long startTime, final long endTime) { - final ValueList<V> values = map.get(key); - - if (values == null) { - return Collections.emptyIterator(); - } else { - return new FilteredIterator<V, 
Value<V>>(values.iterator()) { - @Override - protected V filter(Value<V> item) { - if (startTime <= item.timestamp && item.timestamp <= endTime) - return item.value; - else - return null; - } - }; - } - } - - @Override - public void put(K key, V value, long timestamp) { - synchronized (lock) { - slotNum++; - - list.offerLast(key); - - ValueList<V> values = map.get(key); - if (values == null) { - values = new ValueList<>(); - map.put(key, values); - } - - values.add(slotNum, value, timestamp); - } - evictExcess(); - evictExpired(timestamp - duration); - } - - private void evictExcess() { - while (list.size() > maxCount) { - K oldestKey = list.pollFirst(); - - ValueList<V> values = map.get(oldestKey); - values.removeFirst(); - - if (values.isEmpty()) map.remove(oldestKey); - } - } - - private void evictExpired(long cutoffTime) { - while (true) { - K oldestKey = list.peekFirst(); - - ValueList<V> values = map.get(oldestKey); - Stamped<V> oldestValue = values.first(); - - if (oldestValue.timestamp < cutoffTime) { - list.pollFirst(); - values.removeFirst(); - - if (values.isEmpty()) map.remove(oldestKey); - } else { - break; - } - } - } - - @Override - public String name() { - return name; - } - - @Override - public void flush() { - IntegerSerializer intSerializer = new IntegerSerializer(); - ByteArraySerializer byteArraySerializer = new ByteArraySerializer(); - - RecordCollector collector = ((RecordCollector.Supplier) context).recordCollector(); - - for (Map.Entry<K, ValueList<V>> entry : map.entrySet()) { - ValueList<V> values = entry.getValue(); - if (values.hasDirtyValues()) { - K key = entry.getKey(); - - byte[] keyBytes = keySerializer.serialize(name, key); - - Iterator<Value<V>> iterator = values.dirtyValueIterator(); - while (iterator.hasNext()) { - Value<V> dirtyValue = iterator.next(); - byte[] slot = intSerializer.serialize("", dirtyValue.slotNum); - byte[] valBytes = valueSerializer.serialize(name, dirtyValue.value); - - byte[] combined = new byte[8 + 4 + 
keyBytes.length + 4 + valBytes.length]; - - int offset = 0; - offset += putLong(combined, offset, dirtyValue.timestamp); - offset += puts(combined, offset, keyBytes); - offset += puts(combined, offset, valBytes); - - if (offset != combined.length) - throw new IllegalStateException("serialized length does not match"); - - collector.send(new ProducerRecord<>(name, partition, slot, combined), byteArraySerializer, byteArraySerializer); - } - values.clearDirtyValues(); - } - } - } - - @Override - public void close() { - // TODO - } - - @Override - public boolean persistent() { - // TODO: should not be persistent, right? - return false; - } - - private class SlidingWindowRegistryCallback implements StateRestoreCallback { - - final IntegerDeserializer intDeserializer; - int slotNum = 0; - - SlidingWindowRegistryCallback() { - intDeserializer = new IntegerDeserializer(); - } - - @Override - public void restore(byte[] slot, byte[] bytes) { - - slotNum = intDeserializer.deserialize("", slot); - - int offset = 0; - // timestamp - long timestamp = getLong(bytes, offset); - offset += 8; - // key - int length = getInt(bytes, offset); - offset += 4; - K key = deserialize(bytes, offset, length, name, keyDeserializer); - offset += length; - // value - length = getInt(bytes, offset); - offset += 4; - V value = deserialize(bytes, offset, length, name, valueDeserializer); - - put(key, value, timestamp); - } - } - } - -} http://git-wip-us.apache.org/repos/asf/kafka/blob/5aad4999/streams/src/main/java/org/apache/kafka/streams/kstream/Window.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Window.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Window.java deleted file mode 100644 index a1456f6..0000000 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/Window.java +++ /dev/null @@ -1,36 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - 
* contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kafka.streams.kstream; - -import org.apache.kafka.streams.processor.ProcessorContext; -import org.apache.kafka.streams.processor.StateStore; - -import java.util.Iterator; - -public interface Window<K, V> extends StateStore { - - void init(ProcessorContext context); - - Iterator<V> find(K key, long timestamp); - - Iterator<V> findAfter(K key, long timestamp); - - Iterator<V> findBefore(K key, long timestamp); - - void put(K key, V value, long timestamp); -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kafka/blob/5aad4999/streams/src/main/java/org/apache/kafka/streams/kstream/WindowSupplier.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/WindowSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/WindowSupplier.java deleted file mode 100644 index 46a2b9e..0000000 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/WindowSupplier.java +++ /dev/null @@ -1,25 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kafka.streams.kstream; - -public interface WindowSupplier<K, V> { - - String name(); - - Window<K, V> get(); -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kafka/blob/5aad4999/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java index 67a2d27..f47fe0f 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java @@ -19,6 +19,7 @@ package org.apache.kafka.streams.kstream.internals; import org.apache.kafka.common.serialization.Deserializer; import org.apache.kafka.common.serialization.Serializer; +import org.apache.kafka.streams.kstream.JoinWindowSpec; import org.apache.kafka.streams.kstream.KStreamBuilder; import org.apache.kafka.streams.kstream.KTable; import org.apache.kafka.streams.kstream.KeyValue; @@ -26,12 +27,12 @@ import org.apache.kafka.streams.kstream.TransformerSupplier; import org.apache.kafka.streams.kstream.ValueJoiner; import org.apache.kafka.streams.kstream.ValueTransformerSupplier; import org.apache.kafka.streams.kstream.KStream; 
-import org.apache.kafka.streams.kstream.KStreamWindowed; import org.apache.kafka.streams.kstream.KeyValueMapper; import org.apache.kafka.streams.kstream.Predicate; import org.apache.kafka.streams.kstream.ValueMapper; -import org.apache.kafka.streams.kstream.WindowSupplier; import org.apache.kafka.streams.processor.ProcessorSupplier; +import org.apache.kafka.streams.state.RocksDBWindowStoreSupplier; +import org.apache.kafka.streams.state.Serdes; import java.lang.reflect.Array; import java.util.HashSet; @@ -67,6 +68,10 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V public static final String JOINOTHER_NAME = "KSTREAM-JOINOTHER-"; + public static final String OUTERTHIS_NAME = "KSTREAM-OUTERTHIS-"; + + public static final String OUTEROTHER_NAME = "KSTREAM-OUTEROTHER-"; + public static final String LEFTJOIN_NAME = "KSTREAM-LEFTJOIN-"; public static final String MERGE_NAME = "KSTREAM-MERGE-"; @@ -132,15 +137,6 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V } @Override - public KStreamWindowed<K, V> with(WindowSupplier<K, V> windowSupplier) { - String name = topology.newName(WINDOWED_NAME); - - topology.addProcessor(name, new KStreamWindow<>(windowSupplier), this.name); - - return new KStreamWindowedImpl<>(topology, name, sourceNodes, windowSupplier); - } - - @Override @SuppressWarnings("unchecked") public KStream<K, V>[] branch(Predicate<K, V>... 
predicates) { String branchName = topology.newName(BRANCH_NAME); @@ -239,6 +235,135 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V topology.connectProcessorAndStateStores(name, stateStoreNames); } + @Override + public <V1, R> KStream<K, R> join( + KStream<K, V1> other, + ValueJoiner<V, V1, R> joiner, + JoinWindowSpec joinWindowSpec, + Serializer<K> keySerialzier, + Serializer<V> thisValueSerialzier, + Serializer<V1> otherValueSerialzier, + Deserializer<K> keyDeserialier, + Deserializer<V> thisValueDeserialzier, + Deserializer<V1> otherValueDeserialzier) { + + return join(other, joiner, joinWindowSpec, + keySerialzier, thisValueSerialzier, otherValueSerialzier, + keyDeserialier, thisValueDeserialzier, otherValueDeserialzier, false); + } + + @Override + public <V1, R> KStream<K, R> outerJoin( + KStream<K, V1> other, + ValueJoiner<V, V1, R> joiner, + JoinWindowSpec joinWindowSpec, + Serializer<K> keySerialzier, + Serializer<V> thisValueSerialzier, + Serializer<V1> otherValueSerialzier, + Deserializer<K> keyDeserialier, + Deserializer<V> thisValueDeserialzier, + Deserializer<V1> otherValueDeserialzier) { + + return join(other, joiner, joinWindowSpec, + keySerialzier, thisValueSerialzier, otherValueSerialzier, + keyDeserialier, thisValueDeserialzier, otherValueDeserialzier, true); + } + + @SuppressWarnings("unchecked") + private <V1, R> KStream<K, R> join( + KStream<K, V1> other, + ValueJoiner<V, V1, R> joiner, + JoinWindowSpec joinWindowSpec, + Serializer<K> keySerialzier, + Serializer<V> thisValueSerialzier, + Serializer<V1> otherValueSerialzier, + Deserializer<K> keyDeserialier, + Deserializer<V> thisValueDeserialzier, + Deserializer<V1> otherValueDeserialzier, + boolean outer) { + + Set<String> allSourceNodes = ensureJoinableWith((AbstractStream<K>) other); + + RocksDBWindowStoreSupplier<K, V> thisWindow = + new RocksDBWindowStoreSupplier<>( + joinWindowSpec.name + "-1", + joinWindowSpec.before, + joinWindowSpec.after, + 
joinWindowSpec.retention, + joinWindowSpec.segments, + new Serdes<>("", keySerialzier, keyDeserialier, thisValueSerialzier, thisValueDeserialzier), + null); + + RocksDBWindowStoreSupplier<K, V1> otherWindow = + new RocksDBWindowStoreSupplier<>( + joinWindowSpec.name + "-2", + joinWindowSpec.after, + joinWindowSpec.before, + joinWindowSpec.retention, + joinWindowSpec.segments, + new Serdes<>("", keySerialzier, keyDeserialier, otherValueSerialzier, otherValueDeserialzier), + null); + + KStreamJoinWindow<K, V> thisWindowedStream = new KStreamJoinWindow<>(thisWindow.name()); + KStreamJoinWindow<K, V1> otherWindowedStream = new KStreamJoinWindow<>(otherWindow.name()); + + KStreamKStreamJoin<K, R, V, V1> joinThis = new KStreamKStreamJoin<>(otherWindow.name(), joiner, outer); + KStreamKStreamJoin<K, R, V1, V> joinOther = new KStreamKStreamJoin<>(thisWindow.name(), reverseJoiner(joiner), outer); + KStreamPassThrough<K, R> joinMerge = new KStreamPassThrough<>(); + + String thisWindowStreamName = topology.newName(WINDOWED_NAME); + String otherWindowStreamName = topology.newName(WINDOWED_NAME); + String joinThisName = outer ? topology.newName(OUTERTHIS_NAME) : topology.newName(JOINTHIS_NAME); + String joinOtherName = outer ? 
topology.newName(OUTEROTHER_NAME) : topology.newName(JOINOTHER_NAME); + String joinMergeName = topology.newName(MERGE_NAME); + + topology.addProcessor(thisWindowStreamName, thisWindowedStream, this.name); + topology.addProcessor(otherWindowStreamName, otherWindowedStream, ((KStreamImpl) other).name); + topology.addProcessor(joinThisName, joinThis, thisWindowStreamName); + topology.addProcessor(joinOtherName, joinOther, otherWindowStreamName); + topology.addProcessor(joinMergeName, joinMerge, joinThisName, joinOtherName); + topology.addStateStore(thisWindow, thisWindowStreamName, otherWindowStreamName); + topology.addStateStore(otherWindow, thisWindowStreamName, otherWindowStreamName); + + return new KStreamImpl<>(topology, joinMergeName, allSourceNodes); + } + + @SuppressWarnings("unchecked") + @Override + public <V1, R> KStream<K, R> leftJoin( + KStream<K, V1> other, + ValueJoiner<V, V1, R> joiner, + JoinWindowSpec joinWindowSpec, + Serializer<K> keySerialzier, + Serializer<V1> otherValueSerialzier, + Deserializer<K> keyDeserialier, + Deserializer<V1> otherValueDeserialzier) { + + Set<String> allSourceNodes = ensureJoinableWith((AbstractStream<K>) other); + + RocksDBWindowStoreSupplier<K, V1> otherWindow = + new RocksDBWindowStoreSupplier<>( + joinWindowSpec.name, + joinWindowSpec.after, + joinWindowSpec.before, + joinWindowSpec.retention, + joinWindowSpec.segments, + new Serdes<>("", keySerialzier, keyDeserialier, otherValueSerialzier, otherValueDeserialzier), + null); + + KStreamJoinWindow<K, V1> otherWindowedStream = new KStreamJoinWindow<>(otherWindow.name()); + KStreamKStreamJoin<K, R, V, V1> joinThis = new KStreamKStreamJoin<>(otherWindow.name(), joiner, true); + + String otherWindowStreamName = topology.newName(WINDOWED_NAME); + String joinThisName = topology.newName(LEFTJOIN_NAME); + + topology.addProcessor(otherWindowStreamName, otherWindowedStream, ((KStreamImpl) other).name); + topology.addProcessor(joinThisName, joinThis, this.name); + 
topology.addStateStore(otherWindow, joinThisName, otherWindowStreamName); + + return new KStreamImpl<>(topology, joinThisName, allSourceNodes); + } + @SuppressWarnings("unchecked") @Override public <V1, R> KStream<K, R> leftJoin(KTable<K, V1> other, ValueJoiner<V, V1, R> joiner) { http://git-wip-us.apache.org/repos/asf/kafka/blob/5aad4999/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamJoin.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamJoin.java deleted file mode 100644 index eefb8c9..0000000 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamJoin.java +++ /dev/null @@ -1,84 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.kafka.streams.kstream.internals; - -import org.apache.kafka.streams.kstream.ValueJoiner; -import org.apache.kafka.streams.kstream.Window; -import org.apache.kafka.streams.processor.AbstractProcessor; -import org.apache.kafka.streams.processor.Processor; -import org.apache.kafka.streams.processor.ProcessorContext; -import org.apache.kafka.streams.processor.ProcessorSupplier; - -import java.util.Iterator; - -class KStreamJoin<K, V, V1, V2> implements ProcessorSupplier<K, V1> { - - private static abstract class Finder<K, T> { - abstract Iterator<T> find(K key, long timestamp); - } - - private final String windowName; - private final ValueJoiner<V1, V2, V> joiner; - - KStreamJoin(String windowName, ValueJoiner<V1, V2, V> joiner) { - this.windowName = windowName; - this.joiner = joiner; - } - - @Override - public Processor<K, V1> get() { - return new KStreamJoinProcessor(windowName); - } - - private class KStreamJoinProcessor extends AbstractProcessor<K, V1> { - - private final String windowName; - protected Finder<K, V2> finder; - - public KStreamJoinProcessor(String windowName) { - this.windowName = windowName; - } - - @SuppressWarnings("unchecked") - @Override - public void init(ProcessorContext context) { - super.init(context); - - final Window<K, V2> window = (Window<K, V2>) context.getStateStore(windowName); - - this.finder = new Finder<K, V2>() { - @Override - Iterator<V2> find(K key, long timestamp) { - return window.find(key, timestamp); - } - }; - } - - @Override - public void process(K key, V1 value) { - long timestamp = context().timestamp(); - Iterator<V2> iter = finder.find(key, timestamp); - if (iter != null) { - while (iter.hasNext()) { - context().forward(key, joiner.apply(value, iter.next())); - } - } - } - } - -} http://git-wip-us.apache.org/repos/asf/kafka/blob/5aad4999/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamJoinWindow.java 
---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamJoinWindow.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamJoinWindow.java new file mode 100644 index 0000000..b122aa1 --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamJoinWindow.java @@ -0,0 +1,58 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.streams.kstream.internals; + +import org.apache.kafka.streams.processor.AbstractProcessor; +import org.apache.kafka.streams.processor.Processor; +import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.ProcessorSupplier; +import org.apache.kafka.streams.state.WindowStore; + +class KStreamJoinWindow<K, V> implements ProcessorSupplier<K, V> { + + private final String windowName; + + KStreamJoinWindow(String windowName) { + this.windowName = windowName; + } + + @Override + public Processor<K, V> get() { + return new KStreamJoinWindowProcessor(); + } + + private class KStreamJoinWindowProcessor extends AbstractProcessor<K, V> { + + private WindowStore<K, V> window; + + @SuppressWarnings("unchecked") + @Override + public void init(ProcessorContext context) { + super.init(context); + + window = (WindowStore<K, V>) context.getStateStore(windowName); + } + + @Override + public void process(K key, V value) { + context().forward(key, value); + window.put(key, value); + } + } + +} http://git-wip-us.apache.org/repos/asf/kafka/blob/5aad4999/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java new file mode 100644 index 0000000..8a9bf6c --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java @@ -0,0 +1,73 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.streams.kstream.internals; + +import org.apache.kafka.streams.kstream.ValueJoiner; +import org.apache.kafka.streams.processor.AbstractProcessor; +import org.apache.kafka.streams.processor.Processor; +import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.ProcessorSupplier; +import org.apache.kafka.streams.state.WindowStore; + +import java.util.Iterator; + +class KStreamKStreamJoin<K, R, V1, V2> implements ProcessorSupplier<K, V1> { + + private final String otherWindowName; + private final ValueJoiner<V1, V2, R> joiner; + private final boolean outer; + + KStreamKStreamJoin(String otherWindowName, ValueJoiner<V1, V2, R> joiner, boolean outer) { + this.otherWindowName = otherWindowName; + this.joiner = joiner; + this.outer = outer; + } + + @Override + public Processor<K, V1> get() { + return new KStreamKStreamJoinProcessor(); + } + + private class KStreamKStreamJoinProcessor extends AbstractProcessor<K, V1> { + + private WindowStore<K, V2> otherWindow; + + @SuppressWarnings("unchecked") + @Override + public void init(ProcessorContext context) { + super.init(context); + + otherWindow = (WindowStore<K, V2>) context.getStateStore(otherWindowName); + } + + @Override + public void process(K key, V1 value) { + boolean needOuterJoin = KStreamKStreamJoin.this.outer; + + Iterator<V2> iter = otherWindow.fetch(key, 
context().timestamp()); + while (iter.hasNext()) { + needOuterJoin = false; + context().forward(key, joiner.apply(value, iter.next())); + } + + if (needOuterJoin) + context().forward(key, joiner.apply(value, null)); + } + } + +} http://git-wip-us.apache.org/repos/asf/kafka/blob/5aad4999/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindow.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindow.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindow.java deleted file mode 100644 index 2923936..0000000 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindow.java +++ /dev/null @@ -1,68 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.kafka.streams.kstream.internals; - -import org.apache.kafka.streams.kstream.Window; -import org.apache.kafka.streams.kstream.WindowSupplier; -import org.apache.kafka.streams.processor.AbstractProcessor; -import org.apache.kafka.streams.processor.Processor; -import org.apache.kafka.streams.processor.ProcessorContext; -import org.apache.kafka.streams.processor.ProcessorSupplier; - -public class KStreamWindow<K, V> implements ProcessorSupplier<K, V> { - - private final WindowSupplier<K, V> windowSupplier; - - KStreamWindow(WindowSupplier<K, V> windowSupplier) { - this.windowSupplier = windowSupplier; - } - - public WindowSupplier<K, V> window() { - return windowSupplier; - } - - @Override - public Processor<K, V> get() { - return new KStreamWindowProcessor(); - } - - private class KStreamWindowProcessor extends AbstractProcessor<K, V> { - - private Window<K, V> window; - - @Override - public void init(ProcessorContext context) { - super.init(context); - this.window = windowSupplier.get(); - this.window.init(context); - } - - @Override - public void process(K key, V value) { - synchronized (this) { - window.put(key, value, context().timestamp()); - context().forward(key, value); - } - } - - @Override - public void close() { - window.close(); - } - } -} http://git-wip-us.apache.org/repos/asf/kafka/blob/5aad4999/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowedImpl.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowedImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowedImpl.java deleted file mode 100644 index c71c11b..0000000 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowedImpl.java +++ /dev/null @@ -1,67 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. 
See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kafka.streams.kstream.internals; - -import org.apache.kafka.common.KafkaException; -import org.apache.kafka.streams.kstream.KStream; -import org.apache.kafka.streams.kstream.KStreamBuilder; -import org.apache.kafka.streams.kstream.KStreamWindowed; -import org.apache.kafka.streams.kstream.ValueJoiner; -import org.apache.kafka.streams.kstream.WindowSupplier; - -import java.util.HashSet; -import java.util.Set; - -public final class KStreamWindowedImpl<K, V> extends KStreamImpl<K, V> implements KStreamWindowed<K, V> { - - private final WindowSupplier<K, V> windowSupplier; - - public KStreamWindowedImpl(KStreamBuilder topology, String name, Set<String> sourceNodes, WindowSupplier<K, V> windowSupplier) { - super(topology, name, sourceNodes); - this.windowSupplier = windowSupplier; - } - - @Override - public <V1, V2> KStream<K, V2> join(KStreamWindowed<K, V1> other, ValueJoiner<V, V1, V2> valueJoiner) { - String thisWindowName = this.windowSupplier.name(); - String otherWindowName = ((KStreamWindowedImpl<K, V1>) other).windowSupplier.name(); - Set<String> thisSourceNodes = this.sourceNodes; - Set<String> otherSourceNodes = ((KStreamWindowedImpl<K, V1>) other).sourceNodes; - - if (thisSourceNodes == null || otherSourceNodes == null) - throw new 
KafkaException("not joinable"); - - Set<String> allSourceNodes = new HashSet<>(sourceNodes); - allSourceNodes.addAll(((KStreamWindowedImpl<K, V1>) other).sourceNodes); - - KStreamJoin<K, V2, V, V1> joinThis = new KStreamJoin<>(otherWindowName, valueJoiner); - KStreamJoin<K, V2, V1, V> joinOther = new KStreamJoin<>(thisWindowName, reverseJoiner(valueJoiner)); - KStreamPassThrough<K, V2> joinMerge = new KStreamPassThrough<>(); - - String joinThisName = topology.newName(JOINTHIS_NAME); - String joinOtherName = topology.newName(JOINOTHER_NAME); - String joinMergeName = topology.newName(MERGE_NAME); - - topology.addProcessor(joinThisName, joinThis, this.name); - topology.addProcessor(joinOtherName, joinOther, ((KStreamImpl) other).name); - topology.addProcessor(joinMergeName, joinMerge, joinThisName, joinOtherName); - topology.copartitionSources(allSourceNodes); - - return new KStreamImpl<>(topology, joinMergeName, allSourceNodes); - } -} http://git-wip-us.apache.org/repos/asf/kafka/blob/5aad4999/streams/src/main/java/org/apache/kafka/streams/state/RocksDBWindowStoreSupplier.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/RocksDBWindowStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/RocksDBWindowStoreSupplier.java index 41c725d..73814ef 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/RocksDBWindowStoreSupplier.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/RocksDBWindowStoreSupplier.java @@ -39,7 +39,7 @@ public class RocksDBWindowStoreSupplier<K, V> implements StateStoreSupplier { private final Serdes serdes; private final Time time; - protected RocksDBWindowStoreSupplier(String name, long windowBefore, long windowAfter, long retentionPeriod, int numSegments, Serdes<K, V> serdes, Time time) { + public RocksDBWindowStoreSupplier(String name, long windowBefore, long windowAfter, long retentionPeriod, int numSegments, 
Serdes<K, V> serdes, Time time) { this.name = name; this.windowBefore = windowBefore; this.windowAfter = windowAfter; http://git-wip-us.apache.org/repos/asf/kafka/blob/5aad4999/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java index 1e775b8..108bf3c 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java @@ -22,10 +22,8 @@ import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.KStreamBuilder; import org.apache.kafka.streams.kstream.Predicate; -import org.apache.kafka.streams.kstream.ValueJoiner; import org.apache.kafka.streams.kstream.ValueMapper; import org.apache.kafka.test.MockProcessorSupplier; -import org.apache.kafka.test.UnlimitedWindowDef; import org.junit.Test; import java.util.Collections; @@ -72,54 +70,38 @@ public class KStreamImplTest { }); KStream<String, Integer>[] streams2 = stream2.branch( - new Predicate<String, Integer>() { - @Override - public boolean test(String key, Integer value) { - return (value % 2) == 0; + new Predicate<String, Integer>() { + @Override + public boolean test(String key, Integer value) { + return (value % 2) == 0; + } + }, + new Predicate<String, Integer>() { + @Override + public boolean test(String key, Integer value) { + return true; + } } - }, - new Predicate<String, Integer>() { - @Override - public boolean test(String key, Integer value) { - return true; - } - } ); KStream<String, Integer>[] streams3 = stream3.branch( - new Predicate<String, Integer>() { - @Override - public boolean test(String key, 
Integer value) { - return (value % 2) == 0; - } - }, - new Predicate<String, Integer>() { - @Override - public boolean test(String key, Integer value) { - return true; + new Predicate<String, Integer>() { + @Override + public boolean test(String key, Integer value) { + return (value % 2) == 0; + } + }, + new Predicate<String, Integer>() { + @Override + public boolean test(String key, Integer value) { + return true; + } } - } ); - KStream<String, Integer> stream4 = streams2[0].with(new UnlimitedWindowDef<String, Integer>("window")) - .join(streams3[0].with(new UnlimitedWindowDef<String, Integer>("window")), new ValueJoiner<Integer, Integer, Integer>() { - @Override - public Integer apply(Integer value1, Integer value2) { - return value1 + value2; - } - }); - - KStream<String, Integer> stream5 = streams2[1].with(new UnlimitedWindowDef<String, Integer>("window")) - .join(streams3[1].with(new UnlimitedWindowDef<String, Integer>("window")), new ValueJoiner<Integer, Integer, Integer>() { - @Override - public Integer apply(Integer value1, Integer value2) { - return value1 + value2; - } - }); - - stream4.to("topic-5"); + streams2[0].to("topic-5"); - stream5.through("topic-6").process(new MockProcessorSupplier<String, Integer>()); + streams2[1].through("topic-6").process(new MockProcessorSupplier<String, Integer>()); assertEquals(2 + // sources 2 + // stream1 @@ -127,8 +109,6 @@ public class KStreamImplTest { 1 + // stream3 1 + 2 + // streams2 1 + 2 + // streams3 - 2 + 3 + // stream4 - 2 + 3 + // stream5 1 + // to 2 + // through 1, // process http://git-wip-us.apache.org/repos/asf/kafka/blob/5aad4999/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamJoinTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamJoinTest.java deleted file mode 100644 index 
12bed17..0000000 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamJoinTest.java +++ /dev/null @@ -1,195 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kafka.streams.kstream.internals; - -import org.apache.kafka.common.KafkaException; -import org.apache.kafka.common.serialization.IntegerDeserializer; -import org.apache.kafka.common.serialization.StringDeserializer; -import org.apache.kafka.common.utils.Utils; -import org.apache.kafka.streams.kstream.KStream; -import org.apache.kafka.streams.kstream.KStreamBuilder; -import org.apache.kafka.streams.kstream.KStreamWindowed; -import org.apache.kafka.streams.kstream.KeyValue; -import org.apache.kafka.streams.kstream.KeyValueMapper; -import org.apache.kafka.streams.kstream.ValueJoiner; -import org.apache.kafka.streams.kstream.ValueMapper; -import org.apache.kafka.test.KStreamTestDriver; -import org.apache.kafka.test.MockProcessorSupplier; -import org.apache.kafka.test.UnlimitedWindowDef; -import org.junit.Test; - -import java.util.Arrays; -import java.util.Collection; -import java.util.HashSet; -import java.util.Set; - -import static org.junit.Assert.assertEquals; - -public class KStreamJoinTest { - - private 
String topic1 = "topic1"; - private String topic2 = "topic2"; - private String dummyTopic = "dummyTopic"; - - private IntegerDeserializer keyDeserializer = new IntegerDeserializer(); - private StringDeserializer valDeserializer = new StringDeserializer(); - - private ValueJoiner<String, String, String> joiner = new ValueJoiner<String, String, String>() { - @Override - public String apply(String value1, String value2) { - return value1 + "+" + value2; - } - }; - - private ValueMapper<String, String> valueMapper = new ValueMapper<String, String>() { - @Override - public String apply(String value) { - return "#" + value; - } - }; - - private ValueMapper<String, Iterable<String>> valueMapper2 = new ValueMapper<String, Iterable<String>>() { - @Override - public Iterable<String> apply(String value) { - return (Iterable<String>) Utils.mkSet(value); - } - }; - - private KeyValueMapper<Integer, String, KeyValue<Integer, String>> keyValueMapper = - new KeyValueMapper<Integer, String, KeyValue<Integer, String>>() { - @Override - public KeyValue<Integer, String> apply(Integer key, String value) { - return KeyValue.pair(key, value); - } - }; - - KeyValueMapper<Integer, String, KeyValue<Integer, Iterable<String>>> keyValueMapper2 = - new KeyValueMapper<Integer, String, KeyValue<Integer, Iterable<String>>>() { - @Override - public KeyValue<Integer, Iterable<String>> apply(Integer key, String value) { - return KeyValue.pair(key, (Iterable<String>) Utils.mkSet(value)); - } - }; - - - @Test - public void testJoin() { - KStreamBuilder builder = new KStreamBuilder(); - - final int[] expectedKeys = new int[]{0, 1, 2, 3}; - - KStream<Integer, String> stream1; - KStream<Integer, String> stream2; - KStream<Integer, String> dummyStream; - KStreamWindowed<Integer, String> windowed1; - KStreamWindowed<Integer, String> windowed2; - MockProcessorSupplier<Integer, String> processor; - String[] expected; - - processor = new MockProcessorSupplier<>(); - stream1 = builder.from(keyDeserializer, 
valDeserializer, topic1); - stream2 = builder.from(keyDeserializer, valDeserializer, topic2); - dummyStream = builder.from(keyDeserializer, valDeserializer, dummyTopic); - windowed1 = stream1.with(new UnlimitedWindowDef<Integer, String>("window1")); - windowed2 = stream2.with(new UnlimitedWindowDef<Integer, String>("window2")); - - windowed1.join(windowed2, joiner).process(processor); - - Collection<Set<String>> copartitionGroups = builder.copartitionGroups(); - - assertEquals(1, copartitionGroups.size()); - assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next()); - - KStreamTestDriver driver = new KStreamTestDriver(builder); - driver.setTime(0L); - - // push two items to the main stream. the other stream's window is empty - - for (int i = 0; i < 2; i++) { - driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]); - } - - assertEquals(0, processor.processed.size()); - - // push two items to the other stream. the main stream's window has two items - - for (int i = 0; i < 2; i++) { - driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]); - } - - assertEquals(2, processor.processed.size()); - - expected = new String[]{"0:X0+Y0", "1:X1+Y1"}; - - for (int i = 0; i < expected.length; i++) { - assertEquals(expected[i], processor.processed.get(i)); - } - - processor.processed.clear(); - - // push all items to the main stream. this should produce two items. - - for (int i = 0; i < expectedKeys.length; i++) { - driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]); - } - - assertEquals(2, processor.processed.size()); - - expected = new String[]{"0:X0+Y0", "1:X1+Y1"}; - - for (int i = 0; i < expected.length; i++) { - assertEquals(expected[i], processor.processed.get(i)); - } - - processor.processed.clear(); - - // there will be previous two items + all items in the main stream's window, thus two are duplicates. - - // push all items to the other stream. 
this should produce 6 items - for (int i = 0; i < expectedKeys.length; i++) { - driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]); - } - - assertEquals(6, processor.processed.size()); - - expected = new String[]{"0:X0+Y0", "0:X0+Y0", "1:X1+Y1", "1:X1+Y1", "2:X2+Y2", "3:X3+Y3"}; - - for (int i = 0; i < expected.length; i++) { - assertEquals(expected[i], processor.processed.get(i)); - } - } - - @Test(expected = KafkaException.class) - public void testNotJoinable() { - KStreamBuilder builder = new KStreamBuilder(); - - KStream<Integer, String> stream1; - KStream<Integer, String> stream2; - KStreamWindowed<Integer, String> windowed1; - KStreamWindowed<Integer, String> windowed2; - MockProcessorSupplier<Integer, String> processor; - - processor = new MockProcessorSupplier<>(); - stream1 = builder.from(keyDeserializer, valDeserializer, topic1).map(keyValueMapper); - stream2 = builder.from(keyDeserializer, valDeserializer, topic2); - windowed1 = stream1.with(new UnlimitedWindowDef<Integer, String>("window1")); - windowed2 = stream2.with(new UnlimitedWindowDef<Integer, String>("window2")); - - windowed1.join(windowed2, joiner).process(processor); - } -} http://git-wip-us.apache.org/repos/asf/kafka/blob/5aad4999/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java new file mode 100644 index 0000000..5a937af --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java @@ -0,0 +1,505 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.streams.kstream.internals; + +import org.apache.kafka.common.serialization.IntegerDeserializer; +import org.apache.kafka.common.serialization.IntegerSerializer; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.streams.kstream.JoinWindowSpec; +import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.kstream.KStreamBuilder; +import org.apache.kafka.streams.kstream.ValueJoiner; +import org.apache.kafka.test.KStreamTestDriver; +import org.apache.kafka.test.MockProcessorSupplier; +import org.junit.Test; + +import java.io.File; +import java.nio.file.Files; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashSet; +import java.util.Set; + +import static org.junit.Assert.assertEquals; + +public class KStreamKStreamJoinTest { + + private String topic1 = "topic1"; + private String topic2 = "topic2"; + + private IntegerSerializer keySerializer = new IntegerSerializer(); + private StringSerializer valSerializer = new StringSerializer(); + private IntegerDeserializer keyDeserializer = new IntegerDeserializer(); + private StringDeserializer valDeserializer = new StringDeserializer(); + + private ValueJoiner<String, String, String> joiner = new ValueJoiner<String, String, 
String>() { + @Override + public String apply(String value1, String value2) { + return value1 + "+" + value2; + } + }; + + @Test + public void testJoin() throws Exception { + File baseDir = Files.createTempDirectory("test").toFile(); + try { + + KStreamBuilder builder = new KStreamBuilder(); + + final int[] expectedKeys = new int[]{0, 1, 2, 3}; + + KStream<Integer, String> stream1; + KStream<Integer, String> stream2; + KStream<Integer, String> joined; + MockProcessorSupplier<Integer, String> processor; + + processor = new MockProcessorSupplier<>(); + stream1 = builder.from(keyDeserializer, valDeserializer, topic1); + stream2 = builder.from(keyDeserializer, valDeserializer, topic2); + joined = stream1.join(stream2, joiner, JoinWindowSpec.of("test").within(100), + keySerializer, valSerializer, valSerializer, keyDeserializer, valDeserializer, valDeserializer); + joined.process(processor); + + Collection<Set<String>> copartitionGroups = builder.copartitionGroups(); + + assertEquals(1, copartitionGroups.size()); + assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next()); + + KStreamTestDriver driver = new KStreamTestDriver(builder, baseDir); + driver.setTime(0L); + + // push two items to the primary stream. the other window is empty + // w1 = {} + // w2 = {} + // --> w1 = { 0:X1, 1:X1 } + // w2 = {} + + for (int i = 0; i < 2; i++) { + driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]); + } + + processor.checkAndClearResult(); + + // push two items to the other stream. this should produce two items. + // w1 = { 0:X0, 1:X1 } + // w2 = {} + // --> w1 = { 0:X1, 1:X1 } + // w2 = { 0:Y0, 1:Y1 } + + for (int i = 0; i < 2; i++) { + driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]); + } + + processor.checkAndClearResult("0:X0+Y0", "1:X1+Y1"); + + // push all four items to the primary stream. this should produce two items. 
+ // w1 = { 0:X0, 1:X1 } + // w2 = { 0:Y0, 1:Y1 } + // --> w1 = { 0:X1, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3 } + // w2 = { 0:Y0, 1:Y1 } + + for (int i = 0; i < expectedKeys.length; i++) { + driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]); + } + + processor.checkAndClearResult("0:X0+Y0", "1:X1+Y1"); + + // push all items to the other stream. this should produce six items. + // w1 = { 0:X1, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3 } + // w2 = { 0:Y0, 1:Y1 } + // --> w1 = { 0:X1, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3 } + // w2 = { 0:Y0, 1:Y1, 0:YY0, 0:YY0, 1:YY1, 2:YY2, 3:YY3 } + + for (int i = 0; i < expectedKeys.length; i++) { + driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]); + } + + processor.checkAndClearResult("0:X0+YY0", "0:X0+YY0", "1:X1+YY1", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3"); + + // push all four items to the primary stream. this should produce six items. + // w1 = { 0:X1, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3 } + // w2 = { 0:Y0, 1:Y1, 0:YY0, 0:YY0, 1:YY1, 2:YY2, 3:YY3 + // --> w1 = { 0:X1, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3, 0:XX0, 1:XX1, 2:XX2, 3:XX3 } + // w2 = { 0:Y0, 1:Y1, 0:YY0, 0:YY0, 1:YY1, 2:YY2, 3:YY3 } + + for (int i = 0; i < expectedKeys.length; i++) { + driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]); + } + + processor.checkAndClearResult("0:XX0+Y0", "0:XX0+YY0", "1:XX1+Y1", "1:XX1+YY1", "2:XX2+YY2", "3:XX3+YY3"); + + // push two items to the other stream. this should produce six item. 
+ // w1 = { 0:X1, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3, 0:XX0, 1:XX1, 2:XX2, 3:XX3 } + // w2 = { 0:Y0, 1:Y1, 0:YY0, 0:YY0, 1:YY1, 2:YY2, 3:YY3 + // --> w1 = { 0:X1, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3, 0:XX0, 1:XX1, 2:XX2, 3:XX3 } + // w2 = { 0:Y0, 1:Y1, 0:YY0, 0:YY0, 1:YY1, 2:YY2, 3:YY3, 0:YYY0, 1:YYY1 } + + for (int i = 0; i < 2; i++) { + driver.process(topic2, expectedKeys[i], "YYY" + expectedKeys[i]); + } + + processor.checkAndClearResult("0:X0+YYY0", "0:X0+YYY0", "0:XX0+YYY0", "1:X1+YYY1", "1:X1+YYY1", "1:XX1+YYY1"); + + } finally { + Utils.delete(baseDir); + } + } + + @Test + public void testOuterJoin() throws Exception { + File baseDir = Files.createTempDirectory("test").toFile(); + try { + // outer join: a record with no match in the other window is still emitted, joined with null (e.g. "0:X0+null") + KStreamBuilder builder = new KStreamBuilder(); + + final int[] expectedKeys = new int[]{0, 1, 2, 3}; + + KStream<Integer, String> stream1; + KStream<Integer, String> stream2; + KStream<Integer, String> joined; + MockProcessorSupplier<Integer, String> processor; + + processor = new MockProcessorSupplier<>(); + stream1 = builder.from(keyDeserializer, valDeserializer, topic1); + stream2 = builder.from(keyDeserializer, valDeserializer, topic2); + joined = stream1.outerJoin(stream2, joiner, JoinWindowSpec.of("test").within(100), + keySerializer, valSerializer, valSerializer, keyDeserializer, valDeserializer, valDeserializer); + joined.process(processor); + + Collection<Set<String>> copartitionGroups = builder.copartitionGroups(); + + assertEquals(1, copartitionGroups.size()); + assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next()); + + KStreamTestDriver driver = new KStreamTestDriver(builder, baseDir); + driver.setTime(0L); + + // push two items to the primary stream.
the other window is empty. this should produce two items + // w1 = {} + // w2 = {} + // --> w1 = { 0:X0, 1:X1 } + // w2 = {} + + for (int i = 0; i < 2; i++) { + driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]); + } + + processor.checkAndClearResult("0:X0+null", "1:X1+null"); + + // push two items to the other stream. this should produce two items. + // w1 = { 0:X0, 1:X1 } + // w2 = {} + // --> w1 = { 0:X0, 1:X1 } + // w2 = { 0:Y0, 1:Y1 } + + for (int i = 0; i < 2; i++) { + driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]); + } + + processor.checkAndClearResult("0:X0+Y0", "1:X1+Y1"); + + // push all four items to the primary stream. this should produce four items (X2 and X3 have no match, so they are joined with null). + // w1 = { 0:X0, 1:X1 } + // w2 = { 0:Y0, 1:Y1 } + // --> w1 = { 0:X0, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3 } + // w2 = { 0:Y0, 1:Y1 } + + for (int i = 0; i < expectedKeys.length; i++) { + driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]); + } + + processor.checkAndClearResult("0:X0+Y0", "1:X1+Y1", "2:X2+null", "3:X3+null"); + + // push all four items to the other stream. this should produce six items. + // w1 = { 0:X0, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3 } + // w2 = { 0:Y0, 1:Y1 } + // --> w1 = { 0:X0, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3 } + // w2 = { 0:Y0, 1:Y1, 0:YY0, 1:YY1, 2:YY2, 3:YY3 } + + for (int i = 0; i < expectedKeys.length; i++) { + driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]); + } + + processor.checkAndClearResult("0:X0+YY0", "0:X0+YY0", "1:X1+YY1", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3"); + + // push all four items to the primary stream. this should produce six items.
+ // w1 = { 0:X0, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3 } + // w2 = { 0:Y0, 1:Y1, 0:YY0, 1:YY1, 2:YY2, 3:YY3 } + // --> w1 = { 0:X0, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3, 0:XX0, 1:XX1, 2:XX2, 3:XX3 } + // w2 = { 0:Y0, 1:Y1, 0:YY0, 1:YY1, 2:YY2, 3:YY3 } + + for (int i = 0; i < expectedKeys.length; i++) { + driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]); + } + + processor.checkAndClearResult("0:XX0+Y0", "0:XX0+YY0", "1:XX1+Y1", "1:XX1+YY1", "2:XX2+YY2", "3:XX3+YY3"); + + // push two items to the other stream. this should produce six items. + // w1 = { 0:X0, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3, 0:XX0, 1:XX1, 2:XX2, 3:XX3 } + // w2 = { 0:Y0, 1:Y1, 0:YY0, 1:YY1, 2:YY2, 3:YY3 } + // --> w1 = { 0:X0, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3, 0:XX0, 1:XX1, 2:XX2, 3:XX3 } + // w2 = { 0:Y0, 1:Y1, 0:YY0, 1:YY1, 2:YY2, 3:YY3, 0:YYY0, 1:YYY1 } + + for (int i = 0; i < 2; i++) { + driver.process(topic2, expectedKeys[i], "YYY" + expectedKeys[i]); + } + + processor.checkAndClearResult("0:X0+YYY0", "0:X0+YYY0", "0:XX0+YYY0", "1:X1+YYY1", "1:X1+YYY1", "1:XX1+YYY1"); + + } finally { + Utils.delete(baseDir); + } + } + + @Test + public void testWindowing() throws Exception { + File baseDir = Files.createTempDirectory("test").toFile(); + try { + // inner join with within(100): as exercised below, records only join when their driver times differ by at most 100 + long time = 0L; + + KStreamBuilder builder = new KStreamBuilder(); + + final int[] expectedKeys = new int[]{0, 1, 2, 3}; + + KStream<Integer, String> stream1; + KStream<Integer, String> stream2; + KStream<Integer, String> joined; + MockProcessorSupplier<Integer, String> processor; + + processor = new MockProcessorSupplier<>(); + stream1 = builder.from(keyDeserializer, valDeserializer, topic1); + stream2 = builder.from(keyDeserializer, valDeserializer, topic2); + joined = stream1.join(stream2, joiner, JoinWindowSpec.of("test").within(100), + keySerializer, valSerializer, valSerializer, keyDeserializer, valDeserializer, valDeserializer); + joined.process(processor); + + Collection<Set<String>> copartitionGroups = builder.copartitionGroups();
+ + assertEquals(1, copartitionGroups.size()); + assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next()); + + KStreamTestDriver driver = new KStreamTestDriver(builder, baseDir); + driver.setTime(time); + + // push two items to the primary stream. the other window is empty. this should produce no items. + // w1 = {} + // w2 = {} + // --> w1 = { 0:X0, 1:X1 } + // w2 = {} + + for (int i = 0; i < 2; i++) { + driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]); + } + + processor.checkAndClearResult(); + + // push two items to the other stream. this should produce two items. + // w1 = { 0:X0, 1:X1 } + // w2 = {} + // --> w1 = { 0:X0, 1:X1 } + // w2 = { 0:Y0, 1:Y1 } + + for (int i = 0; i < 2; i++) { + driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]); + } + + processor.checkAndClearResult("0:X0+Y0", "1:X1+Y1"); + + // clear logically: jump to time 1000 (X0..X3 are pushed at 1000..1003), too far from the records pushed at time 0 for anything to join + time = 1000L; + + for (int i = 0; i < expectedKeys.length; i++) { + driver.setTime(time + i); + driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]); + } + processor.checkAndClearResult(); + + // gradually expire items in w1: at time 1100 all four match; each further +1 drops the oldest remaining match + // w1 = { 0:X0, 1:X1, 2:X2, 3:X3 } + + time = 1000 + 100L; + driver.setTime(time); + + for (int i = 0; i < expectedKeys.length; i++) { + driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]); + } + + processor.checkAndClearResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3"); + + driver.setTime(++time); + for (int i = 0; i < expectedKeys.length; i++) { + driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]); + } + + processor.checkAndClearResult("1:X1+YY1", "2:X2+YY2", "3:X3+YY3"); + + driver.setTime(++time); + for (int i = 0; i < expectedKeys.length; i++) { + driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]); + } + + processor.checkAndClearResult("2:X2+YY2", "3:X3+YY3"); + + driver.setTime(++time); + for (int i = 0; i < expectedKeys.length; i++) { + driver.process(topic2, expectedKeys[i], "YY" +
expectedKeys[i]); + } + + processor.checkAndClearResult("3:X3+YY3"); + + driver.setTime(++time); + for (int i = 0; i < expectedKeys.length; i++) { + driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]); + } + + processor.checkAndClearResult(); + + // go back to just before the window start (1000 - 100 - 1 = 899): nothing joins at first, then each +1 brings one more of X0..X3 into range + + time = 1000L - 100L - 1L; + driver.setTime(time); + for (int i = 0; i < expectedKeys.length; i++) { + driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]); + } + + processor.checkAndClearResult(); + + driver.setTime(++time); + for (int i = 0; i < expectedKeys.length; i++) { + driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]); + } + + processor.checkAndClearResult("0:X0+YY0"); + + driver.setTime(++time); + for (int i = 0; i < expectedKeys.length; i++) { + driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]); + } + + processor.checkAndClearResult("0:X0+YY0", "1:X1+YY1"); + + driver.setTime(++time); + for (int i = 0; i < expectedKeys.length; i++) { + driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]); + } + + processor.checkAndClearResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2"); + + driver.setTime(++time); + for (int i = 0; i < expectedKeys.length; i++) { + driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]); + } + + processor.checkAndClearResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3"); + + // clear (logically): jump to time 2000 (Y0..Y3 are pushed at 2000..2003), too far from every record in w1 for anything to join + time = 2000L; + + for (int i = 0; i < expectedKeys.length; i++) { + driver.setTime(time + i); + driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]); + } + + processor.checkAndClearResult(); + + // gradually expire items in w2: at time 2100 all four match; each further +1 drops the oldest remaining match + // w2 = { 0:Y0, 1:Y1, 2:Y2, 3:Y3 } + + time = 2000L + 100L; + driver.setTime(time); + for (int i = 0; i < expectedKeys.length; i++) { + driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]); + } + + processor.checkAndClearResult("0:XX0+Y0", "1:XX1+Y1", "2:XX2+Y2", "3:XX3+Y3"); + + driver.setTime(++time); + for (int i = 0; i < expectedKeys.length; i++)
{ + driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]); + } + + processor.checkAndClearResult("1:XX1+Y1", "2:XX2+Y2", "3:XX3+Y3"); + + driver.setTime(++time); + for (int i = 0; i < expectedKeys.length; i++) { + driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]); + } + + processor.checkAndClearResult("2:XX2+Y2", "3:XX3+Y3"); + + driver.setTime(++time); + for (int i = 0; i < expectedKeys.length; i++) { + driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]); + } + + processor.checkAndClearResult("3:XX3+Y3"); + + driver.setTime(++time); + for (int i = 0; i < expectedKeys.length; i++) { + driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]); + } + + processor.checkAndClearResult(); + + // go back to just before the window start (2000 - 100 - 1 = 1899): nothing joins at first, then each +1 brings one more of Y0..Y3 into range + + time = 2000L - 100L - 1L; + driver.setTime(time); + for (int i = 0; i < expectedKeys.length; i++) { + driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]); + } + + processor.checkAndClearResult(); + + driver.setTime(++time); + for (int i = 0; i < expectedKeys.length; i++) { + driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]); + } + + processor.checkAndClearResult("0:XX0+Y0"); + + driver.setTime(++time); + for (int i = 0; i < expectedKeys.length; i++) { + driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]); + } + + processor.checkAndClearResult("0:XX0+Y0", "1:XX1+Y1"); + + driver.setTime(++time); + for (int i = 0; i < expectedKeys.length; i++) { + driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]); + } + + processor.checkAndClearResult("0:XX0+Y0", "1:XX1+Y1", "2:XX2+Y2"); + + driver.setTime(++time); + for (int i = 0; i < expectedKeys.length; i++) { + driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]); + } + + processor.checkAndClearResult("0:XX0+Y0", "1:XX1+Y1", "2:XX2+Y2", "3:XX3+Y3"); + + } finally { + Utils.delete(baseDir); + } + } + +}
