mjsax commented on a change in pull request #9156:
URL: https://github.com/apache/kafka/pull/9156#discussion_r487166164



##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java
##########
@@ -47,8 +47,10 @@
     }
 
     @Override
-    public void enableSendingOldValues() {
+    public boolean enableSendingOldValues(final boolean onlyIfMaterialized) {

Review comment:
       I guess it's subjective. Personally, I would prefer to flip it.

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
##########
@@ -182,6 +182,8 @@ public String queryableStoreName() {
         final KTableProcessorSupplier<K, V, V> processorSupplier =
             new KTableFilter<>(this, predicate, filterNot, queryableStoreName);
 
+        processorSupplier.enableSendingOldValues(true);

Review comment:
       SGTM.
   
   IMHO, `sendOldValues` in `KTableFilter` should have only one meaning: send 
old values downstream. If the filter result is materialized, we don't care 
whether the upstream is sending old values. However, if the filter is 
stateless, we also need to tell the upstream to send old values as a side effect.
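
To make the intended semantics concrete, here is a minimal sketch of the flag handling described above. It is not the actual `KTableFilter` code: `SendOldValuesSketch`, `Upstream`, and `Filter` are hypothetical stand-ins, and only the materialized versus stateless distinction is modeled.

```java
// Hypothetical sketch; these classes are illustrative stand-ins, not Kafka Streams classes.
final class SendOldValuesSketch {

    // Stand-in for the upstream (parent) table.
    static final class Upstream {
        private boolean sendOldValues = false;

        void enableSendingOldValues() {
            sendOldValues = true;
        }

        boolean sendsOldValues() {
            return sendOldValues;
        }
    }

    // Stand-in for the filter; only the old-values flag is modeled.
    static final class Filter {
        private final Upstream parent;
        private final boolean materialized; // true if the filter result is backed by a store
        private boolean sendOldValues = false;

        Filter(final Upstream parent, final boolean materialized) {
            this.parent = parent;
            this.materialized = materialized;
        }

        void enableSendingOldValues() {
            if (!materialized) {
                // Stateless filter: old values can only come from upstream,
                // so enabling them there is a required side effect.
                parent.enableSendingOldValues();
            }
            // Single meaning of the flag: this filter sends old values downstream.
            sendOldValues = true;
        }

        boolean sendsOldValues() {
            return sendOldValues;
        }
    }
}
```

In this sketch, a materialized filter leaves the upstream flag untouched, while a stateless filter switches it on.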

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableAbstractJoin.java
##########
@@ -39,9 +39,15 @@
     }
 
     @Override
-    public final void enableSendingOldValues() {
-        table1.enableSendingOldValues();
-        table2.enableSendingOldValues();
+    public final boolean enableSendingOldValues(final boolean onlyIfMaterialized) {
+        if (!table1.enableSendingOldValues(onlyIfMaterialized)) {

Review comment:
       My comment was about your pattern... (if we flip the logic, we would 
need to pass in `true` to force a materialization). My point is, shouldn't we 
pass in a constant? For `KTableKTableAbstractJoin`, we always want the 
upstream to send us old values.
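
One possible reading of this suggestion, sketched under assumptions: the flag is flipped to a hypothetical `forceMaterialization`, and the join ignores its own argument and always passes the constant `true` to both parents. `JoinOldValuesSketch`, `Table`, and `Join` are illustrative names, not the classes in this PR.

```java
// Hypothetical sketch of "pass a constant"; not the actual KTableKTableAbstractJoin.
final class JoinOldValuesSketch {

    // Stand-in for a parent table with the flipped flag semantics.
    static final class Table {
        private boolean materialized;
        private boolean sendOldValues = false;

        Table(final boolean materialized) {
            this.materialized = materialized;
        }

        // Returns false only if the table is not materialized and the caller
        // did not ask to force a materialization.
        boolean enableSendingOldValues(final boolean forceMaterialization) {
            if (!materialized && !forceMaterialization) {
                return false;
            }
            materialized = true; // forced if it was not already materialized
            sendOldValues = true;
            return true;
        }

        boolean sendsOldValues() {
            return sendOldValues;
        }
    }

    // Stand-in for the join: it always needs old values from both parents.
    static final class Join {
        private final Table table1;
        private final Table table2;

        Join(final Table table1, final Table table2) {
            this.table1 = table1;
            this.table2 = table2;
        }

        boolean enableSendingOldValues(final boolean forceMaterialization) {
            // The argument is deliberately ignored: both parents get the constant `true`.
            table1.enableSendingOldValues(true);
            table2.enableSendingOldValues(true);
            return true;
        }
    }
}
```

With the constant in place, the join's result no longer depends on what its caller passes in, which is the behavior the comment argues for.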

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
##########
@@ -832,18 +834,25 @@ public String queryableStoreName() {
     }
 
     @SuppressWarnings("unchecked")
-    public void enableSendingOldValues() {
+    public boolean enableSendingOldValues(final boolean onlyIfMaterialized) {
         if (!sendOldValues) {
             if (processorSupplier instanceof KTableSource) {
                 final KTableSource<K, ?> source = (KTableSource<K, V>) processorSupplier;
+                if (onlyIfMaterialized && !source.materialized()) {
+                    return false;
+                }
                 source.enableSendingOldValues();

Review comment:
       Ah. I thought you changed `ProcessorSupplier#enableSendingOldValues`, but 
that was incorrect. You changed `KTableProcessorSupplier#enableSendingOldValues`.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

