Repository: kafka Updated Branches: refs/heads/trunk 29ea4b0f1 -> 18223415b
KAFKA-4302: Simplify KTableSource KTableSource is always materialized since IQ: - removed flag KTableSource#materialized - removed MaterializedKTableSourceProcessor Author: Matthias J. Sax <matth...@confluent.io> Reviewers: Eno Thereska, Guozhang Wang Closes #2065 from mjsax/kafka-4302-simplify-ktablesource Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/18223415 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/18223415 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/18223415 Branch: refs/heads/trunk Commit: 18223415b091b0c05af4767191bb97b366aa8fa5 Parents: 29ea4b0 Author: Matthias J. Sax <matth...@confluent.io> Authored: Sun Oct 30 11:34:17 2016 -0700 Committer: Guozhang Wang <wangg...@gmail.com> Committed: Sun Oct 30 11:34:17 2016 -0700 ---------------------------------------------------------------------- .../kafka/streams/kstream/KStreamBuilder.java | 13 ++++++++-- .../streams/kstream/internals/KTableImpl.java | 25 -------------------- .../streams/kstream/internals/KTableSource.java | 22 +---------------- 3 files changed, 12 insertions(+), 48 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/18223415/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java index f9544cc..38d126e 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java @@ -22,7 +22,9 @@ import org.apache.kafka.streams.kstream.internals.KStreamImpl; import org.apache.kafka.streams.kstream.internals.KTableImpl; import org.apache.kafka.streams.kstream.internals.KTableSource; import org.apache.kafka.streams.processor.ProcessorSupplier; +import org.apache.kafka.streams.processor.StateStoreSupplier; import org.apache.kafka.streams.processor.TopologyBuilder; +import org.apache.kafka.streams.state.internals.RocksDBKeyValueStoreSupplier; import java.util.Collections; import java.util.concurrent.atomic.AtomicInteger; @@ -139,7 +141,7 @@ public class KStreamBuilder extends TopologyBuilder { * @param valSerde value serde used to send key-value pairs, * if not specified the default value serde defined in the configuration will be used * @param topic the topic name; cannot be null - * @param storeName the state store name used if this KTable is materialized, can be null if materialization not expected + * @param storeName the state store name used for the materialized KTable * @return a {@link KTable} for the specified topics */ public <K, V> KTable<K, V> table(Serde<K> keySerde, Serde<V> valSerde, String topic, final String storeName) { @@ -151,7 +153,14 @@ public class KStreamBuilder extends TopologyBuilder { addProcessor(name, processorSupplier, source); final KTableImpl kTable = new KTableImpl<>(this, name, processorSupplier, Collections.singleton(source), keySerde, valSerde, storeName); - kTable.materialize((KTableSource) processorSupplier); + StateStoreSupplier storeSupplier = new RocksDBKeyValueStoreSupplier<>(storeName, + keySerde, + valSerde, + false, + Collections.<String, String>emptyMap(), + true); + + addStateStore(storeSupplier, name); connectSourceStoreAndTopic(storeName, topic); return kTable; http://git-wip-us.apache.org/repos/asf/kafka/blob/18223415/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java index 683dc00..cd83d50 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java @@ -19,7 +19,6 @@ package org.apache.kafka.streams.kstream.internals; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.streams.KeyValue; -import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.errors.TopologyBuilderException; import org.apache.kafka.streams.kstream.ForeachAction; import org.apache.kafka.streams.kstream.KGroupedTable; @@ -31,14 +30,11 @@ import org.apache.kafka.streams.kstream.Predicate; import org.apache.kafka.streams.kstream.ValueJoiner; import org.apache.kafka.streams.kstream.ValueMapper; import org.apache.kafka.streams.processor.ProcessorSupplier; -import org.apache.kafka.streams.processor.StateStoreSupplier; import org.apache.kafka.streams.processor.StreamPartitioner; -import org.apache.kafka.streams.state.internals.RocksDBKeyValueStoreSupplier; import java.io.FileNotFoundException; import java.io.FileOutputStream; import java.io.PrintStream; -import java.util.Collections; import java.util.Objects; import java.util.Set; @@ -371,9 +367,6 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K, KTableValueGetterSupplier<K, V> valueGetterSupplier() { if (processorSupplier instanceof KTableSource) { KTableSource<K, V> source = (KTableSource<K, V>) processorSupplier; - if (!source.isMaterialized()) { - throw new StreamsException("Source is not materialized"); - } return new KTableSourceValueGetterSupplier<>(source.storeName); } else if (processorSupplier instanceof KStreamAggProcessorSupplier) { return ((KStreamAggProcessorSupplier<?, K, S, V>) processorSupplier).view(); @@ -387,9 +380,6 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K, if (!sendOldValues) { if (processorSupplier instanceof KTableSource) { KTableSource<K, ?> source = (KTableSource<K, V>) processorSupplier; - if (!source.isMaterialized()) { - throw new StreamsException("Source is not materialized"); - } source.enableSendingOldValues(); } else if (processorSupplier instanceof KStreamAggProcessorSupplier) { ((KStreamAggProcessorSupplier<?, K, S, V>) processorSupplier).enableSendingOldValues(); @@ -404,19 +394,4 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K, return sendOldValues; } - public void materialize(KTableSource<K, ?> source) { - synchronized (source) { - if (!source.isMaterialized()) { - StateStoreSupplier storeSupplier = new RocksDBKeyValueStoreSupplier<>(source.storeName, - keySerde, - valSerde, - false, - Collections.<String, String>emptyMap(), - true); - // mark this state as non internal hence it is read directly from a user topic - topology.addStateStore(storeSupplier, name); - source.materialize(); - } - } - } } http://git-wip-us.apache.org/repos/asf/kafka/blob/18223415/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java index d8d389f..20a80f4 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java @@ -29,7 +29,6 @@ public class KTableSource<K, V> implements ProcessorSupplier<K, V> { public final String storeName; - private boolean materialized = false; private boolean sendOldValues = false; public KTableSource(String storeName) { @@ -38,15 +37,7 @@ public class KTableSource<K, V> implements ProcessorSupplier<K, V> { @Override public Processor<K, V> get() { - return materialized ? new MaterializedKTableSourceProcessor() : new KTableSourceProcessor(); - } - - public void materialize() { - materialized = true; - } - - public boolean isMaterialized() { - return materialized; + return new KTableSourceProcessor(); } public void enableSendingOldValues() { @@ -54,17 +45,6 @@ public class KTableSource<K, V> implements ProcessorSupplier<K, V> { } private class KTableSourceProcessor extends AbstractProcessor<K, V> { - @Override - public void process(K key, V value) { - // the keys should never be null - if (key == null) - throw new StreamsException("Record key for the source KTable from store name " + storeName + " should not be null."); - - context().forward(key, new Change<>(value, null)); - } - } - - private class MaterializedKTableSourceProcessor extends AbstractProcessor<K, V> { private KeyValueStore<K, V> store;