This is an automated email from the ASF dual-hosted git repository.

bbejeck pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new ff78c68  KAFKA-7502: Cleanup KTable materialization logic in a single 
place (doMapValues) (#6520)
ff78c68 is described below

commit ff78c684ff22d81174cd789a4ac7e7e1fe4dfc8a
Author: Lee Dongjin <dong...@apache.org>
AuthorDate: Sat Mar 30 06:10:04 2019 +0900

    KAFKA-7502: Cleanup KTable materialization logic in a single place 
(doMapValues) (#6520)
    
    * Move materialization logic from TableProcessorNode to KTableImpl
    
    1. TableProcessorNode: remove materializedInternal and use storeBuilder instead.
    2. Instantiate StoreBuilder in KTableImpl#[doFilter, doMapValues, doTransformValues] instead of in TableProcessorNode#init.
    
    * Cleanup KTableImpl#doMapValues
    
    * 1. Add TableProcessorNode(String, ProcessorParameters, StoreBuilder). 2. Reformat and make trivial changes to TableProcessorNode.java.
---
 .../streams/kstream/internals/KTableImpl.java      | 32 ++++++++++++++++------
 1 file changed, 23 insertions(+), 9 deletions(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
index e9291cc..6a1af18 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
@@ -201,17 +201,31 @@ public class KTableImpl<K, S, V> extends 
AbstractStream<K, V> implements KTable<
 
     private <VR> KTable<K, VR> doMapValues(final ValueMapperWithKey<? super K, 
? super V, ? extends VR> mapper,
                                            final MaterializedInternal<K, VR, 
KeyValueStore<Bytes, byte[]>> materializedInternal) {
-        // we actually do not need generate store names at all since if it is 
not specified, we will not
-        // materialize the store; but we still need to burn one index BEFORE 
generating the processor to keep compatibility.
-        if (materializedInternal != null && materializedInternal.storeName() 
== null) {
-            builder.newStoreName(MAPVALUES_NAME);
+        final Serde<K> keySerde;
+        final Serde<VR> valueSerde;
+        final String queryableStoreName;
+        final StoreBuilder<KeyValueStore<K, VR>> storeBuilder;
+
+        if (materializedInternal != null) {
+            // we actually do not need to generate store names at all since if 
it is not specified, we will not
+            // materialize the store; but we still need to burn one index 
BEFORE generating the processor to keep compatibility.
+            if (materializedInternal.storeName() == null) {
+                builder.newStoreName(MAPVALUES_NAME);
+            }
+            keySerde = materializedInternal.keySerde() != null ? 
materializedInternal.keySerde() : this.keySerde;
+            valueSerde = materializedInternal.valueSerde();
+            queryableStoreName = materializedInternal.queryableStoreName();
+            // only materialize if materialized is specified and it has a queryable name
+            storeBuilder = queryableStoreName != null ? (new 
KeyValueStoreMaterializer<>(materializedInternal)).materialize() : null;
+        } else {
+            keySerde = this.keySerde;
+            valueSerde = null;
+            queryableStoreName = null;
+            storeBuilder = null;
         }
 
         final String name = builder.newProcessorName(MAPVALUES_NAME);
 
-        // only materialize if the state store has queryable name
-        final String queryableStoreName = materializedInternal != null ? 
materializedInternal.queryableStoreName() : null;
-        final StoreBuilder<KeyValueStore<K, VR>> storeBuilder = 
queryableStoreName != null ? (new 
KeyValueStoreMaterializer<>(materializedInternal)).materialize() : null;
         final KTableProcessorSupplier<K, V, VR> processorSupplier = new 
KTableMapValues<>(this, mapper, queryableStoreName);
 
         // leaving in calls to ITB until building topology with graph
@@ -232,8 +246,8 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, 
V> implements KTable<
         // we preserve the value following the order of 1) materialized, 2) 
null
         return new KTableImpl<>(
             name,
-            materializedInternal != null && materializedInternal.keySerde() != 
null ? materializedInternal.keySerde() : keySerde,
-            materializedInternal != null ? materializedInternal.valueSerde() : 
null,
+            keySerde,
+            valueSerde,
             sourceNodes,
             queryableStoreName,
             processorSupplier,

Reply via email to