This is an automated email from the ASF dual-hosted git repository. bbejeck pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new 05668e9 KAFKA-7875: Add KStream.flatTransformValues (#6424) 05668e9 is described below commit 05668e98f531cf4d6ddb0696f0f72675ca128581 Author: cadonna <br...@confluent.io> AuthorDate: Tue Apr 16 09:10:38 2019 -0700 KAFKA-7875: Add KStream.flatTransformValues (#6424) Adds flatTransformValues methods in KStream Adds processor supplier and processor for flatTransformValues Improves API documentation of transformValues Reviewers: Matthias J. Sax <mj...@apache.org>, John Roesler <j...@confluent.io>, Bill Bejeck <bbej...@gmail.com> --- .../org/apache/kafka/streams/kstream/KStream.java | 225 +++++++++++++++++-- .../internals/KStreamFlatTransformValues.java | 70 ++++++ .../streams/kstream/internals/KStreamImpl.java | 39 +++- .../KStreamTransformIntegrationTest.java | 241 +++++++++++++++++---- .../internals/KStreamFlatTransformValuesTest.java | 135 ++++++++++++ .../streams/kstream/internals/KStreamImplTest.java | 34 ++- 6 files changed, 679 insertions(+), 65 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java index 7faba82..8375336 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java @@ -263,6 +263,8 @@ public interface KStream<K, V> { * @see #flatTransform(TransformerSupplier, String...) * @see #transformValues(ValueTransformerSupplier, String...) * @see #transformValues(ValueTransformerWithKeySupplier, String...) + * @see #flatTransformValues(ValueTransformerSupplier, String...) + * @see #flatTransformValues(ValueTransformerWithKeySupplier, String...) */ <KR, VR> KStream<KR, VR> flatMap(final KeyValueMapper<? super K, ? super V, ? extends Iterable<? extends KeyValue<? extends KR, ?
extends VR>>> mapper); @@ -304,6 +306,8 @@ public interface KStream<K, V> { * @see #flatTransform(TransformerSupplier, String...) * @see #transformValues(ValueTransformerSupplier, String...) * @see #transformValues(ValueTransformerWithKeySupplier, String...) + * @see #flatTransformValues(ValueTransformerSupplier, String...) + * @see #flatTransformValues(ValueTransformerWithKeySupplier, String...) */ <VR> KStream<K, VR> flatMapValues(final ValueMapper<? super V, ? extends Iterable<? extends VR>> mapper); @@ -351,6 +355,8 @@ public interface KStream<K, V> { * @see #flatTransform(TransformerSupplier, String...) * @see #transformValues(ValueTransformerSupplier, String...) * @see #transformValues(ValueTransformerWithKeySupplier, String...) + * @see #flatTransformValues(ValueTransformerSupplier, String...) + * @see #flatTransformValues(ValueTransformerWithKeySupplier, String...) */ <VR> KStream<K, VR> flatMapValues(final ValueMapperWithKey<? super K, ? super V, ? extends Iterable<? extends VR>> mapper); @@ -627,7 +633,7 @@ public interface KStream<K, V> { * Iterable<KeyValue> transform(K key, V value) { * // can access this.state * List<KeyValue> result = new ArrayList<>(); - * for (int i = 0; i < n; i++) { + * for (int i = 0; i < 3; i++) { * result.add(new KeyValue(key, value)); * } * return result; // emits a list of key-value pairs via return @@ -672,7 +678,7 @@ public interface KStream<K, V> { final String... stateStoreNames); /** - * Transform the value of each input record into a new value (with possible new type) of the output record. + * Transform the value of each input record into a new value (with possibly a new type) of the output record. * A {@link ValueTransformer} (provided by the given {@link ValueTransformerSupplier}) is applied to each input * record value and computes a new value for it. * Thus, an input record {@code <K,V>} can be transformed into an output record {@code <K:V'>}. 
@@ -680,8 +686,8 @@ public interface KStream<K, V> { * Furthermore, via {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long)} the processing progress * can be observed and additional periodic actions can be performed. * <p> - * In order to assign a state, the state must be created and registered beforehand (it's not required to connect - * global state stores; read-only access to global state stores is available by default): + * In order to assign a state store, the state store must be created and registered beforehand (it's not required to + * connect global state stores; read-only access to global state stores is available by default): * <pre>{@code * // create store * StoreBuilder<KeyValueStore<String,String>> keyValueStoreBuilder = @@ -693,12 +699,16 @@ public interface KStream<K, V> { * * KStream outputStream = inputStream.transformValues(new ValueTransformerSupplier() { ... }, "myValueTransformState"); * }</pre> - * Within the {@link ValueTransformer}, the state is obtained via the - * {@link ProcessorContext}. + * Within the {@link ValueTransformer}, the state store is obtained via the {@link ProcessorContext}. * To trigger periodic actions via {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long) punctuate()}, * a schedule must be registered. + * The {@link ValueTransformer} must return the new value in {@link ValueTransformer#transform(Object) transform()}. + * If the return value of {@link ValueTransformer#transform(Object) ValueTransformer#transform()} is {@code null}, no + * records are emitted. * In contrast to {@link #transform(TransformerSupplier, String...) transform()}, no additional {@link KeyValue} - * pairs should be emitted via {@link ProcessorContext#forward(Object, Object) ProcessorContext.forward()}. + * pairs can be emitted via {@link ProcessorContext#forward(Object, Object) ProcessorContext.forward()}.
+ * A {@link org.apache.kafka.streams.errors.StreamsException} is thrown if the {@link ValueTransformer} tries to + * emit a {@link KeyValue} pair. * <pre>{@code * new ValueTransformerSupplier() { * ValueTransformer get() { @@ -724,7 +734,8 @@ public interface KStream<K, V> { * } * }</pre> * Even if any upstream operation was key-changing, no auto-repartition is triggered. - * If repartitioning is required, a call to {@link #through(String)} should be performed before {@code transform()}. + * If repartitioning is required, a call to {@link #through(String) through()} should be performed before + * {@code transformValues()}. * <p> * Setting a new value preserves data co-location with respect to the key. * Thus, <em>no</em> internal data redistribution is required if a key based operator (like an aggregation or join) @@ -743,7 +754,7 @@ public interface KStream<K, V> { final String... stateStoreNames); /** - * Transform the value of each input record into a new value (with possible new type) of the output record. + * Transform the value of each input record into a new value (with possibly a new type) of the output record. * A {@link ValueTransformerWithKey} (provided by the given {@link ValueTransformerWithKeySupplier}) is applied to * each input record value and computes a new value for it. * Thus, an input record {@code <K,V>} can be transformed into an output record {@code <K:V'>}. @@ -751,8 +762,8 @@ public interface KStream<K, V> { * Furthermore, via {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long)} the processing progress * can be observed and additional periodic actions can be performed. 
* <p> - * In order to assign a state, the state must be created and registered beforehand (it's not required to connect - * global state stores; read-only access to global state stores is available by default): + * In order to assign a state store, the state store must be created and registered beforehand (it's not required to + * connect global state stores; read-only access to global state stores is available by default): * <pre>{@code * // create store * StoreBuilder<KeyValueStore<String,String>> keyValueStoreBuilder = @@ -764,13 +775,18 @@ public interface KStream<K, V> { * * KStream outputStream = inputStream.transformValues(new ValueTransformerWithKeySupplier() { ... }, "myValueTransformState"); * }</pre> - * Within the {@link ValueTransformerWithKey}, the state is obtained via the - * {@link ProcessorContext}. + * Within the {@link ValueTransformerWithKey}, the state store is obtained via the {@link ProcessorContext}. * To trigger periodic actions via {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long) punctuate()}, * a schedule must be registered. - * In contrast to {@link #transform(TransformerSupplier, String...) transform()}, no additional {@link KeyValue} - * pairs should be emitted via {@link ProcessorContext#forward(Object, Object) - * ProcessorContext.forward()}. + * The {@link ValueTransformerWithKey} must return the new value in + * {@link ValueTransformerWithKey#transform(Object, Object) transform()}. + * If the return value of {@link ValueTransformerWithKey#transform(Object, Object) ValueTransformerWithKey#transform()} + * is {@code null}, no records are emitted. + * In contrast to {@link #transform(TransformerSupplier, String...) transform()} and + * {@link #flatTransform(TransformerSupplier, String...) flatTransform()}, no additional {@link KeyValue} pairs + * can be emitted via {@link ProcessorContext#forward(Object, Object) ProcessorContext.forward()}.
+ * A {@link org.apache.kafka.streams.errors.StreamsException} is thrown if the {@link ValueTransformerWithKey} tries + * to emit a {@link KeyValue} pair. * <pre>{@code * new ValueTransformerWithKeySupplier() { * ValueTransformerWithKey get() { @@ -796,7 +812,8 @@ public interface KStream<K, V> { * } * }</pre> * Even if any upstream operation was key-changing, no auto-repartition is triggered. - * If repartitioning is required, a call to {@link #through(String)} should be performed before {@code transform()}. + * If repartitioning is required, a call to {@link #through(String) through()} should be performed before + * {@code transformValues()}. * <p> * Note that the key is read-only and should not be modified, as this can lead to corrupt partitioning. * So, setting a new value preserves data co-location with respect to the key. @@ -816,6 +833,180 @@ public interface KStream<K, V> { final String... stateStoreNames); /** + * Transform the value of each input record into zero or more new values (with possibly a new + * type) and emit for each new value a record with the same key of the input record and the value. + * A {@link ValueTransformer} (provided by the given {@link ValueTransformerSupplier}) is applied to each input + * record value and computes zero or more new values. + * Thus, an input record {@code <K,V>} can be transformed into output records {@code <K:V'>, <K:V''>, ...}. + * This is a stateful record-by-record operation (cf. {@link #flatMapValues(ValueMapper) flatMapValues()}). + * Furthermore, via {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long) Punctuator#punctuate()} + * the processing progress can be observed and additional periodic actions can be performed.
+ * <p> + * In order to assign a state store, the state store must be created and registered beforehand: + * <pre>{@code + * // create store + * StoreBuilder<KeyValueStore<String,String>> keyValueStoreBuilder = + * Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("myValueTransformState"), + * Serdes.String(), + * Serdes.String()); + * // register store + * builder.addStateStore(keyValueStoreBuilder); + * + * KStream outputStream = inputStream.flatTransformValues(new ValueTransformerSupplier() { ... }, "myValueTransformState"); + * }</pre> + * Within the {@link ValueTransformer}, the state store is obtained via the {@link ProcessorContext}. + * To trigger periodic actions via {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long) punctuate()}, + * a schedule must be registered. + * The {@link ValueTransformer} must return an {@link java.lang.Iterable} type (e.g., any + * {@link java.util.Collection} type) in {@link ValueTransformer#transform(Object) + * transform()}. + * If the return value of {@link ValueTransformer#transform(Object) ValueTransformer#transform()} is an empty + * {@link java.lang.Iterable Iterable} or {@code null}, no records are emitted. + * In contrast to {@link #transform(TransformerSupplier, String...) transform()} and + * {@link #flatTransform(TransformerSupplier, String...) flatTransform()}, no additional {@link KeyValue} pairs + * can be emitted via {@link ProcessorContext#forward(Object, Object) ProcessorContext.forward()}. + * A {@link org.apache.kafka.streams.errors.StreamsException} is thrown if the {@link ValueTransformer} tries to + * emit a {@link KeyValue} pair.
+ * <pre>{@code + * new ValueTransformerSupplier() { + * ValueTransformer get() { + * return new ValueTransformer() { + * private StateStore state; + * + * void init(ProcessorContext context) { + * this.state = context.getStateStore("myValueTransformState"); + * // punctuate each second, can access this.state + * context.schedule(Duration.ofSeconds(1), PunctuationType.WALL_CLOCK_TIME, new Punctuator(..)); + * } + * + * Iterable<NewValueType> transform(V value) { + * // can access this.state + * List<NewValueType> result = new ArrayList<>(); + * for (int i = 0; i < 3; i++) { + * result.add(new NewValueType(value)); + * } + * return result; // values + * } + * + * void close() { + * // can access this.state + * } + * } + * } + * } + * }</pre> + * Even if any upstream operation was key-changing, no auto-repartition is triggered. + * If repartitioning is required, a call to {@link #through(String) through()} should be performed before + * {@code flatTransformValues()}. + * <p> + * Setting a new value preserves data co-location with respect to the key. + * Thus, <em>no</em> internal data redistribution is required if a key based operator (like an aggregation or join) + * is applied to the result {@code KStream}. (cf. {@link #flatTransform(TransformerSupplier, String...) + * flatTransform()}) + * + * @param valueTransformerSupplier an instance of {@link ValueTransformerSupplier} that generates a + * {@link ValueTransformer} + * @param stateStoreNames the names of the state stores used by the processor + * @param <VR> the value type of the result stream + * @return a {@code KStream} that contains zero or more records with unmodified key and new values (possibly of + * different type) + * @see #mapValues(ValueMapper) + * @see #mapValues(ValueMapperWithKey) + * @see #transform(TransformerSupplier, String...) + * @see #flatTransform(TransformerSupplier, String...) + */ + <VR> KStream<K, VR> flatTransformValues(final ValueTransformerSupplier<?
super V, Iterable<VR>> valueTransformerSupplier, + final String... stateStoreNames); + + /** + * Transform the value of each input record into zero or more new values (with possibly a new + * type) and emit for each new value a record with the same key of the input record and the value. + * A {@link ValueTransformerWithKey} (provided by the given {@link ValueTransformerWithKeySupplier}) is applied to + * each input record value and computes zero or more new values. + * Thus, an input record {@code <K,V>} can be transformed into output records {@code <K:V'>, <K:V''>, ...}. + * This is a stateful record-by-record operation (cf. {@link #flatMapValues(ValueMapperWithKey) flatMapValues()}). + * Furthermore, via {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long) Punctuator#punctuate()} the processing progress can + * be observed and additional periodic actions can be performed. + * <p> + * In order to assign a state store, the state store must be created and registered beforehand: + * <pre>{@code + * // create store + * StoreBuilder<KeyValueStore<String,String>> keyValueStoreBuilder = + * Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("myValueTransformState"), + * Serdes.String(), + * Serdes.String()); + * // register store + * builder.addStateStore(keyValueStoreBuilder); + * + * KStream outputStream = inputStream.flatTransformValues(new ValueTransformerWithKeySupplier() { ... }, "myValueTransformState"); + * }</pre> + * Within the {@link ValueTransformerWithKey}, the state store is obtained via the {@link ProcessorContext}. + * To trigger periodic actions via {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long) punctuate()}, + * a schedule must be registered. + * The {@link ValueTransformerWithKey} must return an {@link java.lang.Iterable} type (e.g., any + * {@link java.util.Collection} type) in {@link ValueTransformerWithKey#transform(Object, Object) + * transform()}.
+ * If the return value of {@link ValueTransformerWithKey#transform(Object, Object) ValueTransformerWithKey#transform()} + * is an empty {@link java.lang.Iterable Iterable} or {@code null}, no records are emitted. + * In contrast to {@link #transform(TransformerSupplier, String...) transform()} and + * {@link #flatTransform(TransformerSupplier, String...) flatTransform()}, no additional {@link KeyValue} pairs + * can be emitted via {@link ProcessorContext#forward(Object, Object) ProcessorContext.forward()}. + * A {@link org.apache.kafka.streams.errors.StreamsException} is thrown if the {@link ValueTransformerWithKey} tries + * to emit a {@link KeyValue} pair. + * <pre>{@code + * new ValueTransformerWithKeySupplier() { + * ValueTransformerWithKey get() { + * return new ValueTransformerWithKey() { + * private StateStore state; + * + * void init(ProcessorContext context) { + * this.state = context.getStateStore("myValueTransformState"); + * // punctuate each second, can access this.state + * context.schedule(Duration.ofSeconds(1), PunctuationType.WALL_CLOCK_TIME, new Punctuator(..)); + * } + * + * Iterable<NewValueType> transform(K readOnlyKey, V value) { + * // can access this.state and use read-only key + * List<NewValueType> result = new ArrayList<>(); + * for (int i = 0; i < 3; i++) { + * result.add(new NewValueType(readOnlyKey)); + * } + * return result; // values + * } + * + * void close() { + * // can access this.state + * } + * } + * } + * } + * }</pre> + * Even if any upstream operation was key-changing, no auto-repartition is triggered. + * If repartitioning is required, a call to {@link #through(String) through()} should be performed before + * {@code flatTransformValues()}. + * <p> + * Note that the key is read-only and should not be modified, as this can lead to corrupt partitioning. + * So, setting a new value preserves data co-location with respect to the key.
+ * Thus, <em>no</em> internal data redistribution is required if a key based operator (like an aggregation or join) + * is applied to the result {@code KStream}. (cf. {@link #flatTransform(TransformerSupplier, String...) + * flatTransform()}) + * + * @param valueTransformerSupplier an instance of {@link ValueTransformerWithKeySupplier} that generates a + * {@link ValueTransformerWithKey} + * @param stateStoreNames the names of the state stores used by the processor + * @param <VR> the value type of the result stream + * @return a {@code KStream} that contains zero or more records with unmodified key and new values (possibly of + * different type) + * @see #mapValues(ValueMapper) + * @see #mapValues(ValueMapperWithKey) + * @see #transform(TransformerSupplier, String...) + * @see #flatTransform(TransformerSupplier, String...) + */ + <VR> KStream<K, VR> flatTransformValues(final ValueTransformerWithKeySupplier<? super K, ? super V, Iterable<VR>> valueTransformerSupplier, + final String... stateStoreNames); + + + /** * Process all records in this stream, one record at a time, by applying a {@link Processor} (provided by the given * {@link ProcessorSupplier}). * This is a stateful record-by-record operation (cf. {@link #foreach(ForeachAction)}). diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatTransformValues.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatTransformValues.java new file mode 100644 index 0000000..40e4b37 --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatTransformValues.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.kstream.internals; + +import org.apache.kafka.streams.kstream.ValueTransformerWithKey; +import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier; +import org.apache.kafka.streams.processor.Processor; +import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.ProcessorSupplier; +import org.apache.kafka.streams.processor.internals.ForwardingDisabledProcessorContext; + +public class KStreamFlatTransformValues<KIn, VIn, VOut> implements ProcessorSupplier<KIn, VIn> { + + private final ValueTransformerWithKeySupplier<KIn, VIn, Iterable<VOut>> valueTransformerSupplier; + + public KStreamFlatTransformValues(final ValueTransformerWithKeySupplier<KIn, VIn, Iterable<VOut>> valueTransformerWithKeySupplier) { + this.valueTransformerSupplier = valueTransformerWithKeySupplier; + } + + @Override + public Processor<KIn, VIn> get() { + return new KStreamFlatTransformValuesProcessor<>(valueTransformerSupplier.get()); + } + + public static class KStreamFlatTransformValuesProcessor<KIn, VIn, VOut> implements Processor<KIn, VIn> { + + private final ValueTransformerWithKey<KIn, VIn, Iterable<VOut>> valueTransformer; + private ProcessorContext context; + + KStreamFlatTransformValuesProcessor(final ValueTransformerWithKey<KIn, VIn, Iterable<VOut>> valueTransformer) { + this.valueTransformer = valueTransformer; + } + 
+ @Override + public void init(final ProcessorContext context) { + valueTransformer.init(new ForwardingDisabledProcessorContext(context)); + this.context = context; + } + + @Override + public void process(final KIn key, final VIn value) { + final Iterable<VOut> transformedValues = valueTransformer.transform(key, value); + if (transformedValues != null) { + for (final VOut transformedValue : transformedValues) { + context.forward(key, transformedValue); + } + } + } + + @Override + public void close() { + valueTransformer.close(); + } + } + +} diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java index 41260c5..75df058 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java @@ -472,7 +472,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K @Override public <VR> KStream<K, VR> transformValues(final ValueTransformerSupplier<? super V, ? extends VR> valueTransformerSupplier, final String... stateStoreNames) { - Objects.requireNonNull(valueTransformerSupplier, "valueTransformSupplier can't be null"); + Objects.requireNonNull(valueTransformerSupplier, "valueTransformerSupplier can't be null"); return doTransformValues(toValueTransformerWithKeySupplier(valueTransformerSupplier), stateStoreNames); } @@ -480,7 +480,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K @Override public <VR> KStream<K, VR> transformValues(final ValueTransformerWithKeySupplier<? super K, ? super V, ? extends VR> valueTransformerSupplier, final String... 
stateStoreNames) { - Objects.requireNonNull(valueTransformerSupplier, "valueTransformSupplier can't be null"); + Objects.requireNonNull(valueTransformerSupplier, "valueTransformerSupplier can't be null"); return doTransformValues(valueTransformerSupplier, stateStoreNames); } @@ -499,7 +499,40 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K builder.addGraphNode(this.streamsGraphNode, transformNode); // cannot inherit value serde - return new KStreamImpl<>(name, keySerde, null, sourceNodes, this.repartitionRequired, transformNode, builder); + return new KStreamImpl<>(name, keySerde, null, sourceNodes, repartitionRequired, transformNode, builder); + } + + @Override + public <VR> KStream<K, VR> flatTransformValues(final ValueTransformerSupplier<? super V, Iterable<VR>> valueTransformerSupplier, + final String... stateStoreNames) { + Objects.requireNonNull(valueTransformerSupplier, "valueTransformerSupplier can't be null"); + + return doFlatTransformValues(toValueTransformerWithKeySupplier(valueTransformerSupplier), stateStoreNames); + } + + @Override + public <VR> KStream<K, VR> flatTransformValues(final ValueTransformerWithKeySupplier<? super K, ? super V, Iterable<VR>> valueTransformerSupplier, + final String... stateStoreNames) { + Objects.requireNonNull(valueTransformerSupplier, "valueTransformerSupplier can't be null"); + + return doFlatTransformValues(valueTransformerSupplier, stateStoreNames); + } + + private <VR> KStream<K, VR> doFlatTransformValues(final ValueTransformerWithKeySupplier<? super K, ? super V, Iterable<VR>> valueTransformerWithKeySupplier, + final String... stateStoreNames) { + final String name = builder.newProcessorName(TRANSFORMVALUES_NAME); + + final StatefulProcessorNode<? super K, ? 
super V> transformNode = new StatefulProcessorNode<>( + name, + new ProcessorParameters<>(new KStreamFlatTransformValues<>(valueTransformerWithKeySupplier), name), + stateStoreNames + ); + + transformNode.setValueChangingOperation(true); + builder.addGraphNode(this.streamsGraphNode, transformNode); + + // cannot inherit value serde + return new KStreamImpl<>(name, keySerde, null, sourceNodes, repartitionRequired, transformNode, builder); } @Override diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamTransformIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamTransformIntegrationTest.java index 8af3375..fa4e33a 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamTransformIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamTransformIntegrationTest.java @@ -24,6 +24,8 @@ import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.Transformer; +import org.apache.kafka.streams.kstream.ValueTransformer; +import org.apache.kafka.streams.kstream.ValueTransformerWithKey; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.streams.state.StoreBuilder; @@ -53,16 +55,11 @@ public class KStreamTransformIntegrationTest { private final String topic = "stream"; private final String stateStoreName = "myTransformState"; private final List<KeyValue<Integer, Integer>> results = new ArrayList<>(); - private final ForeachAction<Integer, Integer> action = new ForeachAction<Integer, Integer>() { - @Override - public void apply(final Integer key, final Integer value) { - results.add(KeyValue.pair(key, value)); - } - }; + private final ForeachAction<Integer, Integer> action = (key, value) -> results.add(KeyValue.pair(key, value)); private KStream<Integer, 
Integer> stream; @Before - public void before() throws InterruptedException { + public void before() { builder = new StreamsBuilder(); final StoreBuilder<KeyValueStore<Integer, Integer>> keyValueStoreBuilder = Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore(stateStoreName), @@ -80,15 +77,52 @@ public class KStreamTransformIntegrationTest { driver.pipeInput(recordFactory.create(topic, Arrays.asList(new KeyValue<>(1, 1), new KeyValue<>(2, 2), new KeyValue<>(3, 3), - new KeyValue<>(1, 4), - new KeyValue<>(2, 5), - new KeyValue<>(3, 6)))); + new KeyValue<>(2, 1), + new KeyValue<>(2, 3), + new KeyValue<>(1, 3)))); } assertThat(results, equalTo(expected)); } @Test - public void shouldFlatTransform() throws Exception { + public void shouldTransform() { + stream + .transform(() -> new Transformer<Integer, Integer, KeyValue<Integer, Integer>>() { + private KeyValueStore<Integer, Integer> state; + + @SuppressWarnings("unchecked") + @Override + public void init(final ProcessorContext context) { + state = (KeyValueStore<Integer, Integer>) context.getStateStore(stateStoreName); + } + + @Override + public KeyValue<Integer, Integer> transform(final Integer key, final Integer value) { + state.putIfAbsent(key, 0); + Integer storedValue = state.get(key); + final KeyValue<Integer, Integer> result = new KeyValue<>(key + 1, value + storedValue++); + state.put(key, storedValue); + return result; + } + + @Override + public void close() { + } + }, "myTransformState") + .foreach(action); + + final List<KeyValue<Integer, Integer>> expected = Arrays.asList( + KeyValue.pair(2, 1), + KeyValue.pair(3, 2), + KeyValue.pair(4, 3), + KeyValue.pair(3, 2), + KeyValue.pair(3, 5), + KeyValue.pair(2, 4)); + verifyResult(expected); + } + + @Test + public void shouldFlatTransform() { stream .flatTransform(() -> new Transformer<Integer, Integer, Iterable<KeyValue<Integer, Integer>>>() { private KeyValueStore<Integer, Integer> state; @@ -103,12 +137,11 @@ public class 
KStreamTransformIntegrationTest { public Iterable<KeyValue<Integer, Integer>> transform(final Integer key, final Integer value) { final List<KeyValue<Integer, Integer>> result = new ArrayList<>(); state.putIfAbsent(key, 0); - final Integer storedValue = state.get(key); - int outputValue = storedValue.intValue(); + Integer storedValue = state.get(key); for (int i = 0; i < 3; i++) { - result.add(new KeyValue<Integer, Integer>(key + i, value + outputValue++)); + result.add(new KeyValue<>(key + i, value + storedValue++)); } - state.put(key, new Integer(outputValue)); + state.put(key, storedValue); return result; } @@ -128,37 +161,160 @@ public class KStreamTransformIntegrationTest { KeyValue.pair(3, 3), KeyValue.pair(4, 4), KeyValue.pair(5, 5), - KeyValue.pair(1, 7), - KeyValue.pair(2, 8), - KeyValue.pair(3, 9), - KeyValue.pair(2, 8), - KeyValue.pair(3, 9), - KeyValue.pair(4, 10), - KeyValue.pair(3, 9), - KeyValue.pair(4, 10), - KeyValue.pair(5, 11)); + KeyValue.pair(2, 4), + KeyValue.pair(3, 5), + KeyValue.pair(4, 6), + KeyValue.pair(2, 9), + KeyValue.pair(3, 10), + KeyValue.pair(4, 11), + KeyValue.pair(1, 6), + KeyValue.pair(2, 7), + KeyValue.pair(3, 8)); verifyResult(expected); } @Test - public void shouldTransform() throws Exception { + public void shouldTransformValuesWithValueTransformerWithKey() { stream - .transform(() -> new Transformer<Integer, Integer, KeyValue<Integer, Integer>>() { + .transformValues(() -> new ValueTransformerWithKey<Integer, Integer, Integer>() { private KeyValueStore<Integer, Integer> state; - @SuppressWarnings("unchecked") @Override public void init(final ProcessorContext context) { - state = (KeyValueStore<Integer, Integer>) context.getStateStore(stateStoreName); + state = (KeyValueStore<Integer, Integer>) context.getStateStore("myTransformState"); } @Override - public KeyValue<Integer, Integer> transform(final Integer key, final Integer value) { + public Integer transform(final Integer key, final Integer value) { + 
state.putIfAbsent(key, 0); + Integer storedValue = state.get(key); + final Integer result = value + storedValue++; + state.put(key, storedValue); + return result; + } + + @Override + public void close() { + } + }, "myTransformState") + .foreach(action); + + final List<KeyValue<Integer, Integer>> expected = Arrays.asList( + KeyValue.pair(1, 1), + KeyValue.pair(2, 2), + KeyValue.pair(3, 3), + KeyValue.pair(2, 2), + KeyValue.pair(2, 5), + KeyValue.pair(1, 4)); + verifyResult(expected); + } + + @Test + public void shouldTransformValuesWithValueTransformerWithoutKey() { + stream + .transformValues(() -> new ValueTransformer<Integer, Integer>() { + private KeyValueStore<Integer, Integer> state; + + @Override + public void init(final ProcessorContext context) { + state = (KeyValueStore<Integer, Integer>) context.getStateStore("myTransformState"); + } + + @Override + public Integer transform(final Integer value) { + state.putIfAbsent(value, 0); + Integer counter = state.get(value); + state.put(value, ++counter); + return counter; + } + + @Override + public void close() { + } + }, "myTransformState") + .foreach(action); + + final List<KeyValue<Integer, Integer>> expected = Arrays.asList( + KeyValue.pair(1, 1), + KeyValue.pair(2, 1), + KeyValue.pair(3, 1), + KeyValue.pair(2, 2), + KeyValue.pair(2, 2), + KeyValue.pair(1, 3)); + verifyResult(expected); + } + + @Test + public void shouldFlatTransformValuesWithKey() { + stream + .flatTransformValues(() -> new ValueTransformerWithKey<Integer, Integer, Iterable<Integer>>() { + private KeyValueStore<Integer, Integer> state; + + @Override + public void init(final ProcessorContext context) { + state = (KeyValueStore<Integer, Integer>) context.getStateStore("myTransformState"); + } + + @Override + public Iterable<Integer> transform(final Integer key, final Integer value) { + final List<Integer> result = new ArrayList<>(); state.putIfAbsent(key, 0); - final Integer storedValue = state.get(key); - int outputValue = 
storedValue.intValue(); - final KeyValue<Integer, Integer> result = new KeyValue<>(key + 1, value + outputValue++); - state.put(key, outputValue); + Integer storedValue = state.get(key); + for (int i = 0; i < 3; i++) { + result.add(value + storedValue++); + } + state.put(key, storedValue); + return result; + } + + @Override + public void close() { + } + }, "myTransformState") + .foreach(action); + + final List<KeyValue<Integer, Integer>> expected = Arrays.asList( + KeyValue.pair(1, 1), + KeyValue.pair(1, 2), + KeyValue.pair(1, 3), + KeyValue.pair(2, 2), + KeyValue.pair(2, 3), + KeyValue.pair(2, 4), + KeyValue.pair(3, 3), + KeyValue.pair(3, 4), + KeyValue.pair(3, 5), + KeyValue.pair(2, 4), + KeyValue.pair(2, 5), + KeyValue.pair(2, 6), + KeyValue.pair(2, 9), + KeyValue.pair(2, 10), + KeyValue.pair(2, 11), + KeyValue.pair(1, 6), + KeyValue.pair(1, 7), + KeyValue.pair(1, 8)); + verifyResult(expected); + } + + @Test + public void shouldFlatTransformValuesWithValueTransformerWithoutKey() { + stream + .flatTransformValues(() -> new ValueTransformer<Integer, Iterable<Integer>>() { + private KeyValueStore<Integer, Integer> state; + + @Override + public void init(final ProcessorContext context) { + state = (KeyValueStore<Integer, Integer>) context.getStateStore("myTransformState"); + } + + @Override + public Iterable<Integer> transform(final Integer value) { + final List<Integer> result = new ArrayList<>(); + state.putIfAbsent(value, 0); + Integer counter = state.get(value); + for (int i = 0; i < 3; i++) { + result.add(++counter); + } + state.put(value, counter); return result; } @@ -169,13 +325,24 @@ public class KStreamTransformIntegrationTest { .foreach(action); final List<KeyValue<Integer, Integer>> expected = Arrays.asList( + KeyValue.pair(1, 1), + KeyValue.pair(1, 2), + KeyValue.pair(1, 3), KeyValue.pair(2, 1), + KeyValue.pair(2, 2), + KeyValue.pair(2, 3), + KeyValue.pair(3, 1), KeyValue.pair(3, 2), - KeyValue.pair(4, 3), + KeyValue.pair(3, 3), + KeyValue.pair(2, 4), + 
KeyValue.pair(2, 5), + KeyValue.pair(2, 6), + KeyValue.pair(2, 4), KeyValue.pair(2, 5), - KeyValue.pair(3, 6), - KeyValue.pair(4, 7)); + KeyValue.pair(2, 6), + KeyValue.pair(1, 7), + KeyValue.pair(1, 8), + KeyValue.pair(1, 9)); verifyResult(expected); } - } diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatTransformValuesTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatTransformValuesTest.java new file mode 100644 index 0000000..36167c0 --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatTransformValuesTest.java @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.streams.kstream.internals; + +import static org.junit.Assert.assertTrue; + +import java.util.Arrays; +import java.util.Collections; + +import org.apache.kafka.streams.kstream.ValueTransformerWithKey; +import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier; +import org.apache.kafka.streams.kstream.internals.KStreamFlatTransformValues.KStreamFlatTransformValuesProcessor; +import org.apache.kafka.streams.processor.Processor; +import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.internals.ForwardingDisabledProcessorContext; +import org.easymock.EasyMock; +import org.easymock.EasyMockSupport; +import org.junit.Before; +import org.junit.Test; + +public class KStreamFlatTransformValuesTest extends EasyMockSupport { + + private Integer inputKey; + private Integer inputValue; + + private ValueTransformerWithKey<Integer, Integer, Iterable<String>> valueTransformer; + private ProcessorContext context; + + private KStreamFlatTransformValuesProcessor<Integer, Integer, String> processor; + + @Before + public void setUp() { + inputKey = 1; + inputValue = 10; + valueTransformer = mock(ValueTransformerWithKey.class); + context = strictMock(ProcessorContext.class); + processor = new KStreamFlatTransformValuesProcessor<>(valueTransformer); + } + + @Test + public void shouldInitializeFlatTransformValuesProcessor() { + valueTransformer.init(EasyMock.isA(ForwardingDisabledProcessorContext.class)); + replayAll(); + + processor.init(context); + + verifyAll(); + } + + @Test + public void shouldTransformInputRecordToMultipleOutputValues() { + final Iterable<String> outputValues = Arrays.asList( + "Hello", + "Blue", + "Planet"); + processor.init(context); + EasyMock.reset(valueTransformer); + + EasyMock.expect(valueTransformer.transform(inputKey, inputValue)).andReturn(outputValues); + for (final String outputValue : outputValues) { + context.forward(inputKey, outputValue); + } + replayAll(); + 
+ processor.process(inputKey, inputValue); + + verifyAll(); + } + + @Test + public void shouldEmitNoRecordIfTransformReturnsEmptyList() { + processor.init(context); + EasyMock.reset(valueTransformer); + + EasyMock.expect(valueTransformer.transform(inputKey, inputValue)).andReturn(Collections.<String>emptyList()); + replayAll(); + + processor.process(inputKey, inputValue); + + verifyAll(); + } + + @Test + public void shouldEmitNoRecordIfTransformReturnsNull() { + processor.init(context); + EasyMock.reset(valueTransformer); + + EasyMock.expect(valueTransformer.transform(inputKey, inputValue)).andReturn(null); + replayAll(); + + processor.process(inputKey, inputValue); + + verifyAll(); + } + + @Test + public void shouldCloseFlatTransformValuesProcessor() { + valueTransformer.close(); + replayAll(); + + processor.close(); + + verifyAll(); + } + + @Test + public void shouldGetFlatTransformValuesProcessor() { + final ValueTransformerWithKeySupplier<Integer, Integer, Iterable<String>> valueTransformerSupplier = + mock(ValueTransformerWithKeySupplier.class); + final KStreamFlatTransformValues<Integer, Integer, String> processorSupplier = + new KStreamFlatTransformValues<>(valueTransformerSupplier); + + EasyMock.expect(valueTransformerSupplier.get()).andReturn(valueTransformer); + replayAll(); + + final Processor<Integer, Integer> processor = processorSupplier.get(); + + verifyAll(); + assertTrue(processor instanceof KStreamFlatTransformValuesProcessor); + } +} diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java index 3b450b1..5a1b579 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java @@ -486,25 +486,43 @@ public class KStreamImplTest { } @Test - public void 
shouldNotAllowNullTransformSupplierOnTransform() { + public void shouldNotAllowNullTransformerSupplierOnTransform() { final Exception e = assertThrows(NullPointerException.class, () -> testStream.transform(null)); assertEquals("transformerSupplier can't be null", e.getMessage()); } @Test - public void shouldNotAllowNullTransformSupplierOnFlatTransform() { + public void shouldNotAllowNullTransformerSupplierOnFlatTransform() { final Exception e = assertThrows(NullPointerException.class, () -> testStream.flatTransform(null)); assertEquals("transformerSupplier can't be null", e.getMessage()); } - @Test(expected = NullPointerException.class) - public void shouldNotAllowNullTransformSupplierOnTransformValues() { - testStream.transformValues((ValueTransformerSupplier) null); + @Test + public void shouldNotAllowNullValueTransformerWithKeySupplierOnTransformValues() { + final Exception e = + assertThrows(NullPointerException.class, () -> testStream.transformValues((ValueTransformerWithKeySupplier) null)); + assertEquals("valueTransformerSupplier can't be null", e.getMessage()); } - @Test(expected = NullPointerException.class) - public void shouldNotAllowNullTransformSupplierOnTransformValuesWithKey() { - testStream.transformValues((ValueTransformerWithKeySupplier) null); + @Test + public void shouldNotAllowNullValueTransformerSupplierOnTransformValues() { + final Exception e = + assertThrows(NullPointerException.class, () -> testStream.transformValues((ValueTransformerSupplier) null)); + assertEquals("valueTransformerSupplier can't be null", e.getMessage()); + } + + @Test + public void shouldNotAllowNullValueTransformerWithKeySupplierOnFlatTransformValues() { + final Exception e = + assertThrows(NullPointerException.class, () -> testStream.flatTransformValues((ValueTransformerWithKeySupplier) null)); + assertEquals("valueTransformerSupplier can't be null", e.getMessage()); + } + + @Test + public void shouldNotAllowNullValueTransformerSupplierOnFlatTransformValues() { + 
final Exception e = + assertThrows(NullPointerException.class, () -> testStream.flatTransformValues((ValueTransformerSupplier) null)); + assertEquals("valueTransformerSupplier can't be null", e.getMessage()); } @Test(expected = NullPointerException.class)