[ 
https://issues.apache.org/jira/browse/KAFKA-6036?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16714158#comment-16714158
 ] 

ASF GitHub Bot commented on KAFKA-6036:
---------------------------------------

guozhangwang closed pull request #6017: KAFKA-6036: Follow-up; cleanup 
sendOldValues logic ForwardingCacheFlushListener 
URL: https://github.com/apache/kafka/pull/6017
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ForwardingCacheFlushListener.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ForwardingCacheFlushListener.java
index f30ab79df9c..4065ceda61b 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ForwardingCacheFlushListener.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ForwardingCacheFlushListener.java
@@ -22,13 +22,11 @@
 
 class ForwardingCacheFlushListener<K, V> implements CacheFlushListener<K, V> {
     private final InternalProcessorContext context;
-    private final boolean sendOldValues;
     private final ProcessorNode myNode;
 
-    ForwardingCacheFlushListener(final ProcessorContext context, final boolean 
sendOldValues) {
+    ForwardingCacheFlushListener(final ProcessorContext context) {
         this.context = (InternalProcessorContext) context;
         myNode = this.context.currentNode();
-        this.sendOldValues = sendOldValues;
     }
 
     @Override
@@ -36,11 +34,7 @@ public void apply(final K key, final V newValue, final V 
oldValue) {
         final ProcessorNode prev = context.currentNode();
         context.setCurrentNode(myNode);
         try {
-            if (sendOldValues) {
-                context.forward(key, new Change<>(newValue, oldValue));
-            } else {
-                context.forward(key, new Change<>(newValue, null));
-            }
+            context.forward(key, new Change<>(newValue, oldValue));
         } finally {
             context.setCurrentNode(prev);
         }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java
index 1b3a8f44e31..648c50b7e50 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java
@@ -54,8 +54,8 @@ public void enableSendingOldValues() {
     private class KStreamAggregateProcessor extends AbstractProcessor<K, V> {
 
         private KeyValueStore<K, T> store;
-        private TupleForwarder<K, T> tupleForwarder;
         private StreamsMetricsImpl metrics;
+        private TupleForwarder<K, T> tupleForwarder;
 
         @SuppressWarnings("unchecked")
         @Override
@@ -63,7 +63,7 @@ public void init(final ProcessorContext context) {
             super.init(context);
             metrics = (StreamsMetricsImpl) context.metrics();
             store = (KeyValueStore<K, T>) context.getStateStore(storeName);
-            tupleForwarder = new TupleForwarder<>(store, context, new 
ForwardingCacheFlushListener<K, V>(context, sendOldValues), sendOldValues);
+            tupleForwarder = new TupleForwarder<>(store, context, new 
ForwardingCacheFlushListener<K, V>(context), sendOldValues);
         }
 
 
@@ -92,7 +92,7 @@ public void process(final K key, final V value) {
 
             // update the store with the new value
             store.put(key, newAgg);
-            tupleForwarder.maybeForward(key, newAgg, oldAgg);
+            tupleForwarder.maybeForward(key, newAgg, sendOldValues ? oldAgg : 
null);
         }
     }
 
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java
index 9f404ea55f6..09e4fab48e5 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java
@@ -61,7 +61,7 @@ public void init(final ProcessorContext context) {
             metrics = (StreamsMetricsImpl) context.metrics();
 
             store = (KeyValueStore<K, V>) context.getStateStore(storeName);
-            tupleForwarder = new TupleForwarder<>(store, context, new 
ForwardingCacheFlushListener<K, V>(context, sendOldValues), sendOldValues);
+            tupleForwarder = new TupleForwarder<>(store, context, new 
ForwardingCacheFlushListener<K, V>(context), sendOldValues);
         }
 
 
@@ -89,7 +89,7 @@ public void process(final K key, final V value) {
 
             // update the store with the new value
             store.put(key, newAgg);
-            tupleForwarder.maybeForward(key, newAgg, oldAgg);
+            tupleForwarder.maybeForward(key, newAgg, sendOldValues ? oldAgg : 
null);
         }
     }
 
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java
index b89399bf7b7..13f4a6eb409 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java
@@ -92,7 +92,7 @@ public void init(final ProcessorContext context) {
             lateRecordDropSensor = 
Sensors.lateRecordDropSensor(internalProcessorContext);
 
             store = (SessionStore<K, Agg>) context.getStateStore(storeName);
-            tupleForwarder = new TupleForwarder<>(store, context, new 
ForwardingCacheFlushListener<K, V>(context, sendOldValues), sendOldValues);
+            tupleForwarder = new TupleForwarder<>(store, context, new 
ForwardingCacheFlushListener<K, V>(context), sendOldValues);
         }
 
         @Override
@@ -135,7 +135,7 @@ public void process(final K key, final V value) {
                 if (!mergedWindow.equals(newSessionWindow)) {
                     for (final KeyValue<Windowed<K>, Agg> session : merged) {
                         store.remove(session.key);
-                        tupleForwarder.maybeForward(session.key, null, 
session.value);
+                        tupleForwarder.maybeForward(session.key, null, 
sendOldValues ? session.value : null);
                     }
                 }
 
@@ -151,10 +151,8 @@ public void process(final K key, final V value) {
                 lateRecordDropSensor.record();
             }
         }
-
     }
 
-
     private SessionWindow mergeSessionWindow(final SessionWindow one, final 
SessionWindow two) {
         final long start = one.start() < two.start() ? one.start() : 
two.start();
         final long end = one.end() > two.end() ? one.end() : two.end();
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java
index f29251573e2..0edbe4e4567 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java
@@ -87,7 +87,7 @@ public void init(final ProcessorContext context) {
             lateRecordDropSensor = 
Sensors.lateRecordDropSensor(internalProcessorContext);
 
             windowStore = (WindowStore<K, Agg>) 
context.getStateStore(storeName);
-            tupleForwarder = new TupleForwarder<>(windowStore, context, new 
ForwardingCacheFlushListener<Windowed<K>, V>(context, sendOldValues), 
sendOldValues);
+            tupleForwarder = new TupleForwarder<>(windowStore, context, new 
ForwardingCacheFlushListener<Windowed<K>, V>(context), sendOldValues);
         }
 
         @Override
@@ -122,7 +122,7 @@ public void process(final K key, final V value) {
 
                     // update the store with the new value
                     windowStore.put(key, newAgg, windowStart);
-                    tupleForwarder.maybeForward(new Windowed<>(key, 
entry.getValue()), newAgg, oldAgg);
+                    tupleForwarder.maybeForward(new Windowed<>(key, 
entry.getValue()), newAgg, sendOldValues ? oldAgg : null);
                 } else {
                     log.debug(
                         "Skipping record for expired window. key=[{}] 
topic=[{}] partition=[{}] offset=[{}] timestamp=[{}] window=[{},{}) 
expiration=[{}]",
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java
index b60f9ab1f73..b04a729a447 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java
@@ -62,7 +62,7 @@ public void enableSendingOldValues() {
         public void init(final ProcessorContext context) {
             super.init(context);
             store = (KeyValueStore<K, T>) context.getStateStore(storeName);
-            tupleForwarder = new TupleForwarder<>(store, context, new 
ForwardingCacheFlushListener<K, V>(context, sendOldValues), sendOldValues);
+            tupleForwarder = new TupleForwarder<>(store, context, new 
ForwardingCacheFlushListener<K, V>(context), sendOldValues);
         }
 
         /**
@@ -95,7 +95,7 @@ public void process(final K key, final Change<V> value) {
 
             // update the store with the new value
             store.put(key, newAgg);
-            tupleForwarder.maybeForward(key, newAgg, oldAgg);
+            tupleForwarder.maybeForward(key, newAgg, sendOldValues ? oldAgg : 
null);
         }
 
     }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java
index 75fba9926d9..d1e524c88af 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java
@@ -70,7 +70,7 @@ public void init(final ProcessorContext context) {
             super.init(context);
             if (queryableName != null) {
                 store = (KeyValueStore<K, V>) 
context.getStateStore(queryableName);
-                tupleForwarder = new TupleForwarder<>(store, context, new 
ForwardingCacheFlushListener<K, V>(context, sendOldValues), sendOldValues);
+                tupleForwarder = new TupleForwarder<>(store, context, new 
ForwardingCacheFlushListener<K, V>(context), sendOldValues);
             }
         }
 
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinMerger.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinMerger.java
index 2ed70bd46a2..78c1dc6f480 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinMerger.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinMerger.java
@@ -89,7 +89,7 @@ public void init(final ProcessorContext context) {
             if (queryableName != null) {
                 store = (KeyValueStore<K, V>) 
context.getStateStore(queryableName);
                 tupleForwarder = new TupleForwarder<>(store, context,
-                    new ForwardingCacheFlushListener<K, V>(context, 
sendOldValues),
+                    new ForwardingCacheFlushListener<K, V>(context),
                     sendOldValues);
             }
         }
@@ -98,9 +98,13 @@ public void init(final ProcessorContext context) {
         public void process(final K key, final Change<V> value) {
             if (queryableName != null) {
                 store.put(key, value.newValue);
-                tupleForwarder.maybeForward(key, value.newValue, 
value.oldValue);
+                tupleForwarder.maybeForward(key, value.newValue, sendOldValues 
? value.oldValue : null);
             } else {
-                context().forward(key, value);
+                if (sendOldValues) {
+                    context().forward(key, value);
+                } else {
+                    context().forward(key, new Change<>(value.newValue, null));
+                }
             }
         }
     }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java
index c2c84d5d362..2317947f5f2 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java
@@ -90,7 +90,7 @@ public void init(final ProcessorContext context) {
             super.init(context);
             if (queryableName != null) {
                 store = (KeyValueStore<K, V1>) 
context.getStateStore(queryableName);
-                tupleForwarder = new TupleForwarder<>(store, context, new 
ForwardingCacheFlushListener<K, V1>(context, sendOldValues), sendOldValues);
+                tupleForwarder = new TupleForwarder<>(store, context, new 
ForwardingCacheFlushListener<K, V1>(context), sendOldValues);
             }
         }
 
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableReduce.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableReduce.java
index 069b3604631..38c5a11f34d 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableReduce.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableReduce.java
@@ -57,7 +57,7 @@ public void enableSendingOldValues() {
         public void init(final ProcessorContext context) {
             super.init(context);
             store = (KeyValueStore<K, V>) context.getStateStore(storeName);
-            tupleForwarder = new TupleForwarder<>(store, context, new 
ForwardingCacheFlushListener<K, V>(context, sendOldValues), sendOldValues);
+            tupleForwarder = new TupleForwarder<>(store, context, new 
ForwardingCacheFlushListener<K, V>(context), sendOldValues);
         }
 
         /**
@@ -89,7 +89,7 @@ public void process(final K key, final Change<V> value) {
 
             // update the store with the new value
             store.put(key, newAgg);
-            tupleForwarder.maybeForward(key, newAgg, oldAgg);
+            tupleForwarder.maybeForward(key, newAgg, sendOldValues ? oldAgg : 
null);
         }
     }
 
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java
index 274d96ea40c..6fc57bcf582 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java
@@ -77,7 +77,7 @@ public void init(final ProcessorContext context) {
             metrics = (StreamsMetricsImpl) context.metrics();
             if (queryableName != null) {
                 store = (KeyValueStore<K, V>) 
context.getStateStore(queryableName);
-                tupleForwarder = new TupleForwarder<>(store, context, new 
ForwardingCacheFlushListener<K, V>(context, sendOldValues), sendOldValues);
+                tupleForwarder = new TupleForwarder<>(store, context, new 
ForwardingCacheFlushListener<K, V>(context), sendOldValues);
             }
         }
 
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableTransformValues.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableTransformValues.java
index b3e84d79119..88cea4fc7c7 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableTransformValues.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableTransformValues.java
@@ -91,7 +91,7 @@ public void init(final ProcessorContext context) {
             valueTransformer.init(new 
ForwardingDisabledProcessorContext(context));
 
             if (queryableName != null) {
-                final ForwardingCacheFlushListener<K, V1> flushListener = new 
ForwardingCacheFlushListener<>(context, sendOldValues);
+                final ForwardingCacheFlushListener<K, V1> flushListener = new 
ForwardingCacheFlushListener<>(context);
                 store = (KeyValueStore<K, V1>) 
context.getStateStore(queryableName);
                 tupleForwarder = new TupleForwarder<>(store, context, 
flushListener, sendOldValues);
             }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TupleForwarder.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TupleForwarder.java
index ff3ef44894b..aec0d16ccef 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TupleForwarder.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TupleForwarder.java
@@ -32,7 +32,6 @@
 class TupleForwarder<K, V> {
     private final CachedStateStore cachedStateStore;
     private final ProcessorContext context;
-    private final boolean sendOldValues;
 
     @SuppressWarnings("unchecked")
     TupleForwarder(final StateStore store,
@@ -41,7 +40,6 @@
                    final boolean sendOldValues) {
         this.cachedStateStore = cachedStateStore(store);
         this.context = context;
-        this.sendOldValues = sendOldValues;
         if (this.cachedStateStore != null) {
             cachedStateStore.setFlushListener(flushListener, sendOldValues);
         }
@@ -61,11 +59,7 @@ public void maybeForward(final K key,
                              final V newValue,
                              final V oldValue) {
         if (cachedStateStore == null) {
-            if (sendOldValues) {
-                context.forward(key, new Change<>(newValue, oldValue));
-            } else {
-                context.forward(key, new Change<>(newValue, null));
-            }
+            context.forward(key, new Change<>(newValue, oldValue));
         }
     }
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Enable logical materialization to physical materialization
> ----------------------------------------------------------
>
>                 Key: KAFKA-6036
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6036
>             Project: Kafka
>          Issue Type: Sub-task
>          Components: streams
>            Reporter: Guozhang Wang
>            Assignee: Guozhang Wang
>            Priority: Major
>
> Today whenever users specify a queryable store name for KTable, we would 
> always add a physical state store in the translated processor topology.
> For some scenarios, we should consider not physically materializing the 
> KTable but only "logically" materializing it: when simple transformation 
> operations or even join operations generate a new KTable that needs to be 
> materialized with a state store, we can use the changelog topic of the 
> previous KTable and apply the transformation logic upon restoration 
> instead of creating a new changelog topic. For example:
> {code}
> table1 = builder.table("topic1");
> table2 = table1.filter(..).join(table3); // table2 needs to be materialized 
> for joining
> {code}
> We can actually set the {{getter}} function of table2's materialized store, 
> say {{state2}}, to read from {{topic1}} and then apply the filter 
> operator, instead of creating a new {{state2-changelog}} topic in this case.
> We can come up with general internal implementation optimizations to 
> determine when to logically materialize, and decide whether we should 
> actually allow users of the DSL to "hint" whether to materialize or not 
> (which may then need a KIP).



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to