This is an automated email from the ASF dual-hosted git repository. bbejeck pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
     new ff78c68  KAFKA-7502: Cleanup KTable materialization logic in a single place (doMapValues) (#6520)
ff78c68 is described below

commit ff78c684ff22d81174cd789a4ac7e7e1fe4dfc8a
Author: Lee Dongjin <dong...@apache.org>
AuthorDate: Sat Mar 30 06:10:04 2019 +0900

    KAFKA-7502: Cleanup KTable materialization logic in a single place (doMapValues) (#6520)

    * Move materialization logic from TableProcessorNode to KTableImpl

    1. TableProcessorNode: remove materializedInternal, use storeBuilder instead.
    2. Instantiate StoreBuilder in KTableImpl#[doFilter, doMapValues, doTransformValues], instead of TableProcessorNode#init.

    * Cleanup KTableImpl#doMapValues

    * 1. Add TableProcessorNode(String, ProcessorParameters, StoreBuilder).
    2. Reformat+trivial changes on TableProcessorNode.java.
---
 .../streams/kstream/internals/KTableImpl.java | 32 ++++++++++++++++------
 1 file changed, 23 insertions(+), 9 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
index e9291cc..6a1af18 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
@@ -201,17 +201,31 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable<
 
     private <VR> KTable<K, VR> doMapValues(final ValueMapperWithKey<? super K, ? super V, ? extends VR> mapper,
                                            final MaterializedInternal<K, VR, KeyValueStore<Bytes, byte[]>> materializedInternal) {
-        // we actually do not need generate store names at all since if it is not specified, we will not
-        // materialize the store; but we still need to burn one index BEFORE generating the processor to keep compatibility.
-        if (materializedInternal != null && materializedInternal.storeName() == null) {
-            builder.newStoreName(MAPVALUES_NAME);
+        final Serde<K> keySerde;
+        final Serde<VR> valueSerde;
+        final String queryableStoreName;
+        final StoreBuilder<KeyValueStore<K, VR>> storeBuilder;
+
+        if (materializedInternal != null) {
+            // we actually do not need to generate store names at all since if it is not specified, we will not
+            // materialize the store; but we still need to burn one index BEFORE generating the processor to keep compatibility.
+            if (materializedInternal.storeName() == null) {
+                builder.newStoreName(MAPVALUES_NAME);
+            }
+            keySerde = materializedInternal.keySerde() != null ? materializedInternal.keySerde() : this.keySerde;
+            valueSerde = materializedInternal.valueSerde();
+            queryableStoreName = materializedInternal.queryableStoreName();
+            // only materialize if materialized is specified and it has queryable name
+            storeBuilder = queryableStoreName != null ? (new KeyValueStoreMaterializer<>(materializedInternal)).materialize() : null;
+        } else {
+            keySerde = this.keySerde;
+            valueSerde = null;
+            queryableStoreName = null;
+            storeBuilder = null;
         }
 
         final String name = builder.newProcessorName(MAPVALUES_NAME);
 
-        // only materialize if the state store has queryable name
-        final String queryableStoreName = materializedInternal != null ? materializedInternal.queryableStoreName() : null;
-        final StoreBuilder<KeyValueStore<K, VR>> storeBuilder = queryableStoreName != null ?
(new KeyValueStoreMaterializer<>(materializedInternal)).materialize() : null;
         final KTableProcessorSupplier<K, V, VR> processorSupplier = new KTableMapValues<>(this, mapper, queryableStoreName);
 
         // leaving in calls to ITB until building topology with graph
@@ -232,8 +246,8 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable<
         // we preserve the value following the order of 1) materialized, 2) null
         return new KTableImpl<>(
             name,
-            materializedInternal != null && materializedInternal.keySerde() != null ? materializedInternal.keySerde() : keySerde,
-            materializedInternal != null ? materializedInternal.valueSerde() : null,
+            keySerde,
+            valueSerde,
             sourceNodes,
             queryableStoreName,
             processorSupplier,