This is an automated email from the ASF dual-hosted git repository. bbejeck pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new d10023e Cleanup KTableImpl#doTransformValues (#6519) d10023e is described below commit d10023e8d3093eaa6b0b3afa04c7ea1b9bee6c30 Author: Lee Dongjin <dong...@apache.org> AuthorDate: Sat Mar 30 06:08:20 2019 +0900 Cleanup KTableImpl#doTransformValues (#6519) This PR is a follow-up to #6174 and #6453; it cleans up the KTableImpl#doTransformValues method. Reviewers: Bill Bejeck <bbej...@gmail.com> --- .../streams/kstream/internals/KTableImpl.java | 31 +++++++++++++++------- 1 file changed, 22 insertions(+), 9 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java index 67b3c31..e9291cc 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java @@ -295,12 +295,28 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable< final MaterializedInternal<K, VR, KeyValueStore<Bytes, byte[]>> materializedInternal, final String... stateStoreNames) { Objects.requireNonNull(stateStoreNames, "stateStoreNames"); + final Serde<K> keySerde; + final Serde<VR> valueSerde; + final String queryableStoreName; + final StoreBuilder<KeyValueStore<K, VR>> storeBuilder; - final String name = builder.newProcessorName(TRANSFORMVALUES_NAME); + if (materializedInternal != null) { + // don't inherit parent value serde, since this operation may change the value type, more specifically: + // we preserve the key following the order of 1) materialized, 2) parent, 3) null + keySerde = materializedInternal.keySerde() != null ? 
materializedInternal.keySerde() : this.keySerde; + // we preserve the value following the order of 1) materialized, 2) null + valueSerde = materializedInternal.valueSerde(); + queryableStoreName = materializedInternal.queryableStoreName(); + // only materialize if materialized is specified and it has queryable name + storeBuilder = queryableStoreName != null ? (new KeyValueStoreMaterializer<>(materializedInternal)).materialize() : null; + } else { + keySerde = this.keySerde; + valueSerde = null; + queryableStoreName = null; + storeBuilder = null; + } - // only materialize if users provide a specific queryable name - final String queryableStoreName = materializedInternal != null ? materializedInternal.queryableStoreName() : null; - final StoreBuilder<KeyValueStore<K, VR>> storeBuilder = queryableStoreName != null ? (new KeyValueStoreMaterializer<>(materializedInternal)).materialize() : null; + final String name = builder.newProcessorName(TRANSFORMVALUES_NAME); final KTableProcessorSupplier<K, V, VR> processorSupplier = new KTableTransformValues<>( this, @@ -320,13 +336,10 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable< builder.addGraphNode(this.streamsGraphNode, tableNode); - // don't inherit parent value serde, since this operation may change the value type, more specifically: - // we preserve the key following the order of 1) materialized, 2) parent, 3) null - // we preserve the value following the order of 1) materialized, 2) null return new KTableImpl<>( name, - materializedInternal != null && materializedInternal.keySerde() != null ? materializedInternal.keySerde() : keySerde, - materializedInternal != null ? materializedInternal.valueSerde() : null, + keySerde, + valueSerde, sourceNodes, queryableStoreName, processorSupplier,