This is an automated email from the ASF dual-hosted git repository.

bbejeck pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new d10023e  Cleanup KTableImpl#doTransformValues (#6519)
d10023e is described below

commit d10023e8d3093eaa6b0b3afa04c7ea1b9bee6c30
Author: Lee Dongjin <dong...@apache.org>
AuthorDate: Sat Mar 30 06:08:20 2019 +0900

    Cleanup KTableImpl#doTransformValues (#6519)
    
    This PR is a follow-up to #6174 and #6453; it cleans up the
    KTableImpl#doTransformValues method.
    
    Reviewers: Bill Bejeck <bbej...@gmail.com>
---
 .../streams/kstream/internals/KTableImpl.java      | 31 +++++++++++++++-------
 1 file changed, 22 insertions(+), 9 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
index 67b3c31..e9291cc 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
@@ -295,12 +295,28 @@ public class KTableImpl<K, S, V> extends 
AbstractStream<K, V> implements KTable<
                                                  final MaterializedInternal<K, 
VR, KeyValueStore<Bytes, byte[]>> materializedInternal,
                                                  final String... 
stateStoreNames) {
         Objects.requireNonNull(stateStoreNames, "stateStoreNames");
+        final Serde<K> keySerde;
+        final Serde<VR> valueSerde;
+        final String queryableStoreName;
+        final StoreBuilder<KeyValueStore<K, VR>> storeBuilder;
 
-        final String name = builder.newProcessorName(TRANSFORMVALUES_NAME);
+        if (materializedInternal != null) {
+            // don't inherit parent value serde, since this operation may 
change the value type, more specifically:
+            // we preserve the key following the order of 1) materialized, 2) 
parent, 3) null
+            keySerde = materializedInternal.keySerde() != null ? 
materializedInternal.keySerde() : this.keySerde;
+            // we preserve the value following the order of 1) materialized, 
2) null
+            valueSerde = materializedInternal.valueSerde();
+            queryableStoreName = materializedInternal.queryableStoreName();
+            // only materialize if materialized is specified and it has 
queryable name
+            storeBuilder = queryableStoreName != null ? (new 
KeyValueStoreMaterializer<>(materializedInternal)).materialize() : null;
+        } else {
+            keySerde = this.keySerde;
+            valueSerde = null;
+            queryableStoreName = null;
+            storeBuilder = null;
+        }
 
-        // only materialize if users provide a specific queryable name
-        final String queryableStoreName = materializedInternal != null ? 
materializedInternal.queryableStoreName() : null;
-        final StoreBuilder<KeyValueStore<K, VR>> storeBuilder = 
queryableStoreName != null ? (new 
KeyValueStoreMaterializer<>(materializedInternal)).materialize() : null;
+        final String name = builder.newProcessorName(TRANSFORMVALUES_NAME);
 
         final KTableProcessorSupplier<K, V, VR> processorSupplier = new 
KTableTransformValues<>(
             this,
@@ -320,13 +336,10 @@ public class KTableImpl<K, S, V> extends 
AbstractStream<K, V> implements KTable<
 
         builder.addGraphNode(this.streamsGraphNode, tableNode);
 
-        // don't inherit parent value serde, since this operation may change 
the value type, more specifically:
-        // we preserve the key following the order of 1) materialized, 2) 
parent, 3) null
-        // we preserve the value following the order of 1) materialized, 2) 
null
         return new KTableImpl<>(
             name,
-            materializedInternal != null && materializedInternal.keySerde() != 
null ? materializedInternal.keySerde() : keySerde,
-            materializedInternal != null ? materializedInternal.valueSerde() : 
null,
+            keySerde,
+            valueSerde,
             sourceNodes,
             queryableStoreName,
             processorSupplier,

Reply via email to