This is an automated email from the ASF dual-hosted git repository. bbejeck pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new 240d758 MINOR: improve JavaDocs about global state stores (#6359) 240d758 is described below commit 240d7589d624b72fa95e2f84a84778bc3a127927 Author: Matthias J. Sax <mj...@apache.org> AuthorDate: Wed Mar 6 18:19:47 2019 -0800 MINOR: improve JavaDocs about global state stores (#6359) Improve JavaDocs about global state stores. Reviewers: Guozhang Wang <wangg...@gmail.com>, Sophie Blee-Goldman <sop...@confluent.io>, Bill Bejeck <bbej...@gmail.com> --- .../org/apache/kafka/streams/StreamsBuilder.java | 9 ++ .../org/apache/kafka/streams/kstream/KStream.java | 142 +++++++++++---------- .../kafka/streams/scala/StreamsBuilder.scala | 10 +- .../kafka/streams/scala/kstream/KStream.scala | 25 ++-- 4 files changed, 110 insertions(+), 76 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java b/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java index 1b3b4a2..9e89d7a 100644 --- a/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java +++ b/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java @@ -25,9 +25,12 @@ import org.apache.kafka.streams.kstream.KGroupedTable; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.KTable; import org.apache.kafka.streams.kstream.Materialized; +import org.apache.kafka.streams.kstream.Transformer; +import org.apache.kafka.streams.kstream.ValueTransformer; import org.apache.kafka.streams.kstream.internals.ConsumedInternal; import org.apache.kafka.streams.kstream.internals.InternalStreamsBuilder; import org.apache.kafka.streams.kstream.internals.MaterializedInternal; +import org.apache.kafka.streams.processor.Processor; import org.apache.kafka.streams.processor.ProcessorSupplier; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.TimestampExtractor; @@ -447,6 +450,9 @@ public class StreamsBuilder { /** * Adds a state 
store to the underlying {@link Topology}. + * <p> + * It is required to connect state stores to {@link Processor Processors}, {@link Transformer Transformers}, + * or {@link ValueTransformer ValueTransformers} before they can be used. * * @param builder the builder used to obtain this state store {@link StateStore} instance * @return itself @@ -492,6 +498,9 @@ public class StreamsBuilder { * records forwarded from the {@link SourceNode}. * This {@link ProcessorNode} should be used to keep the {@link StateStore} up-to-date. * The default {@link TimestampExtractor} as specified in the {@link StreamsConfig config} is used. + * <p> + * It is not required to connect a global store to {@link Processor Processors}, {@link Transformer Transformers}, + * or {@link ValueTransformer ValueTransformers}; those have read-only access to all global stores by default. * * @param storeBuilder user defined {@link StoreBuilder}; can't be {@code null} * @param topic the topic to source the data from diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java index 5138917..df001d0 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java @@ -229,18 +229,19 @@ public interface KStream<K, V> { * and emit a record {@code <word:1>} for each word. 
* <pre>{@code * KStream<byte[], String> inputStream = builder.stream("topic"); - * KStream<String, Integer> outputStream = inputStream.flatMap(new KeyValueMapper<byte[], String, Iterable<KeyValue<String, Integer>>> { - * Iterable<KeyValue<String, Integer>> apply(byte[] key, String value) { - * String[] tokens = value.split(" "); - * List<KeyValue<String, Integer>> result = new ArrayList<>(tokens.length); + * KStream<String, Integer> outputStream = inputStream.flatMap( + * new KeyValueMapper<byte[], String, Iterable<KeyValue<String, Integer>>> { + * Iterable<KeyValue<String, Integer>> apply(byte[] key, String value) { + * String[] tokens = value.split(" "); + * List<KeyValue<String, Integer>> result = new ArrayList<>(tokens.length); + * + * for(String token : tokens) { + * result.add(new KeyValue<>(token, 1)); + * } * - * for(String token : tokens) { - * result.add(new KeyValue<>(token, 1)); + * return result; * } - * - * return result; - * } - * }); + * }); * }</pre> * The provided {@link KeyValueMapper} must return an {@link Iterable} (e.g., any {@link java.util.Collection} type) * and the return value must not be {@code null}. @@ -497,7 +498,8 @@ public interface KStream<K, V> { * Furthermore, via {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long)}, the processing progress * can be observed and additional periodic actions can be performed. 
* <p> - * In order to assign a state, the state must be created and registered beforehand: + * In order to assign a state, the state must be created and registered beforehand (it's not required to connect + * global state stores; read-only access to global state stores is available by default): * <pre>{@code * // create store * StoreBuilder<KeyValueStore<String,String>> keyValueStoreBuilder = @@ -580,7 +582,8 @@ public interface KStream<K, V> { * Furthermore, via {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long)} the processing progress * can be observed and additional periodic actions can be performed. * <p> - * In order to assign a state, the state must be created and registered beforehand: + * In order to assign a state, the state must be created and registered beforehand (it's not required to connect + * global state stores; read-only access to global state stores is available by default): * <pre>{@code * // create store * StoreBuilder<KeyValueStore<String,String>> keyValueStoreBuilder = @@ -652,10 +655,11 @@ public interface KStream<K, V> { * record value and computes a new value for it. * Thus, an input record {@code <K,V>} can be transformed into an output record {@code <K:V'>}. * This is a stateful record-by-record operation (cf. {@link #mapValues(ValueMapper)}). - * Furthermore, via {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long)} the processing progress can be observed and additional - * periodic actions can be performed. + * Furthermore, via {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long)} the processing progress + * can be observed and additional periodic actions can be performed. 
* <p> - * In order to assign a state, the state must be created and registered beforehand: + * In order to assign a state, the state must be created and registered beforehand (it's not required to connect + * global state stores; read-only access to global state stores is available by default): * <pre>{@code * // create store * StoreBuilder<KeyValueStore<String,String>> keyValueStoreBuilder = @@ -669,8 +673,8 @@ public interface KStream<K, V> { * }</pre> * Within the {@link ValueTransformer}, the state is obtained via the * {@link ProcessorContext}. - * To trigger periodic actions via {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long) punctuate()}, a schedule must be - * registered. + * To trigger periodic actions via {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long) punctuate()}, + * a schedule must be registered. * In contrast to {@link #transform(TransformerSupplier, String...) transform()}, no additional {@link KeyValue} * pairs should be emitted via {@link ProcessorContext#forward(Object, Object) * ProcessorContext.forward()}. @@ -682,7 +686,8 @@ public interface KStream<K, V> { * * void init(ProcessorContext context) { * this.state = context.getStateStore("myValueTransformState"); - * context.schedule(Duration.ofSeconds(1), PunctuationType.WALL_CLOCK_TIME, new Punctuator(..)); // punctuate each 1000ms, can access this.state + * // punctuate each 1000ms, can access this.state + * context.schedule(Duration.ofSeconds(1), PunctuationType.WALL_CLOCK_TIME, new Punctuator(..)); * } * * NewValueType transform(V value) { @@ -718,14 +723,15 @@ public interface KStream<K, V> { /** * Transform the value of each input record into a new value (with possible new type) of the output record. - * A {@link ValueTransformerWithKey} (provided by the given {@link ValueTransformerWithKeySupplier}) is applied to each input - * record value and computes a new value for it. 
+ * A {@link ValueTransformerWithKey} (provided by the given {@link ValueTransformerWithKeySupplier}) is applied to + * each input record value and computes a new value for it. * Thus, an input record {@code <K,V>} can be transformed into an output record {@code <K:V'>}. * This is a stateful record-by-record operation (cf. {@link #mapValues(ValueMapperWithKey)}). - * Furthermore, via {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long)} the processing progress can be observed and additional - * periodic actions can be performed. + * Furthermore, via {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long)} the processing progress + * can be observed and additional periodic actions can be performed. * <p> - * In order to assign a state, the state must be created and registered beforehand: + * In order to assign a state, the state must be created and registered beforehand (it's not required to connect + * global state stores; read-only access to global state stores is available by default): * <pre>{@code * // create store * StoreBuilder<KeyValueStore<String,String>> keyValueStoreBuilder = @@ -752,7 +758,8 @@ public interface KStream<K, V> { * * void init(ProcessorContext context) { * this.state = context.getStateStore("myValueTransformState"); - * context.schedule(Duration.ofSeconds(1), PunctuationType.WALL_CLOCK_TIME, new Punctuator(..)); // punctuate each 1000ms, can access this.state + * // punctuate each 1000ms, can access this.state + * context.schedule(Duration.ofSeconds(1), PunctuationType.WALL_CLOCK_TIME, new Punctuator(..)); * } * * NewValueType transform(K readOnlyKey, V value) { @@ -791,11 +798,12 @@ public interface KStream<K, V> { * Process all records in this stream, one record at a time, by applying a {@link Processor} (provided by the given * {@link ProcessorSupplier}). * This is a stateful record-by-record operation (cf. {@link #foreach(ForeachAction)}). 
- * Furthermore, via {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long)} the processing progress can be observed and additional - * periodic actions can be performed. + * Furthermore, via {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long)} the processing progress + * can be observed and additional periodic actions can be performed. * Note that this is a terminal operation that returns void. * <p> - * In order to assign a state, the state must be created and registered beforehand: + * In order to assign a state, the state must be created and registered beforehand (it's not required to connect + * global state stores; read-only access to global state stores is available by default): * <pre>{@code * // create store * StoreBuilder<KeyValueStore<String,String>> keyValueStoreBuilder = @@ -819,7 +827,8 @@ public interface KStream<K, V> { * * void init(ProcessorContext context) { * this.state = context.getStateStore("myProcessorState"); - * context.schedule(Duration.ofSeconds(1), PunctuationType.WALL_CLOCK_TIME, new Punctuator(..)); // punctuate each 1000ms, can access this.state + * // punctuate each 1000ms, can access this.state + * context.schedule(Duration.ofSeconds(1), PunctuationType.WALL_CLOCK_TIME, new Punctuator(..)); * } * * void process(K key, V value) { @@ -857,8 +866,8 @@ public interface KStream<K, V> { * {@link #through(String)}) an internal repartitioning topic may need to be created in Kafka if a later * operator depends on the newly selected key. * This topic will be named "${applicationId}-<name>-repartition", where "applicationId" is user-specified in - * {@link StreamsConfig} via parameter {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "<name>" is - * an internally generated name, and "-repartition" is a fixed suffix. 
+ * {@link StreamsConfig} via parameter {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, + * "<name>" is an internally generated name, and "-repartition" is a fixed suffix. * <p> * You can retrieve all generated internal topic names via {@link Topology#describe()}. * <p> @@ -886,8 +895,8 @@ public interface KStream<K, V> { * {@link #through(String)}) an internal repartitioning topic may need to be created in Kafka * if a later operator depends on the newly selected key. * This topic will be named "${applicationId}-<name>-repartition", where "applicationId" is user-specified in - * {@link StreamsConfig} via parameter {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "<name>" is - * an internally generated name, and "-repartition" is a fixed suffix. + * {@link StreamsConfig} via parameter {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, + * "<name>" is an internally generated name, and "-repartition" is a fixed suffix. * <p> * You can retrieve all generated internal topic names via {@link Topology#describe()}. * <p> @@ -916,9 +925,9 @@ public interface KStream<K, V> { * {@link #through(String)}) an internal repartitioning topic may need to be created in Kafka if a later operator * depends on the newly selected key. * This topic will be named "${applicationId}-<name>-repartition", where "applicationId" is user-specified in - * {@link StreamsConfig} via parameter {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, <name> is - * either provided via {@link org.apache.kafka.streams.kstream.Grouped#as(String)} or an internally generated name, - * and "-repartition" is a fixed suffix. + * {@link StreamsConfig} via parameter {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, + * <name> is either provided via {@link org.apache.kafka.streams.kstream.Grouped#as(String)} or an internally + * generated name, and "-repartition" is a fixed suffix. 
* <p> * You can retrieve all generated internal topic names via {@link Topology#describe()}. * <p> @@ -938,14 +947,15 @@ public interface KStream<K, V> { * and default serializers and deserializers. * Grouping a stream on the record key is required before an aggregation operator can be applied to the data * (cf. {@link KGroupedStream}). - * The {@link KeyValueMapper} selects a new key (which may or may not be of the same type) while preserving the original values. + * The {@link KeyValueMapper} selects a new key (which may or may not be of the same type) while preserving the + * original values. * If the new record key is {@code null} the record will not be included in the resulting {@link KGroupedStream} * <p> * Because a new key is selected, an internal repartitioning topic may need to be created in Kafka if a * later operator depends on the newly selected key. * This topic will be named "${applicationId}-<name>-repartition", where "applicationId" is user-specified in - * {@link StreamsConfig} via parameter {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "<name>" is - * an internally generated name, and "-repartition" is a fixed suffix. + * {@link StreamsConfig} via parameter {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, + * "<name>" is an internally generated name, and "-repartition" is a fixed suffix. * <p> * You can retrieve all generated internal topic names via {@link Topology#describe()}. * <p> @@ -966,14 +976,15 @@ public interface KStream<K, V> { * and {@link Serde}s as specified by {@link Serialized}. * Grouping a stream on the record key is required before an aggregation operator can be applied to the data * (cf. {@link KGroupedStream}). - * The {@link KeyValueMapper} selects a new key (which may or may not be of the same type) while preserving the original values. + * The {@link KeyValueMapper} selects a new key (which may or may not be of the same type) while preserving the + * original values. 
* If the new record key is {@code null} the record will not be included in the resulting {@link KGroupedStream}. * <p> * Because a new key is selected, an internal repartitioning topic may need to be created in Kafka if a * later operator depends on the newly selected key. * This topic will be named "${applicationId}-<name>-repartition", where "applicationId" is user-specified in - * {@link StreamsConfig} via parameter {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "<name>" is - * an internally generated name, and "-repartition" is a fixed suffix. + * {@link StreamsConfig} via parameter {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, + * "<name>" is an internally generated name, and "-repartition" is a fixed suffix. * <p> * You can retrieve all generated internal topic names via {@link Topology#describe()}. * <p> @@ -997,14 +1008,16 @@ public interface KStream<K, V> { * and {@link Serde}s as specified by {@link Grouped}. * Grouping a stream on the record key is required before an aggregation operator can be applied to the data * (cf. {@link KGroupedStream}). - * The {@link KeyValueMapper} selects a new key (which may or may not be of the same type) while preserving the original values. + * The {@link KeyValueMapper} selects a new key (which may or may not be of the same type) while preserving the + * original values. * If the new record key is {@code null} the record will not be included in the resulting {@link KGroupedStream}. * <p> * Because a new key is selected, an internal repartitioning topic may need to be created in Kafka if a later * operator depends on the newly selected key. * This topic will be named "${applicationId}-<name>-repartition", where "applicationId" is user-specified in - * {@link StreamsConfig} via parameter {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "<name>" is - * either provided via {@link org.apache.kafka.streams.kstream.Grouped#as(String)} or an internally generated name. 
+ * {@link StreamsConfig} via parameter {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, + * "<name>" is either provided via {@link org.apache.kafka.streams.kstream.Grouped#as(String)} or an + * internally generated name. * <p> * You can retrieve all generated internal topic names via {@link Topology#describe()}. * <p> @@ -1067,8 +1080,8 @@ public interface KStream<K, V> { * internal repartitioning topic in Kafka and write and re-read the data via this topic before the actual join. * The repartitioning topic will be named "${applicationId}-<name>-repartition", where "applicationId" is * user-specified in {@link StreamsConfig} via parameter - * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "<name>" is an internally generated name, and - * "-repartition" is a fixed suffix. + * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "<name>" is an internally generated + * name, and "-repartition" is a fixed suffix. * <p> * Repartitioning can happen for one or both of the joining {@code KStream}s. * For this case, all data of the stream will be redistributed through the repartitioning topic by writing all @@ -1144,8 +1157,8 @@ public interface KStream<K, V> { * internal repartitioning topic in Kafka and write and re-read the data via this topic before the actual join. * The repartitioning topic will be named "${applicationId}-<name>-repartition", where "applicationId" is * user-specified in {@link StreamsConfig} via parameter - * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "<name>" is an internally generated name, and - * "-repartition" is a fixed suffix. + * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "<name>" is an internally generated + * name, and "-repartition" is a fixed suffix. * <p> * Repartitioning can happen for one or both of the joining {@code KStream}s. 
* For this case, all data of the stream will be redistributed through the repartitioning topic by writing all @@ -1227,8 +1240,8 @@ public interface KStream<K, V> { * internal repartitioning topic in Kafka and write and re-read the data via this topic before the actual join. * The repartitioning topic will be named "${applicationId}-<name>-repartition", where "applicationId" is * user-specified in {@link StreamsConfig} via parameter - * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "<name>" is an internally generated name, and - * "-repartition" is a fixed suffix. + * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "<name>" is an internally generated + * name, and "-repartition" is a fixed suffix. * <p> * Repartitioning can happen for one or both of the joining {@code KStream}s. * For this case, all data of the stream will be redistributed through the repartitioning topic by writing all @@ -1308,8 +1321,8 @@ public interface KStream<K, V> { * internal repartitioning topic in Kafka and write and re-read the data via this topic before the actual join. * The repartitioning topic will be named "${applicationId}-<name>-repartition", where "applicationId" is * user-specified in {@link StreamsConfig} via parameter - * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "<name>" is an internally generated name, and - * "-repartition" is a fixed suffix. + * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "<name>" is an internally generated + * name, and "-repartition" is a fixed suffix. * <p> * Repartitioning can happen for one or both of the joining {@code KStream}s. * For this case, all data of the stream will be redistributed through the repartitioning topic by writing all @@ -1392,8 +1405,8 @@ public interface KStream<K, V> { * internal repartitioning topic in Kafka and write and re-read the data via this topic before the actual join. 
* The repartitioning topic will be named "${applicationId}-<name>-repartition", where "applicationId" is * user-specified in {@link StreamsConfig} via parameter - * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "<name>" is an internally generated name, and - * "-repartition" is a fixed suffix. + * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "<name>" is an internally generated + * name, and "-repartition" is a fixed suffix. * <p> * Repartitioning can happen for one or both of the joining {@code KStream}s. * For this case, all data of the stream will be redistributed through the repartitioning topic by writing all @@ -1474,8 +1487,8 @@ public interface KStream<K, V> { * internal repartitioning topic in Kafka and write and re-read the data via this topic before the actual join. * The repartitioning topic will be named "${applicationId}-<name>-repartition", where "applicationId" is * user-specified in {@link StreamsConfig} via parameter - * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "<name>" is an internally generated name, and - * "-repartition" is a fixed suffix. + * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "<name>" is an internally generated + * name, and "-repartition" is a fixed suffix. * <p> * Repartitioning can happen for one or both of the joining {@code KStream}s. * For this case, all data of the stream will be redistributed through the repartitioning topic by writing all @@ -1561,8 +1574,8 @@ public interface KStream<K, V> { * internal repartitioning topic in Kafka and write and re-read the data via this topic before the actual join. * The repartitioning topic will be named "${applicationId}-<name>-repartition", where "applicationId" is * user-specified in {@link StreamsConfig} via parameter - * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "<name>" is an internally generated name, and - * "-repartition" is a fixed suffix. 
+ * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "<name>" is an internally generated + * name, and "-repartition" is a fixed suffix. * <p> * You can retrieve all generated internal topic names via {@link Topology#describe()}. * <p> @@ -1636,8 +1649,8 @@ public interface KStream<K, V> { * internal repartitioning topic in Kafka and write and re-read the data via this topic before the actual join. * The repartitioning topic will be named "${applicationId}-<name>-repartition", where "applicationId" is * user-specified in {@link StreamsConfig} via parameter - * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "<name>" is an internally generated name, and - * "-repartition" is a fixed suffix. + * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "<name>" is an internally generated + * name, and "-repartition" is a fixed suffix. * <p> * You can retrieve all generated internal topic names via {@link Topology#describe()}. * <p> @@ -1717,8 +1730,8 @@ public interface KStream<K, V> { * internal repartitioning topic in Kafka and write and re-read the data via this topic before the actual join. * The repartitioning topic will be named "${applicationId}-<name>-repartition", where "applicationId" is * user-specified in {@link StreamsConfig} via parameter - * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "<name>" is an internally generated name, and - * "-repartition" is a fixed suffix. + * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "<name>" is an internally generated + * name, and "-repartition" is a fixed suffix. * <p> * You can retrieve all generated internal topic names via {@link Topology#describe()}. * <p> @@ -1795,8 +1808,8 @@ public interface KStream<K, V> { * internal repartitioning topic in Kafka and write and re-read the data via this topic before the actual join. 
* The repartitioning topic will be named "${applicationId}-<name>-repartition", where "applicationId" is * user-specified in {@link StreamsConfig} via parameter - * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "<name>" is an internally generated name, and - * "-repartition" is a fixed suffix. + * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "<name>" is an internally generated + * name, and "-repartition" is a fixed suffix. * <p> * You can retrieve all generated internal topic names via {@link Topology#describe()}. * <p> @@ -1889,5 +1902,4 @@ public interface KStream<K, V> { <GK, GV, RV> KStream<K, RV> leftJoin(final GlobalKTable<GK, GV> globalKTable, final KeyValueMapper<? super K, ? super V, ? extends GK> keyValueMapper, final ValueJoiner<? super V, ? super GV, ? extends RV> valueJoiner); - } diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/StreamsBuilder.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/StreamsBuilder.scala index 8c5a9b3..4a1df92 100644 --- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/StreamsBuilder.scala +++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/StreamsBuilder.scala @@ -27,6 +27,7 @@ import org.apache.kafka.streams.state.StoreBuilder import org.apache.kafka.streams.{Topology, StreamsBuilder => StreamsBuilderJ} import org.apache.kafka.streams.scala.kstream._ import ImplicitConversions._ +import org.apache.kafka.streams.errors.TopologyException import scala.collection.JavaConverters._ @@ -77,7 +78,7 @@ class StreamsBuilder(inner: StreamsBuilderJ = new StreamsBuilderJ) { /** * Create a [[kstream.KStream]] from the specified topic pattern. 
* - * @param topics the topic name pattern + * @param topicPattern the topic name pattern * @return a [[kstream.KStream]] for the specified topics * @see #stream(String) * @see `org.apache.kafka.streams.StreamsBuilder#stream` @@ -157,6 +158,8 @@ class StreamsBuilder(inner: StreamsBuilderJ = new StreamsBuilderJ) { /** * Adds a state store to the underlying `Topology`. The store must still be "connected" to a `Processor`, * `Transformer`, or `ValueTransformer` before it can be used. + * <p> + * It is required to connect state stores to `Processor`, `Transformer`, or `ValueTransformer` before they can be used. * * @param builder the builder used to obtain this state store `StateStore` instance * @return the underlying Java abstraction `StreamsBuilder` after adding the `StateStore` @@ -166,8 +169,11 @@ class StreamsBuilder(inner: StreamsBuilderJ = new StreamsBuilderJ) { def addStateStore(builder: StoreBuilder[_ <: StateStore]): StreamsBuilderJ = inner.addStateStore(builder) /** - * Adds a global `StateStore` to the topology. Global stores should not be added to `Processor, `Transformer`, + * Adds a global `StateStore` to the topology. Global stores should not be added to `Processor`, `Transformer`, * or `ValueTransformer` (in contrast to regular stores). + * <p> + * It is not required to connect a global store to `Processor`, `Transformer`, or `ValueTransformer`; + * those have read-only access to all global stores by default. 
* * @see `org.apache.kafka.streams.StreamsBuilder#addGlobalStore` */ diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala index 635975b..5df9de8 100644 --- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala +++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala @@ -285,9 +285,10 @@ class KStream[K, V](val inner: KStreamJ[K, V]) { * can be altered arbitrarily). * A `Transformer` (provided by the given `TransformerSupplier`) is applied to each input record * and computes zero or more output records. - * In order to assign a state, the state must be created and registered - * beforehand via stores added via `addStateStore` or `addGlobalStore` - * before they can be connected to the `Transformer` + * In order to assign a state, the state must be created and added via `addStateStore` before they can be connected + * to the `Transformer`. + * It's not required to connect global state stores that are added via `addGlobalStore`; + * read-only access to global state stores is available by default. * * @param transformerSupplier the `TransformerSupplier` that generates `Transformer` * @param stateStoreNames the names of the state stores used by the processor @@ -302,8 +303,10 @@ class KStream[K, V](val inner: KStreamJ[K, V]) { * Transform the value of each input record into a new value (with possible new type) of the output record. * A `ValueTransformer` (provided by the given `ValueTransformerSupplier`) is applied to each input * record value and computes a new value for it. 
- * In order to assign a state, the state must be created and registered - * beforehand via stores added via `addStateStore` or `addGlobalStore` before they can be connected to the `Transformer` + * In order to assign a state, the state must be created and added via `addStateStore` before they can be connected + * to the `ValueTransformer`. + * It's not required to connect global state stores that are added via `addGlobalStore`; + * read-only access to global state stores is available by default. * * @param valueTransformerSupplier a instance of `ValueTransformerSupplier` that generates a `ValueTransformer` * @param stateStoreNames the names of the state stores used by the processor @@ -318,8 +321,10 @@ class KStream[K, V](val inner: KStreamJ[K, V]) { * Transform the value of each input record into a new value (with possible new type) of the output record. * A `ValueTransformer` (provided by the given `ValueTransformerSupplier`) is applied to each input * record value and computes a new value for it. - * In order to assign a state, the state must be created and registered - * beforehand via stores added via `addStateStore` or `addGlobalStore` before they can be connected to the `Transformer` + * In order to assign a state, the state must be created and added via `addStateStore` before they can be connected + * to the `ValueTransformer`. + * It's not required to connect global state stores that are added via `addGlobalStore`; + * read-only access to global state stores is available by default. * * @param valueTransformerSupplier a instance of `ValueTransformerWithKeySupplier` that generates a `ValueTransformerWithKey` * @param stateStoreNames the names of the state stores used by the processor @@ -333,8 +338,10 @@ class KStream[K, V](val inner: KStreamJ[K, V]) { /** * Process all records in this stream, one record at a time, by applying a `Processor` (provided by the given * `processorSupplier`). 
- * In order to assign a state, the state must be created and registered - * beforehand via stores added via `addStateStore` or `addGlobalStore` before they can be connected to the `Transformer` + * In order to assign a state, the state must be created and added via `addStateStore` before they can be connected + * to the `Processor`. + * It's not required to connect global state stores that are added via `addGlobalStore`; + * read-only access to global state stores is available by default. * * @param processorSupplier a function that generates a [[org.apache.kafka.streams.processor.Processor]] * @param stateStoreNames the names of the state store used by the processor