KAFKA-3081: KTable Aggregation

Author: Guozhang Wang <[email protected]>
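This patch adds keyed aggregation to KTable: a generic aggregate() plus sum() and count() built on top of it. For orientation, a minimal usage sketch follows, adapted from the KTableAggregateTest added later in this diff; the String serializer/deserializer instances and the StringCanonizeSupplier / NoOpKeyValueMapper helpers are the ones defined in that test, not new public API:

    KStreamBuilder builder = new KStreamBuilder();

    // change log stream read as a KTable from "topic1"
    KTable<String, String> table1 =
        builder.table(strSerializer, strSerializer, strDeserializer, strDeserializer, "topic1");

    // re-key each update through the selector, repartition via an internal
    // "<name>-repartition" topic, then apply Aggregator.remove()/add() per update
    KTable<String, String> table2 = table1.<String, String, String>aggregate(
        new StringCanonizeSupplier(),                 // AggregatorSupplier<K1, V1, V2>
        new NoOpKeyValueMapper<String, String>(),     // selects the aggregation key and value
        strSerializer, strSerializer, strSerializer,  // key, value, aggregate serializers
        strDeserializer, strDeserializer, strDeserializer,
        "topic1-Canonized");

Because the parent table is asked to send old values, each update arrives at the aggregator as an old/new pair, so the old value can be subtracted before the new one is added (see KTableAggregate below).
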
Reviewers: Yasuhiro Matsuda

Closes #761 from guozhangwang/K3081

Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/4f22705c
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/4f22705c
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/4f22705c

Branch: refs/heads/trunk
Commit: 4f22705c7d0c8e8cab68883e76f554439341e34a
Parents: 7001174
Author: Guozhang Wang <[email protected]>
Authored: Wed Jan 13 17:15:57 2016 -0800
Committer: Guozhang Wang <[email protected]>
Committed: Wed Jan 13 17:15:57 2016 -0800

----------------------------------------------------------------------
 .../apache/kafka/streams/kstream/KTable.java     |  58 +---
 .../kstream/internals/ChangedDeserializer.java   |  59 ++++
 .../kstream/internals/ChangedSerializer.java     |  57 ++++
 .../kstream/internals/CountSupplier.java         |  52 ++++
 .../internals/DefaultWindowedDeserializer.java   |  59 ----
 .../internals/DefaultWindowedSerializer.java     |  57 ----
 .../streams/kstream/internals/KStreamImpl.java   |  58 ++--
 .../kstream/internals/KTableAggregate.java       | 118 +++++++
 .../streams/kstream/internals/KTableImpl.java    | 215 +++++++------
 .../kstream/internals/KTableMapValues.java       |   8 +-
 .../kstream/internals/KTableRepartitionMap.java  | 110 +++++++
 .../kstream/internals/LongSumSupplier.java       |  52 ++++
 .../kstream/internals/WindowedDeserializer.java  |  59 ++++
 .../kstream/internals/WindowedSerializer.java    |  57 ++++
 .../kstream/internals/KTableAggregateTest.java   | 122 ++++++++
 .../internals/KTableMapValuesImplTest.java       | 308 -------------------
 .../kstream/internals/KTableMapValuesTest.java   | 308 +++++++++++++++++++
 .../apache/kafka/test/NoOpKeyValueMapper.java    |  29 ++
 18 files changed, 1195 insertions(+), 591 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/kafka/blob/4f22705c/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
index 997edcd..9837dae 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
@@ -20,8 +20,6 @@ package org.apache.kafka.streams.kstream;
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.Serializer;
 
-import java.util.Collection;
-
 /**
  * KTable is an abstraction of a change log stream.
* @@ -152,9 +150,11 @@ public interface KTable<K, V> { */ <K1, V1, V2> KTable<K1, V2> aggregate(AggregatorSupplier<K1, V1, V2> aggregatorSupplier, KeyValueMapper<K, V, KeyValue<K1, V1>> selector, - Serializer<K> keySerializer, + Serializer<K1> keySerializer, + Serializer<V1> valueSerializer, Serializer<V2> aggValueSerializer, - Deserializer<K> keyDeserializer, + Deserializer<K1> keyDeserializer, + Deserializer<V1> valueDeserializer, Deserializer<V2> aggValueDeserializer, String name); @@ -167,59 +167,21 @@ public interface KTable<K, V> { */ <K1> KTable<K1, Long> sum(KeyValueMapper<K, V, K1> keySelector, KeyValueToLongMapper<K, V> valueSelector, - Serializer<K> keySerializer, - Deserializer<K> keyDeserializer, + Serializer<K1> keySerializer, + Deserializer<K1> keyDeserializer, String name); /** - * Sum extracted integer values of this table by the selected aggregation key - * - * @param keySelector the class of KeyValueMapper to select the aggregation key - * @param valueSelector the class of KeyValueToIntMapper to extract the long integer from value - * @param name the name of the resulted table - */ - <K1> KTable<K1, Integer> sum(KeyValueMapper<K, V, K1> keySelector, - KeyValueToIntMapper<K, V> valueSelector, - Serializer<K> keySerializer, - Deserializer<K> keyDeserializer, - String name); - - /** - * Sum extracted double decimal values of this table by the selected aggregation key - * - * @param keySelector the class of KeyValueMapper to select the aggregation key - * @param valueSelector the class of KeyValueToDoubleMapper to extract the long integer from value - * @param name the name of the resulted table - */ - <K1> KTable<K1, Double> sum(KeyValueMapper<K, V, K1> keySelector, - KeyValueToDoubleMapper<K, V> valueSelector, - Serializer<K> keySerializer, - Deserializer<K> keyDeserializer, - String name); - - /** * Count number of records of this table by the selected aggregation key * * @param keySelector the class of KeyValueMapper to select the aggregation key * @param name the name of the resulted table */ <K1> KTable<K1, Long> count(KeyValueMapper<K, V, K1> keySelector, - Serializer<K> keySerializer, - Deserializer<K> keyDeserializer, + Serializer<K1> keySerializer, + Serializer<V> valueSerializer, + Deserializer<K1> keyDeserializer, + Deserializer<V> valueDeserializer, String name); - /** - * Get the top-k values of this table by the selected aggregation key - * - * @param k parameter of the top-k computation - * @param keySelector the class of KeyValueMapper to select the aggregation key - * @param name the name of the resulted table - */ - <K1, V1 extends Comparable<V1>> KTable<K1, Collection<V1>> topK(int k, - KeyValueMapper<K, V, K1> keySelector, - Serializer<K> keySerializer, - Serializer<V1> aggValueSerializer, - Deserializer<K> keyDeserializer, - Deserializer<V1> aggValueDeserializer, - String name); } http://git-wip-us.apache.org/repos/asf/kafka/blob/4f22705c/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedDeserializer.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedDeserializer.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedDeserializer.java new file mode 100644 index 0000000..d4c4e2d --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedDeserializer.java @@ -0,0 +1,59 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license 
agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.streams.kstream.internals; + +import org.apache.kafka.common.serialization.Deserializer; + +import java.nio.ByteBuffer; +import java.util.Map; + +public class ChangedDeserializer<T> implements Deserializer<Change<T>> { + + private static final int NEWFLAG_SIZE = 1; + + private final Deserializer<T> inner; + + public ChangedDeserializer(Deserializer<T> inner) { + this.inner = inner; + } + + @Override + public void configure(Map<String, ?> configs, boolean isKey) { + // do nothing + } + + @Override + public Change<T> deserialize(String topic, byte[] data) { + + byte[] bytes = new byte[data.length - NEWFLAG_SIZE]; + + System.arraycopy(data, 0, bytes, 0, bytes.length); + + if (ByteBuffer.wrap(data).get(data.length - NEWFLAG_SIZE) != 0) { + return new Change<>(inner.deserialize(topic, bytes), null); + } else { + return new Change<>(null, inner.deserialize(topic, bytes)); + } + } + + + @Override + public void close() { + inner.close(); + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/4f22705c/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedSerializer.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedSerializer.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedSerializer.java new file mode 100644 index 0000000..e9b7cad --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedSerializer.java @@ -0,0 +1,57 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.streams.kstream.internals; + +import org.apache.kafka.common.serialization.Serializer; + +import java.nio.ByteBuffer; +import java.util.Map; + +public class ChangedSerializer<T> implements Serializer<Change<T>> { + + private static final int NEWFLAG_SIZE = 1; + + private final Serializer<T> inner; + + public ChangedSerializer(Serializer<T> inner) { + this.inner = inner; + } + + @Override + public void configure(Map<String, ?> configs, boolean isKey) { + // do nothing + } + + @Override + public byte[] serialize(String topic, Change<T> data) { + // only one of the old / new values would be not null + byte[] serializedKey = inner.serialize(topic, data.newValue != null ? data.newValue : data.oldValue); + + ByteBuffer buf = ByteBuffer.allocate(serializedKey.length + NEWFLAG_SIZE); + buf.put(serializedKey); + buf.put((byte) (data.newValue != null ? 1 : 0)); + + return buf.array(); + } + + + @Override + public void close() { + inner.close(); + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/4f22705c/streams/src/main/java/org/apache/kafka/streams/kstream/internals/CountSupplier.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/CountSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/CountSupplier.java new file mode 100644 index 0000000..b7dc5aa --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/CountSupplier.java @@ -0,0 +1,52 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.streams.kstream.internals; + +import org.apache.kafka.streams.kstream.Aggregator; +import org.apache.kafka.streams.kstream.AggregatorSupplier; + +public class CountSupplier<K, V> implements AggregatorSupplier<K, V, Long> { + + private class Count implements Aggregator<K, V, Long> { + + @Override + public Long initialValue() { + return 0L; + } + + @Override + public Long add(K aggKey, V value, Long aggregate) { + return aggregate + 1; + } + + @Override + public Long remove(K aggKey, V value, Long aggregate) { + return aggregate - 1; + } + + @Override + public Long merge(Long aggr1, Long aggr2) { + return aggr1 + aggr2; + } + } + + @Override + public Aggregator<K, V, Long> get() { + return new Count(); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kafka/blob/4f22705c/streams/src/main/java/org/apache/kafka/streams/kstream/internals/DefaultWindowedDeserializer.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/DefaultWindowedDeserializer.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/DefaultWindowedDeserializer.java deleted file mode 100644 index 9a14c53..0000000 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/DefaultWindowedDeserializer.java +++ /dev/null @@ -1,59 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.kafka.streams.kstream.internals; - -import org.apache.kafka.common.serialization.Deserializer; -import org.apache.kafka.streams.kstream.Windowed; - -import java.nio.ByteBuffer; -import java.util.Map; - -public class DefaultWindowedDeserializer<T> implements Deserializer<Windowed<T>> { - - private static final int TIMESTAMP_SIZE = 8; - - private Deserializer<T> inner; - - public DefaultWindowedDeserializer(Deserializer<T> inner) { - this.inner = inner; - } - - @Override - public void configure(Map<String, ?> configs, boolean isKey) { - // do nothing - } - - @Override - public Windowed<T> deserialize(String topic, byte[] data) { - - byte[] bytes = new byte[data.length - TIMESTAMP_SIZE]; - - System.arraycopy(data, 0, bytes, 0, bytes.length); - - long start = ByteBuffer.wrap(data).getLong(data.length - TIMESTAMP_SIZE); - - // always read as unlimited window - return new Windowed<T>(inner.deserialize(topic, bytes), new UnlimitedWindow(start)); - } - - - @Override - public void close() { - inner.close(); - } -} http://git-wip-us.apache.org/repos/asf/kafka/blob/4f22705c/streams/src/main/java/org/apache/kafka/streams/kstream/internals/DefaultWindowedSerializer.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/DefaultWindowedSerializer.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/DefaultWindowedSerializer.java deleted file mode 100644 index 4bf2b28..0000000 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/DefaultWindowedSerializer.java +++ /dev/null @@ -1,57 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.kafka.streams.kstream.internals; - -import org.apache.kafka.common.serialization.Serializer; -import org.apache.kafka.streams.kstream.Windowed; - -import java.nio.ByteBuffer; -import java.util.Map; - -public class DefaultWindowedSerializer<T> implements Serializer<Windowed<T>> { - - private static final int TIMESTAMP_SIZE = 8; - - private Serializer<T> inner; - - public DefaultWindowedSerializer(Serializer<T> inner) { - this.inner = inner; - } - - @Override - public void configure(Map<String, ?> configs, boolean isKey) { - // do nothing - } - - @Override - public byte[] serialize(String topic, Windowed<T> data) { - byte[] serializedKey = inner.serialize(topic, data.value()); - - ByteBuffer buf = ByteBuffer.allocate(serializedKey.length + TIMESTAMP_SIZE); - buf.put(serializedKey); - buf.putLong(data.window().start()); - - return buf.array(); - } - - - @Override - public void close() { - inner.close(); - } -} http://git-wip-us.apache.org/repos/asf/kafka/blob/4f22705c/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java index 4505e74..f53c0d0 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java @@ -250,16 +250,16 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V KStream<K, V1> other, ValueJoiner<V, V1, R> joiner, JoinWindows windows, - Serializer<K> keySerialzier, - Serializer<V> thisValueSerialzier, - Serializer<V1> otherValueSerialzier, - Deserializer<K> keyDeserialier, - Deserializer<V> thisValueDeserialzier, - Deserializer<V1> otherValueDeserialzier) { + Serializer<K> keySerializer, + Serializer<V> thisValueSerializer, + Serializer<V1> otherValueSerializer, + Deserializer<K> keyDeserializer, + Deserializer<V> thisValueDeserializer, + Deserializer<V1> otherValueDeserializer) { return join(other, joiner, windows, - keySerialzier, thisValueSerialzier, otherValueSerialzier, - keyDeserialier, thisValueDeserialzier, otherValueDeserialzier, false); + keySerializer, thisValueSerializer, otherValueSerializer, + keyDeserializer, thisValueDeserializer, otherValueDeserializer, false); } @Override @@ -267,16 +267,16 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V KStream<K, V1> other, ValueJoiner<V, V1, R> joiner, JoinWindows windows, - Serializer<K> keySerialzier, - Serializer<V> thisValueSerialzier, - Serializer<V1> otherValueSerialzier, - Deserializer<K> keyDeserialier, - Deserializer<V> thisValueDeserialzier, - Deserializer<V1> otherValueDeserialzier) { + Serializer<K> keySerializer, + Serializer<V> thisValueSerializer, + Serializer<V1> otherValueSerializer, + Deserializer<K> keyDeserializer, + Deserializer<V> thisValueDeserializer, + Deserializer<V1> otherValueDeserializer) { return join(other, joiner, windows, - keySerialzier, thisValueSerialzier, otherValueSerialzier, - keyDeserialier, thisValueDeserialzier, otherValueDeserialzier, true); + keySerializer, thisValueSerializer, otherValueSerializer, + keyDeserializer, thisValueDeserializer, otherValueDeserializer, true); } @SuppressWarnings("unchecked") @@ -284,12 +284,12 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements 
KStream<K, V KStream<K, V1> other, ValueJoiner<V, V1, R> joiner, JoinWindows windows, - Serializer<K> keySerialzier, - Serializer<V> thisValueSerialzier, - Serializer<V1> otherValueSerialzier, - Deserializer<K> keyDeserialier, - Deserializer<V> thisValueDeserialzier, - Deserializer<V1> otherValueDeserialzier, + Serializer<K> keySerializer, + Serializer<V> thisValueSerializer, + Serializer<V1> otherValueSerializer, + Deserializer<K> keyDeserializer, + Deserializer<V> thisValueDeserializer, + Deserializer<V1> otherValueDeserializer, boolean outer) { Set<String> allSourceNodes = ensureJoinableWith((AbstractStream<K>) other); @@ -301,7 +301,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V windows.after, windows.maintainMs(), windows.segments, - new Serdes<>("", keySerialzier, keyDeserialier, thisValueSerialzier, thisValueDeserialzier), + new Serdes<>("", keySerializer, keyDeserializer, thisValueSerializer, thisValueDeserializer), null); RocksDBWindowStoreSupplier<K, V1> otherWindow = @@ -311,7 +311,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V windows.after, windows.maintainMs(), windows.segments, - new Serdes<>("", keySerialzier, keyDeserialier, otherValueSerialzier, otherValueDeserialzier), + new Serdes<>("", keySerializer, keyDeserializer, otherValueSerializer, otherValueDeserializer), null); KStreamJoinWindow<K, V> thisWindowedStream = new KStreamJoinWindow<>(thisWindow.name()); @@ -344,10 +344,10 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V KStream<K, V1> other, ValueJoiner<V, V1, R> joiner, JoinWindows windows, - Serializer<K> keySerialzier, - Serializer<V1> otherValueSerialzier, - Deserializer<K> keyDeserialier, - Deserializer<V1> otherValueDeserialzier) { + Serializer<K> keySerializer, + Serializer<V1> otherValueSerializer, + Deserializer<K> keyDeserializer, + Deserializer<V1> otherValueDeserializer) { Set<String> allSourceNodes = ensureJoinableWith((AbstractStream<K>) other); @@ -358,7 +358,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V windows.after, windows.maintainMs(), windows.segments, - new Serdes<>("", keySerialzier, keyDeserialier, otherValueSerialzier, otherValueDeserialzier), + new Serdes<>("", keySerializer, keyDeserializer, otherValueSerializer, otherValueDeserializer), null); KStreamJoinWindow<K, V1> otherWindowedStream = new KStreamJoinWindow<>(otherWindow.name()); http://git-wip-us.apache.org/repos/asf/kafka/blob/4f22705c/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java new file mode 100644 index 0000000..a5948f8 --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java @@ -0,0 +1,118 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.streams.kstream.internals; + +import org.apache.kafka.streams.kstream.Aggregator; +import org.apache.kafka.streams.processor.AbstractProcessor; +import org.apache.kafka.streams.processor.Processor; +import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.state.KeyValueStore; + +public class KTableAggregate<K, V, T> implements KTableProcessorSupplier<K, V, T> { + + private final String storeName; + private final Aggregator<K, V, T> aggregator; + + private boolean sendOldValues = false; + + KTableAggregate(String storeName, Aggregator<K, V, T> aggregator) { + this.storeName = storeName; + this.aggregator = aggregator; + } + + @Override + public void enableSendingOldValues() { + sendOldValues = true; + } + + @Override + public Processor<K, Change<V>> get() { + return new KTableAggregateProcessor(); + } + + private class KTableAggregateProcessor extends AbstractProcessor<K, Change<V>> { + + private KeyValueStore<K, T> store; + + @SuppressWarnings("unchecked") + @Override + public void init(ProcessorContext context) { + super.init(context); + + store = (KeyValueStore<K, T>) context.getStateStore(storeName); + } + + @Override + public void process(K key, Change<V> value) { + T oldAgg = store.get(key); + + if (oldAgg == null) + oldAgg = aggregator.initialValue(); + + T newAgg = oldAgg; + + // first try to remove the old value + if (value.oldValue != null) { + newAgg = aggregator.remove(key, value.oldValue, newAgg); + } + + // then try to add the new new value + if (value.newValue != null) { + newAgg = aggregator.add(key, value.newValue, newAgg); + } + + // update the store with the new value + store.put(key, newAgg); + + // send the old / new pair + if (sendOldValues) + context().forward(key, new Change<>(newAgg, oldAgg)); + else + context().forward(key, new Change<>(newAgg, null)); + } + } + + @Override + public KTableValueGetterSupplier<K, T> view() { + + return new KTableValueGetterSupplier<K, T>() { + + public KTableValueGetter<K, T> get() { + return new KTableAggregateValueGetter(); + } + + }; + } + + private class KTableAggregateValueGetter implements KTableValueGetter<K, T> { + + private KeyValueStore<K, T> store; + + @SuppressWarnings("unchecked") + @Override + public void init(ProcessorContext context) { + store = (KeyValueStore<K, T>) context.getStateStore(storeName); + } + + @Override + public T get(K key) { + return store.get(key); + } + + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/4f22705c/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java index 32d3cc5..7f30f59 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java @@ -18,6 +18,8 @@ package org.apache.kafka.streams.kstream.internals; import 
org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.LongDeserializer; +import org.apache.kafka.common.serialization.LongSerializer; import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.streams.kstream.AggregatorSupplier; import org.apache.kafka.streams.kstream.KStream; @@ -25,16 +27,15 @@ import org.apache.kafka.streams.kstream.KStreamBuilder; import org.apache.kafka.streams.kstream.KTable; import org.apache.kafka.streams.kstream.KeyValue; import org.apache.kafka.streams.kstream.KeyValueMapper; -import org.apache.kafka.streams.kstream.KeyValueToDoubleMapper; -import org.apache.kafka.streams.kstream.KeyValueToIntMapper; import org.apache.kafka.streams.kstream.KeyValueToLongMapper; import org.apache.kafka.streams.kstream.Predicate; import org.apache.kafka.streams.kstream.ValueJoiner; import org.apache.kafka.streams.kstream.ValueMapper; import org.apache.kafka.streams.processor.ProcessorSupplier; import org.apache.kafka.streams.processor.StateStoreSupplier; +import org.apache.kafka.streams.state.Stores; -import java.util.Collection; +import java.util.Collections; import java.util.Set; /** @@ -51,6 +52,10 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K, private static final String TOSTREAM_NAME = "KTABLE-TOSTREAM-"; + private static final String SELECT_NAME = "KTABLE-SELECT-"; + + private static final String AGGREGATE_NAME = "KTABLE-AGGREGATE-"; + public static final String SOURCE_NAME = "KTABLE-SOURCE-"; public static final String JOINTHIS_NAME = "KTABLE-JOINTHIS-"; @@ -169,47 +174,6 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K, } @SuppressWarnings("unchecked") - KTableValueGetterSupplier<K, V> valueGetterSupplier() { - if (processorSupplier instanceof KTableSource) { - KTableSource<K, V> source = (KTableSource<K, V>) processorSupplier; - materialize(source); - return new KTableSourceValueGetterSupplier<>(source.topic); - } else { - return ((KTableProcessorSupplier<K, S, V>) processorSupplier).view(); - } - } - - @SuppressWarnings("unchecked") - void enableSendingOldValues() { - if (!sendOldValues) { - if (processorSupplier instanceof KTableSource) { - KTableSource<K, ?> source = (KTableSource<K, V>) processorSupplier; - materialize(source); - source.enableSendingOldValues(); - } else { - ((KTableProcessorSupplier<K, S, V>) processorSupplier).enableSendingOldValues(); - } - sendOldValues = true; - } - } - - boolean sendingOldValueEnabled() { - return sendOldValues; - } - - private void materialize(KTableSource<K, ?> source) { - synchronized (source) { - if (!source.isMaterialized()) { - StateStoreSupplier storeSupplier = - new KTableStoreSupplier<>(source.topic, keySerializer, keyDeserializer, valSerializer, valDeserializer, null); - // mark this state as non internal hence it is read directly from a user topic - topology.addStateStore(storeSupplier, false, name); - source.materialize(); - } - } - } - - @SuppressWarnings("unchecked") @Override public <V1, R> KTable<K, R> join(KTable<K, V1> other, ValueJoiner<V, V1, R> joiner) { Set<String> allSourceNodes = ensureJoinableWith((AbstractStream<K>) other); @@ -281,63 +245,142 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K, @Override public <K1, V1, V2> KTable<K1, V2> aggregate(AggregatorSupplier<K1, V1, V2> aggregatorSupplier, KeyValueMapper<K, V, KeyValue<K1, V1>> selector, - Serializer<K> keySerializer, + Serializer<K1> keySerializer, + Serializer<V1> valueSerializer, 
Serializer<V2> aggValueSerializer, - Deserializer<K> keyDeserializer, + Deserializer<K1> keyDeserializer, + Deserializer<V1> valueDeserializer, Deserializer<V2> aggValueDeserializer, String name) { - // TODO - return null; + + String selectName = topology.newName(SELECT_NAME); + String sinkName = topology.newName(KStreamImpl.SINK_NAME); + String sourceName = topology.newName(KStreamImpl.SOURCE_NAME); + String aggregateName = topology.newName(AGGREGATE_NAME); + + String topic = name + "-repartition"; + + ChangedSerializer<V1> changedValueSerializer = new ChangedSerializer<>(valueSerializer); + ChangedDeserializer<V1> changedValueDeserializer = new ChangedDeserializer<>(valueDeserializer); + + KTableProcessorSupplier<K, V, KeyValue<K1, V1>> selectSupplier = new KTableRepartitionMap<>(this, selector); + + ProcessorSupplier<K1, Change<V1>> aggregateSupplier = new KTableAggregate<>(name, aggregatorSupplier.get()); + + StateStoreSupplier aggregateStore = Stores.create(name) + .withKeys(keySerializer, keyDeserializer) + .withValues(aggValueSerializer, aggValueDeserializer) + .localDatabase() + .build(); + + // select the aggregate key and values (old and new), it would require parent to send old values + topology.addProcessor(selectName, selectSupplier, this.name); + this.enableSendingOldValues(); + + // send the aggregate key-value pairs to the intermediate topic for partitioning + topology.addSink(sinkName, topic, keySerializer, changedValueSerializer, selectName); + + // read the intermediate topic + topology.addSource(sourceName, keyDeserializer, changedValueDeserializer, topic); + + // aggregate the values with the aggregator and local store + topology.addProcessor(aggregateName, aggregateSupplier, sourceName); + topology.addStateStore(aggregateStore, aggregateName); + + // return the KTable representation with the intermediate topic as the sources + return new KTableImpl<>(topology, aggregateName, aggregateSupplier, Collections.singleton(sourceName)); } @Override - public <K1> KTable<K1, Long> sum(KeyValueMapper<K, V, K1> keySelector, - KeyValueToLongMapper<K, V> valueSelector, - Serializer<K> keySerializer, - Deserializer<K> keyDeserializer, + public <K1> KTable<K1, Long> sum(final KeyValueMapper<K, V, K1> keySelector, + final KeyValueToLongMapper<K, V> valueSelector, + Serializer<K1> keySerializer, + Deserializer<K1> keyDeserializer, String name) { - // TODO - return null; - } - @Override - public <K1> KTable<K1, Integer> sum(KeyValueMapper<K, V, K1> keySelector, - KeyValueToIntMapper<K, V> valueSelector, - Serializer<K> keySerializer, - Deserializer<K> keyDeserializer, - String name) { - // TODO - return null; + Serializer<Long> longSerializer = new LongSerializer(); + Deserializer<Long> longDeserializer = new LongDeserializer(); + + KeyValueMapper<K, V, KeyValue<K1, Long>> mapper = new KeyValueMapper<K, V, KeyValue<K1, Long>>() { + @Override + public KeyValue<K1, Long> apply(K key, V value) { + K1 aggKey = keySelector.apply(key, value); + Long aggValue = valueSelector.apply(key, value); + + return new KeyValue<>(aggKey, aggValue); + } + }; + + return this.<K1, Long, Long>aggregate(new LongSumSupplier<K1>(), mapper, + keySerializer, longSerializer, longSerializer, + keyDeserializer, longDeserializer, longDeserializer, + name); } @Override - public <K1> KTable<K1, Double> sum(KeyValueMapper<K, V, K1> keySelector, - KeyValueToDoubleMapper<K, V> valueSelector, - Serializer<K> keySerializer, - Deserializer<K> keyDeserializer, + public <K1> KTable<K1, Long> count(final KeyValueMapper<K, V, 
K1> keySelector, + Serializer<K1> keySerializer, + Serializer<V> valueSerializer, + Deserializer<K1> keyDeserializer, + Deserializer<V> valueDeserializer, String name) { - // TODO - return null; + + Serializer<Long> longSerializer = new LongSerializer(); + Deserializer<Long> longDeserializer = new LongDeserializer(); + + KeyValueMapper<K, V, KeyValue<K1, V>> mapper = new KeyValueMapper<K, V, KeyValue<K1, V>>() { + @Override + public KeyValue<K1, V> apply(K key, V value) { + K1 aggKey = keySelector.apply(key, value); + + return new KeyValue<>(aggKey, value); + } + }; + + return this.<K1, V, Long>aggregate(new CountSupplier<K1, V>(), mapper, + keySerializer, valueSerializer, longSerializer, + keyDeserializer, valueDeserializer, longDeserializer, + name); } - @Override - public <K1> KTable<K1, Long> count(KeyValueMapper<K, V, K1> keySelector, - Serializer<K> keySerializer, - Deserializer<K> keyDeserializer, - String name) { - // TODO - return null; + @SuppressWarnings("unchecked") + KTableValueGetterSupplier<K, V> valueGetterSupplier() { + if (processorSupplier instanceof KTableSource) { + KTableSource<K, V> source = (KTableSource<K, V>) processorSupplier; + materialize(source); + return new KTableSourceValueGetterSupplier<>(source.topic); + } else { + return ((KTableProcessorSupplier<K, S, V>) processorSupplier).view(); + } } - @Override - public <K1, V1 extends Comparable<V1>> KTable<K1, Collection<V1>> topK(int k, - KeyValueMapper<K, V, K1> keySelector, - Serializer<K> keySerializer, - Serializer<V1> aggValueSerializer, - Deserializer<K> keyDeserializer, - Deserializer<V1> aggValueDeserializer, - String name) { - // TODO - return null; + @SuppressWarnings("unchecked") + void enableSendingOldValues() { + if (!sendOldValues) { + if (processorSupplier instanceof KTableSource) { + KTableSource<K, ?> source = (KTableSource<K, V>) processorSupplier; + materialize(source); + source.enableSendingOldValues(); + } else { + ((KTableProcessorSupplier<K, S, V>) processorSupplier).enableSendingOldValues(); + } + sendOldValues = true; + } + } + + boolean sendingOldValueEnabled() { + return sendOldValues; + } + + private void materialize(KTableSource<K, ?> source) { + synchronized (source) { + if (!source.isMaterialized()) { + StateStoreSupplier storeSupplier = + new KTableStoreSupplier<>(source.topic, keySerializer, keyDeserializer, valSerializer, valDeserializer, null); + // mark this state as non internal hence it is read directly from a user topic + topology.addStateStore(storeSupplier, false, name); + source.materialize(); + } + } } } http://git-wip-us.apache.org/repos/asf/kafka/blob/4f22705c/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java index be80855..c664906 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java @@ -22,6 +22,7 @@ import org.apache.kafka.streams.processor.AbstractProcessor; import org.apache.kafka.streams.processor.Processor; import org.apache.kafka.streams.processor.ProcessorContext; + class KTableMapValues<K1, V1, V2> implements KTableProcessorSupplier<K1, V1, V2> { private final KTableImpl<K1, ?, V1> parent; @@ -36,7 +37,7 @@ class KTableMapValues<K1, 
V1, V2> implements KTableProcessorSupplier<K1, V1, V2> @Override public Processor<K1, Change<V1>> get() { - return new KTableMapProcessor(); + return new KTableMapValuesProcessor(); } @Override @@ -67,16 +68,15 @@ class KTableMapValues<K1, V1, V2> implements KTableProcessorSupplier<K1, V1, V2> return newValue; } - private class KTableMapProcessor extends AbstractProcessor<K1, Change<V1>> { + private class KTableMapValuesProcessor extends AbstractProcessor<K1, Change<V1>> { @Override public void process(K1 key, Change<V1> change) { V2 newValue = computeValue(change.newValue); V2 oldValue = sendOldValues ? computeValue(change.oldValue) : null; - context().forward(key, new Change(newValue, oldValue)); + context().forward(key, new Change<>(newValue, oldValue)); } - } private class KTableMapValuesValueGetter implements KTableValueGetter<K1, V2> { http://git-wip-us.apache.org/repos/asf/kafka/blob/4f22705c/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java new file mode 100644 index 0000000..bbef7fb --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java @@ -0,0 +1,110 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.streams.kstream.internals; + +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.streams.kstream.KeyValue; +import org.apache.kafka.streams.kstream.KeyValueMapper; +import org.apache.kafka.streams.processor.AbstractProcessor; +import org.apache.kafka.streams.processor.Processor; +import org.apache.kafka.streams.processor.ProcessorContext; + +/** + * KTable repartition map functions are not exposed to public APIs, but only used for keyed aggregations. + * + * Given the input, it can output at most two records (one mapped from old value and one mapped from new value). 
+ */ +public class KTableRepartitionMap<K1, V1, K2, V2> implements KTableProcessorSupplier<K1, V1, KeyValue<K2, V2>> { + + private final KTableImpl<K1, ?, V1> parent; + private final KeyValueMapper<K1, V1, KeyValue<K2, V2>> mapper; + + public KTableRepartitionMap(KTableImpl<K1, ?, V1> parent, KeyValueMapper<K1, V1, KeyValue<K2, V2>> mapper) { + this.parent = parent; + this.mapper = mapper; + } + + @Override + public Processor<K1, Change<V1>> get() { + return new KTableMapProcessor(); + } + + @Override + public KTableValueGetterSupplier<K1, KeyValue<K2, V2>> view() { + final KTableValueGetterSupplier<K1, V1> parentValueGetterSupplier = parent.valueGetterSupplier(); + + return new KTableValueGetterSupplier<K1, KeyValue<K2, V2>>() { + + public KTableValueGetter<K1, KeyValue<K2, V2>> get() { + return new KTableMapValueGetter(parentValueGetterSupplier.get()); + } + + }; + } + + @Override + public void enableSendingOldValues() { + // this should never be called + throw new KafkaException("KTableRepartitionMap should always require sending old values."); + } + + private KeyValue<K2, V2> computeValue(K1 key, V1 value) { + KeyValue<K2, V2> newValue = null; + + if (key != null || value != null) + newValue = mapper.apply(key, value); + + return newValue; + } + + private class KTableMapProcessor extends AbstractProcessor<K1, Change<V1>> { + + @Override + public void process(K1 key, Change<V1> change) { + KeyValue<K2, V2> newPair = computeValue(key, change.newValue); + + context().forward(newPair.key, new Change<>(newPair.value, null)); + + if (change.oldValue != null) { + KeyValue<K2, V2> oldPair = computeValue(key, change.oldValue); + context().forward(oldPair.key, new Change<>(null, oldPair.value)); + } + } + } + + private class KTableMapValueGetter implements KTableValueGetter<K1, KeyValue<K2, V2>> { + + private final KTableValueGetter<K1, V1> parentGetter; + + public KTableMapValueGetter(KTableValueGetter<K1, V1> parentGetter) { + this.parentGetter = parentGetter; + } + + @Override + public void init(ProcessorContext context) { + parentGetter.init(context); + } + + @Override + public KeyValue<K2, V2> get(K1 key) { + return computeValue(key, parentGetter.get(key)); + } + + } + +} http://git-wip-us.apache.org/repos/asf/kafka/blob/4f22705c/streams/src/main/java/org/apache/kafka/streams/kstream/internals/LongSumSupplier.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/LongSumSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/LongSumSupplier.java new file mode 100644 index 0000000..b66590e --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/LongSumSupplier.java @@ -0,0 +1,52 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.streams.kstream.internals; + +import org.apache.kafka.streams.kstream.Aggregator; +import org.apache.kafka.streams.kstream.AggregatorSupplier; + +public class LongSumSupplier<K> implements AggregatorSupplier<K, Long, Long> { + + private class LongSum implements Aggregator<K, Long, Long> { + + @Override + public Long initialValue() { + return 0L; + } + + @Override + public Long add(K aggKey, Long value, Long aggregate) { + return aggregate + value; + } + + @Override + public Long remove(K aggKey, Long value, Long aggregate) { + return aggregate - value; + } + + @Override + public Long merge(Long aggr1, Long aggr2) { + return aggr1 + aggr2; + } + } + + @Override + public Aggregator<K, Long, Long> get() { + return new LongSum(); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kafka/blob/4f22705c/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedDeserializer.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedDeserializer.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedDeserializer.java new file mode 100644 index 0000000..96c3668 --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedDeserializer.java @@ -0,0 +1,59 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.streams.kstream.internals; + +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.streams.kstream.Windowed; + +import java.nio.ByteBuffer; +import java.util.Map; + +public class WindowedDeserializer<T> implements Deserializer<Windowed<T>> { + + private static final int TIMESTAMP_SIZE = 8; + + private Deserializer<T> inner; + + public WindowedDeserializer(Deserializer<T> inner) { + this.inner = inner; + } + + @Override + public void configure(Map<String, ?> configs, boolean isKey) { + // do nothing + } + + @Override + public Windowed<T> deserialize(String topic, byte[] data) { + + byte[] bytes = new byte[data.length - TIMESTAMP_SIZE]; + + System.arraycopy(data, 0, bytes, 0, bytes.length); + + long start = ByteBuffer.wrap(data).getLong(data.length - TIMESTAMP_SIZE); + + // always read as unlimited window + return new Windowed<T>(inner.deserialize(topic, bytes), new UnlimitedWindow(start)); + } + + + @Override + public void close() { + inner.close(); + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/4f22705c/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedSerializer.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedSerializer.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedSerializer.java new file mode 100644 index 0000000..4407a5b --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedSerializer.java @@ -0,0 +1,57 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.streams.kstream.internals; + +import org.apache.kafka.common.serialization.Serializer; +import org.apache.kafka.streams.kstream.Windowed; + +import java.nio.ByteBuffer; +import java.util.Map; + +public class WindowedSerializer<T> implements Serializer<Windowed<T>> { + + private static final int TIMESTAMP_SIZE = 8; + + private Serializer<T> inner; + + public WindowedSerializer(Serializer<T> inner) { + this.inner = inner; + } + + @Override + public void configure(Map<String, ?> configs, boolean isKey) { + // do nothing + } + + @Override + public byte[] serialize(String topic, Windowed<T> data) { + byte[] serializedKey = inner.serialize(topic, data.value()); + + ByteBuffer buf = ByteBuffer.allocate(serializedKey.length + TIMESTAMP_SIZE); + buf.put(serializedKey); + buf.putLong(data.window().start()); + + return buf.array(); + } + + + @Override + public void close() { + inner.close(); + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/4f22705c/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java new file mode 100644 index 0000000..189cf9d --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java @@ -0,0 +1,122 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.streams.kstream.internals; + +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.Serializer; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.streams.kstream.Aggregator; +import org.apache.kafka.streams.kstream.AggregatorSupplier; +import org.apache.kafka.streams.kstream.KStreamBuilder; +import org.apache.kafka.streams.kstream.KTable; +import org.apache.kafka.test.KStreamTestDriver; +import org.apache.kafka.test.MockProcessorSupplier; +import org.apache.kafka.test.NoOpKeyValueMapper; +import org.junit.Test; + +import java.io.File; +import java.nio.file.Files; + +import static org.junit.Assert.assertEquals; + +public class KTableAggregateTest { + + private final Serializer<String> strSerializer = new StringSerializer(); + private final Deserializer<String> strDeserializer = new StringDeserializer(); + + private class StringCanonizeSupplier implements AggregatorSupplier<String, String, String> { + + private class StringCanonizer implements Aggregator<String, String, String> { + + @Override + public String initialValue() { + return ""; + } + + @Override + public String add(String aggKey, String value, String aggregate) { + return aggregate + "+" + value; + } + + @Override + public String remove(String aggKey, String value, String aggregate) { + return aggregate + "-" + value; + } + + @Override + public String merge(String aggr1, String aggr2) { + return "(" + aggr1 + ") + (" + aggr2 + ")"; + } + } + + @Override + public Aggregator<String, String, String> get() { + return new StringCanonizer(); + } + } + + @Test + public void testAggBasic() throws Exception { + final File baseDir = Files.createTempDirectory("test").toFile(); + + try { + final KStreamBuilder builder = new KStreamBuilder(); + String topic1 = "topic1"; + + KTable<String, String> table1 = builder.table(strSerializer, strSerializer, strDeserializer, strDeserializer, topic1); + KTable<String, String> table2 = table1.<String, String, String>aggregate(new StringCanonizeSupplier(), + new NoOpKeyValueMapper<String, String>(), + strSerializer, + strSerializer, + strSerializer, + strDeserializer, + strDeserializer, + strDeserializer, + "topic1-Canonized"); + + MockProcessorSupplier<String, String> proc2 = new MockProcessorSupplier<>(); + table2.toStream().process(proc2); + + KStreamTestDriver driver = new KStreamTestDriver(builder, baseDir); + + driver.process(topic1, "A", "1"); + driver.process(topic1, "B", "2"); + driver.process(topic1, "A", "3"); + driver.process(topic1, "B", "4"); + driver.process(topic1, "C", "5"); + driver.process(topic1, "D", "6"); + driver.process(topic1, "B", "7"); + driver.process(topic1, "C", "8"); + + assertEquals(Utils.mkList( + "A:+1", + "B:+2", + "A:+1+3", "A:+1+3-1", + "B:+2+4", "B:+2+4-2", + "C:+5", + "D:+6", + "B:+2+4-2+7", "B:+2+4-2+7-4", + "C:+5+8", "C:+5+8-5"), proc2.processed); + + } finally { + Utils.delete(baseDir); + } + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/4f22705c/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesImplTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesImplTest.java deleted file mode 
100644 index 037b30a..0000000 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesImplTest.java +++ /dev/null @@ -1,308 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kafka.streams.kstream.internals; - -import org.apache.kafka.common.serialization.Deserializer; -import org.apache.kafka.common.serialization.Serializer; -import org.apache.kafka.common.serialization.StringDeserializer; -import org.apache.kafka.common.serialization.StringSerializer; -import org.apache.kafka.common.utils.Utils; -import org.apache.kafka.streams.kstream.KStreamBuilder; -import org.apache.kafka.streams.kstream.KTable; -import org.apache.kafka.streams.kstream.Predicate; -import org.apache.kafka.streams.kstream.ValueMapper; -import org.apache.kafka.test.KStreamTestDriver; -import org.apache.kafka.test.MockProcessorSupplier; -import org.junit.Test; - -import java.io.File; -import java.io.IOException; -import java.nio.file.Files; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; - -public class KTableMapValuesImplTest { - - private final Serializer<String> strSerializer = new StringSerializer(); - private final Deserializer<String> strDeserializer = new StringDeserializer(); - - @Test - public void testKTable() { - final KStreamBuilder builder = new KStreamBuilder(); - - String topic1 = "topic1"; - - KTable<String, String> table1 = builder.table(strSerializer, strSerializer, strDeserializer, strDeserializer, topic1); - KTable<String, Integer> table2 = table1.mapValues(new ValueMapper<String, Integer>() { - @Override - public Integer apply(String value) { - return new Integer(value); - } - }); - - MockProcessorSupplier<String, Integer> proc2 = new MockProcessorSupplier<>(); - table2.toStream().process(proc2); - - KStreamTestDriver driver = new KStreamTestDriver(builder); - - driver.process(topic1, "A", "01"); - driver.process(topic1, "B", "02"); - driver.process(topic1, "C", "03"); - driver.process(topic1, "D", "04"); - - assertEquals(Utils.mkList("A:1", "B:2", "C:3", "D:4"), proc2.processed); - } - - @Test - public void testValueGetter() throws IOException { - File stateDir = Files.createTempDirectory("test").toFile(); - try { - final Serializer<String> serializer = new StringSerializer(); - final Deserializer<String> deserializer = new StringDeserializer(); - final KStreamBuilder builder = new KStreamBuilder(); - - String topic1 = "topic1"; - String topic2 = "topic2"; - - KTableImpl<String, String, String> table1 = - (KTableImpl<String, String, String>) builder.table(serializer, serializer, deserializer, deserializer, topic1); - KTableImpl<String, String, Integer> table2 = 
(KTableImpl<String, String, Integer>) table1.mapValues( - new ValueMapper<String, Integer>() { - @Override - public Integer apply(String value) { - return new Integer(value); - } - }); - KTableImpl<String, Integer, Integer> table3 = (KTableImpl<String, Integer, Integer>) table2.filter( - new Predicate<String, Integer>() { - @Override - public boolean test(String key, Integer value) { - return (value % 2) == 0; - } - }); - KTableImpl<String, String, String> table4 = (KTableImpl<String, String, String>) - table1.through(topic2, serializer, serializer, deserializer, deserializer); - - KTableValueGetterSupplier<String, String> getterSupplier1 = table1.valueGetterSupplier(); - KTableValueGetterSupplier<String, Integer> getterSupplier2 = table2.valueGetterSupplier(); - KTableValueGetterSupplier<String, Integer> getterSupplier3 = table3.valueGetterSupplier(); - KTableValueGetterSupplier<String, String> getterSupplier4 = table4.valueGetterSupplier(); - - KStreamTestDriver driver = new KStreamTestDriver(builder, stateDir, null, null, null, null); - - KTableValueGetter<String, String> getter1 = getterSupplier1.get(); - getter1.init(driver.context()); - KTableValueGetter<String, Integer> getter2 = getterSupplier2.get(); - getter2.init(driver.context()); - KTableValueGetter<String, Integer> getter3 = getterSupplier3.get(); - getter3.init(driver.context()); - KTableValueGetter<String, String> getter4 = getterSupplier4.get(); - getter4.init(driver.context()); - - driver.process(topic1, "A", "01"); - driver.process(topic1, "B", "01"); - driver.process(topic1, "C", "01"); - - assertEquals("01", getter1.get("A")); - assertEquals("01", getter1.get("B")); - assertEquals("01", getter1.get("C")); - - assertEquals(new Integer(1), getter2.get("A")); - assertEquals(new Integer(1), getter2.get("B")); - assertEquals(new Integer(1), getter2.get("C")); - - assertNull(getter3.get("A")); - assertNull(getter3.get("B")); - assertNull(getter3.get("C")); - - assertEquals("01", getter4.get("A")); - assertEquals("01", getter4.get("B")); - assertEquals("01", getter4.get("C")); - - driver.process(topic1, "A", "02"); - driver.process(topic1, "B", "02"); - - assertEquals("02", getter1.get("A")); - assertEquals("02", getter1.get("B")); - assertEquals("01", getter1.get("C")); - - assertEquals(new Integer(2), getter2.get("A")); - assertEquals(new Integer(2), getter2.get("B")); - assertEquals(new Integer(1), getter2.get("C")); - - assertEquals(new Integer(2), getter3.get("A")); - assertEquals(new Integer(2), getter3.get("B")); - assertNull(getter3.get("C")); - - assertEquals("02", getter4.get("A")); - assertEquals("02", getter4.get("B")); - assertEquals("01", getter4.get("C")); - - driver.process(topic1, "A", "03"); - - assertEquals("03", getter1.get("A")); - assertEquals("02", getter1.get("B")); - assertEquals("01", getter1.get("C")); - - assertEquals(new Integer(3), getter2.get("A")); - assertEquals(new Integer(2), getter2.get("B")); - assertEquals(new Integer(1), getter2.get("C")); - - assertNull(getter3.get("A")); - assertEquals(new Integer(2), getter3.get("B")); - assertNull(getter3.get("C")); - - assertEquals("03", getter4.get("A")); - assertEquals("02", getter4.get("B")); - assertEquals("01", getter4.get("C")); - - driver.process(topic1, "A", null); - - assertNull(getter1.get("A")); - assertEquals("02", getter1.get("B")); - assertEquals("01", getter1.get("C")); - - assertNull(getter2.get("A")); - assertEquals(new Integer(2), getter2.get("B")); - assertEquals(new Integer(1), getter2.get("C")); - - assertNull(getter3.get("A")); 
- assertEquals(new Integer(2), getter3.get("B")); - assertNull(getter3.get("C")); - - assertNull(getter4.get("A")); - assertEquals("02", getter4.get("B")); - assertEquals("01", getter4.get("C")); - - } finally { - Utils.delete(stateDir); - } - } - - @Test - public void testNotSendingOldValue() throws IOException { - File stateDir = Files.createTempDirectory("test").toFile(); - try { - final Serializer<String> serializer = new StringSerializer(); - final Deserializer<String> deserializer = new StringDeserializer(); - final KStreamBuilder builder = new KStreamBuilder(); - - String topic1 = "topic1"; - - KTableImpl<String, String, String> table1 = - (KTableImpl<String, String, String>) builder.table(serializer, serializer, deserializer, deserializer, topic1); - KTableImpl<String, String, Integer> table2 = (KTableImpl<String, String, Integer>) table1.mapValues( - new ValueMapper<String, Integer>() { - @Override - public Integer apply(String value) { - return new Integer(value); - } - }); - - MockProcessorSupplier<String, Integer> proc = new MockProcessorSupplier<>(); - - builder.addProcessor("proc", proc, table2.name); - - KStreamTestDriver driver = new KStreamTestDriver(builder, stateDir, null, null, null, null); - - assertFalse(table1.sendingOldValueEnabled()); - assertFalse(table2.sendingOldValueEnabled()); - - driver.process(topic1, "A", "01"); - driver.process(topic1, "B", "01"); - driver.process(topic1, "C", "01"); - - proc.checkAndClearResult("A:(1<-null)", "B:(1<-null)", "C:(1<-null)"); - - driver.process(topic1, "A", "02"); - driver.process(topic1, "B", "02"); - - proc.checkAndClearResult("A:(2<-null)", "B:(2<-null)"); - - driver.process(topic1, "A", "03"); - - proc.checkAndClearResult("A:(3<-null)"); - - driver.process(topic1, "A", null); - - proc.checkAndClearResult("A:(null<-null)"); - - } finally { - Utils.delete(stateDir); - } - } - - @Test - public void testSendingOldValue() throws IOException { - File stateDir = Files.createTempDirectory("test").toFile(); - try { - final Serializer<String> serializer = new StringSerializer(); - final Deserializer<String> deserializer = new StringDeserializer(); - final KStreamBuilder builder = new KStreamBuilder(); - - String topic1 = "topic1"; - - KTableImpl<String, String, String> table1 = - (KTableImpl<String, String, String>) builder.table(serializer, serializer, deserializer, deserializer, topic1); - KTableImpl<String, String, Integer> table2 = (KTableImpl<String, String, Integer>) table1.mapValues( - new ValueMapper<String, Integer>() { - @Override - public Integer apply(String value) { - return new Integer(value); - } - }); - - table2.enableSendingOldValues(); - - MockProcessorSupplier<String, Integer> proc = new MockProcessorSupplier<>(); - - builder.addProcessor("proc", proc, table2.name); - - KStreamTestDriver driver = new KStreamTestDriver(builder, stateDir, null, null, null, null); - - assertTrue(table1.sendingOldValueEnabled()); - assertTrue(table2.sendingOldValueEnabled()); - - driver.process(topic1, "A", "01"); - driver.process(topic1, "B", "01"); - driver.process(topic1, "C", "01"); - - proc.checkAndClearResult("A:(1<-null)", "B:(1<-null)", "C:(1<-null)"); - - driver.process(topic1, "A", "02"); - driver.process(topic1, "B", "02"); - - proc.checkAndClearResult("A:(2<-1)", "B:(2<-1)"); - - driver.process(topic1, "A", "03"); - - proc.checkAndClearResult("A:(3<-2)"); - - driver.process(topic1, "A", null); - - proc.checkAndClearResult("A:(null<-3)"); - - } finally { - Utils.delete(stateDir); - } - } - -} 
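The deleted test reappears verbatim below as KTableMapValuesTest; the rename simply drops the "Impl" suffix. For readers skimming the diff, here is a minimal, self-contained sketch of the topology that test exercises. The class name and topic are illustrative; builder.table(), mapValues() and filter() are the calls the test itself makes.

import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Predicate;
import org.apache.kafka.streams.kstream.ValueMapper;

public class MapValuesSketch {

    public static void main(String[] args) {
        KStreamBuilder builder = new KStreamBuilder();

        // Source table: String keys and values read from "topic1"
        // (the same builder.table(...) call the test uses).
        KTable<String, String> source = builder.table(
                new StringSerializer(), new StringSerializer(),
                new StringDeserializer(), new StringDeserializer(), "topic1");

        // mapValues: parse each value into an Integer; keys are untouched.
        KTable<String, Integer> parsed = source.mapValues(new ValueMapper<String, Integer>() {
            @Override
            public Integer apply(String value) {
                return Integer.valueOf(value);
            }
        });

        // filter: keep even values only. Keys whose value is filtered out
        // read back as null through the table's value getter, which is
        // what testValueGetter asserts for table3.
        KTable<String, Integer> evens = parsed.filter(new Predicate<String, Integer>() {
            @Override
            public boolean test(String key, Integer value) {
                return value % 2 == 0;
            }
        });
    }
}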
http://git-wip-us.apache.org/repos/asf/kafka/blob/4f22705c/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java new file mode 100644 index 0000000..58f1c2a --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java @@ -0,0 +1,308 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.streams.kstream.internals; + +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.Serializer; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.streams.kstream.KStreamBuilder; +import org.apache.kafka.streams.kstream.KTable; +import org.apache.kafka.streams.kstream.Predicate; +import org.apache.kafka.streams.kstream.ValueMapper; +import org.apache.kafka.test.KStreamTestDriver; +import org.apache.kafka.test.MockProcessorSupplier; +import org.junit.Test; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +public class KTableMapValuesTest { + + private final Serializer<String> strSerializer = new StringSerializer(); + private final Deserializer<String> strDeserializer = new StringDeserializer(); + + @Test + public void testKTable() { + final KStreamBuilder builder = new KStreamBuilder(); + + String topic1 = "topic1"; + + KTable<String, String> table1 = builder.table(strSerializer, strSerializer, strDeserializer, strDeserializer, topic1); + KTable<String, Integer> table2 = table1.mapValues(new ValueMapper<String, Integer>() { + @Override + public Integer apply(String value) { + return new Integer(value); + } + }); + + MockProcessorSupplier<String, Integer> proc2 = new MockProcessorSupplier<>(); + table2.toStream().process(proc2); + + KStreamTestDriver driver = new KStreamTestDriver(builder); + + driver.process(topic1, "A", "01"); + driver.process(topic1, "B", "02"); + driver.process(topic1, "C", "03"); + driver.process(topic1, "D", "04"); + + assertEquals(Utils.mkList("A:1", "B:2", "C:3", "D:4"), proc2.processed); + } + + @Test + public void testValueGetter() throws IOException { + File stateDir = Files.createTempDirectory("test").toFile(); + try { + final Serializer<String> 
serializer = new StringSerializer(); + final Deserializer<String> deserializer = new StringDeserializer(); + final KStreamBuilder builder = new KStreamBuilder(); + + String topic1 = "topic1"; + String topic2 = "topic2"; + + KTableImpl<String, String, String> table1 = + (KTableImpl<String, String, String>) builder.table(serializer, serializer, deserializer, deserializer, topic1); + KTableImpl<String, String, Integer> table2 = (KTableImpl<String, String, Integer>) table1.mapValues( + new ValueMapper<String, Integer>() { + @Override + public Integer apply(String value) { + return new Integer(value); + } + }); + KTableImpl<String, Integer, Integer> table3 = (KTableImpl<String, Integer, Integer>) table2.filter( + new Predicate<String, Integer>() { + @Override + public boolean test(String key, Integer value) { + return (value % 2) == 0; + } + }); + KTableImpl<String, String, String> table4 = (KTableImpl<String, String, String>) + table1.through(topic2, serializer, serializer, deserializer, deserializer); + + KTableValueGetterSupplier<String, String> getterSupplier1 = table1.valueGetterSupplier(); + KTableValueGetterSupplier<String, Integer> getterSupplier2 = table2.valueGetterSupplier(); + KTableValueGetterSupplier<String, Integer> getterSupplier3 = table3.valueGetterSupplier(); + KTableValueGetterSupplier<String, String> getterSupplier4 = table4.valueGetterSupplier(); + + KStreamTestDriver driver = new KStreamTestDriver(builder, stateDir, null, null, null, null); + + KTableValueGetter<String, String> getter1 = getterSupplier1.get(); + getter1.init(driver.context()); + KTableValueGetter<String, Integer> getter2 = getterSupplier2.get(); + getter2.init(driver.context()); + KTableValueGetter<String, Integer> getter3 = getterSupplier3.get(); + getter3.init(driver.context()); + KTableValueGetter<String, String> getter4 = getterSupplier4.get(); + getter4.init(driver.context()); + + driver.process(topic1, "A", "01"); + driver.process(topic1, "B", "01"); + driver.process(topic1, "C", "01"); + + assertEquals("01", getter1.get("A")); + assertEquals("01", getter1.get("B")); + assertEquals("01", getter1.get("C")); + + assertEquals(new Integer(1), getter2.get("A")); + assertEquals(new Integer(1), getter2.get("B")); + assertEquals(new Integer(1), getter2.get("C")); + + assertNull(getter3.get("A")); + assertNull(getter3.get("B")); + assertNull(getter3.get("C")); + + assertEquals("01", getter4.get("A")); + assertEquals("01", getter4.get("B")); + assertEquals("01", getter4.get("C")); + + driver.process(topic1, "A", "02"); + driver.process(topic1, "B", "02"); + + assertEquals("02", getter1.get("A")); + assertEquals("02", getter1.get("B")); + assertEquals("01", getter1.get("C")); + + assertEquals(new Integer(2), getter2.get("A")); + assertEquals(new Integer(2), getter2.get("B")); + assertEquals(new Integer(1), getter2.get("C")); + + assertEquals(new Integer(2), getter3.get("A")); + assertEquals(new Integer(2), getter3.get("B")); + assertNull(getter3.get("C")); + + assertEquals("02", getter4.get("A")); + assertEquals("02", getter4.get("B")); + assertEquals("01", getter4.get("C")); + + driver.process(topic1, "A", "03"); + + assertEquals("03", getter1.get("A")); + assertEquals("02", getter1.get("B")); + assertEquals("01", getter1.get("C")); + + assertEquals(new Integer(3), getter2.get("A")); + assertEquals(new Integer(2), getter2.get("B")); + assertEquals(new Integer(1), getter2.get("C")); + + assertNull(getter3.get("A")); + assertEquals(new Integer(2), getter3.get("B")); + assertNull(getter3.get("C")); + + 
assertEquals("03", getter4.get("A")); + assertEquals("02", getter4.get("B")); + assertEquals("01", getter4.get("C")); + + driver.process(topic1, "A", null); + + assertNull(getter1.get("A")); + assertEquals("02", getter1.get("B")); + assertEquals("01", getter1.get("C")); + + assertNull(getter2.get("A")); + assertEquals(new Integer(2), getter2.get("B")); + assertEquals(new Integer(1), getter2.get("C")); + + assertNull(getter3.get("A")); + assertEquals(new Integer(2), getter3.get("B")); + assertNull(getter3.get("C")); + + assertNull(getter4.get("A")); + assertEquals("02", getter4.get("B")); + assertEquals("01", getter4.get("C")); + + } finally { + Utils.delete(stateDir); + } + } + + @Test + public void testNotSendingOldValue() throws IOException { + File stateDir = Files.createTempDirectory("test").toFile(); + try { + final Serializer<String> serializer = new StringSerializer(); + final Deserializer<String> deserializer = new StringDeserializer(); + final KStreamBuilder builder = new KStreamBuilder(); + + String topic1 = "topic1"; + + KTableImpl<String, String, String> table1 = + (KTableImpl<String, String, String>) builder.table(serializer, serializer, deserializer, deserializer, topic1); + KTableImpl<String, String, Integer> table2 = (KTableImpl<String, String, Integer>) table1.mapValues( + new ValueMapper<String, Integer>() { + @Override + public Integer apply(String value) { + return new Integer(value); + } + }); + + MockProcessorSupplier<String, Integer> proc = new MockProcessorSupplier<>(); + + builder.addProcessor("proc", proc, table2.name); + + KStreamTestDriver driver = new KStreamTestDriver(builder, stateDir, null, null, null, null); + + assertFalse(table1.sendingOldValueEnabled()); + assertFalse(table2.sendingOldValueEnabled()); + + driver.process(topic1, "A", "01"); + driver.process(topic1, "B", "01"); + driver.process(topic1, "C", "01"); + + proc.checkAndClearResult("A:(1<-null)", "B:(1<-null)", "C:(1<-null)"); + + driver.process(topic1, "A", "02"); + driver.process(topic1, "B", "02"); + + proc.checkAndClearResult("A:(2<-null)", "B:(2<-null)"); + + driver.process(topic1, "A", "03"); + + proc.checkAndClearResult("A:(3<-null)"); + + driver.process(topic1, "A", null); + + proc.checkAndClearResult("A:(null<-null)"); + + } finally { + Utils.delete(stateDir); + } + } + + @Test + public void testSendingOldValue() throws IOException { + File stateDir = Files.createTempDirectory("test").toFile(); + try { + final Serializer<String> serializer = new StringSerializer(); + final Deserializer<String> deserializer = new StringDeserializer(); + final KStreamBuilder builder = new KStreamBuilder(); + + String topic1 = "topic1"; + + KTableImpl<String, String, String> table1 = + (KTableImpl<String, String, String>) builder.table(serializer, serializer, deserializer, deserializer, topic1); + KTableImpl<String, String, Integer> table2 = (KTableImpl<String, String, Integer>) table1.mapValues( + new ValueMapper<String, Integer>() { + @Override + public Integer apply(String value) { + return new Integer(value); + } + }); + + table2.enableSendingOldValues(); + + MockProcessorSupplier<String, Integer> proc = new MockProcessorSupplier<>(); + + builder.addProcessor("proc", proc, table2.name); + + KStreamTestDriver driver = new KStreamTestDriver(builder, stateDir, null, null, null, null); + + assertTrue(table1.sendingOldValueEnabled()); + assertTrue(table2.sendingOldValueEnabled()); + + driver.process(topic1, "A", "01"); + driver.process(topic1, "B", "01"); + driver.process(topic1, "C", "01"); + + 
proc.checkAndClearResult("A:(1<-null)", "B:(1<-null)", "C:(1<-null)"); + + driver.process(topic1, "A", "02"); + driver.process(topic1, "B", "02"); + + proc.checkAndClearResult("A:(2<-1)", "B:(2<-1)"); + + driver.process(topic1, "A", "03"); + + proc.checkAndClearResult("A:(3<-2)"); + + driver.process(topic1, "A", null); + + proc.checkAndClearResult("A:(null<-3)"); + + } finally { + Utils.delete(stateDir); + } + } + +}
