mjsax commented on a change in pull request #9156:
URL: https://github.com/apache/kafka/pull/9156#discussion_r487167789



##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
##########
@@ -182,6 +182,8 @@ public String queryableStoreName() {
         final KTableProcessorSupplier<K, V, V> processorSupplier =
             new KTableFilter<>(this, predicate, filterNot, queryableStoreName);
 
+        processorSupplier.enableSendingOldValues(true);

Review comment:
       SGTM.
   
   IMHO, `sendOldValues` in `KTableFilter` should have only one meaning: do 
send old values downstream. If the filter result is materialized, we don't care 
whether the upstream is sending old values or not. However, if the filter is 
stateless, then as a side effect we also need to tell upstream to send old values.
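
   To make the intended semantics concrete, here is a minimal sketch. It 
assumes two hypothetical classes, `UpstreamSketch` and `FilterSketch`, that 
only model the flag propagation; neither is a Kafka Streams type, and the 
actual `KTableFilter` code may look different.

```java
// Hypothetical stand-in for the upstream table node (not a Kafka Streams class).
class UpstreamSketch {
    private boolean sendOldValues = false;

    void enableSendingOldValues() {
        sendOldValues = true;
    }

    boolean sendsOldValues() {
        return sendOldValues;
    }
}

// Hypothetical model of the filter node (not the real KTableFilter).
class FilterSketch {
    private final UpstreamSketch parent;
    private final boolean materialized;
    private boolean sendOldValues = false;

    FilterSketch(final UpstreamSketch parent, final boolean materialized) {
        this.parent = parent;
        this.materialized = materialized;
    }

    // Single meaning: this node forwards old values downstream.
    void enableSendingOldValues() {
        if (!materialized) {
            // Assumption: a stateless filter has no store of its own to read
            // the old value from, so it asks upstream to supply it.
            parent.enableSendingOldValues();
        }
        // A materialized filter leaves the upstream setting untouched.
        sendOldValues = true;
    }

    boolean sendsOldValues() {
        return sendOldValues;
    }
}
```

   In this model the flag on the filter always means "forward old values 
downstream"; the request to upstream only happens in the stateless case.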





----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

