vcrfxia commented on code in PR #13564:
URL: https://github.com/apache/kafka/pull/13564#discussion_r1166087199


##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/Change.java:
##########
@@ -42,12 +48,13 @@ public boolean equals(final Object o) {
             return false;
         }
         final Change<?> change = (Change<?>) o;
-        return Objects.equals(newValue, change.newValue) &&
-                Objects.equals(oldValue, change.oldValue);
+        return Objects.equals(newValue, change.newValue)
+            && Objects.equals(oldValue, change.oldValue)
+            && isLatest == change.isLatest;

Review Comment:
   I'm on the fence about updating the equals method here. I'd like it to 
include the new boolean, but because serialization does not depend on 
`isLatest`, a Change that goes through round-trip serialization may no longer 
be equal to the original, which seems confusing. 
   
   Maybe it doesn't matter if the equals method isn't called anywhere (I 
couldn't find any usages). Curious to hear reviewer opinions.
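
To make the round-trip concern concrete, here is a minimal sketch. `SketchChange` and `RoundTripSketch` are hypothetical stand-ins, not the real `Change` class or its serde, and the newline-separated wire format is an assumption made only for illustration. The one property it shares with the situation described above is that the flag is not written out.

```java
import java.nio.charset.StandardCharsets;
import java.util.Objects;

// Simplified, hypothetical stand-in for Change<T>; not the Kafka Streams class.
final class SketchChange {
    final String newValue;
    final String oldValue;
    final boolean isLatest;

    SketchChange(final String newValue, final String oldValue, final boolean isLatest) {
        this.newValue = newValue;
        this.oldValue = oldValue;
        this.isLatest = isLatest;
    }

    @Override
    public boolean equals(final Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }
        final SketchChange other = (SketchChange) o;
        // Same shape as the proposed equals(): the flag takes part in equality.
        return Objects.equals(newValue, other.newValue)
            && Objects.equals(oldValue, other.oldValue)
            && isLatest == other.isLatest;
    }

    @Override
    public int hashCode() {
        return Objects.hash(newValue, oldValue, isLatest);
    }
}

final class RoundTripSketch {
    // Assumed wire format: only the two values are written; isLatest is dropped.
    static byte[] serialize(final SketchChange change) {
        return (change.newValue + "\n" + change.oldValue).getBytes(StandardCharsets.UTF_8);
    }

    // The flag cannot be recovered from the bytes, so it falls back to true.
    static SketchChange deserialize(final byte[] bytes) {
        final String[] parts = new String(bytes, StandardCharsets.UTF_8).split("\n", 2);
        return new SketchChange(parts[0], parts[1], true);
    }

    public static void main(final String[] args) {
        final SketchChange stale = new SketchChange("v2", "v1", false);
        final SketchChange restored = deserialize(serialize(stale));
        // The values survive the round trip but the flag does not,
        // so equals() reports a difference.
        System.out.println(stale.equals(restored));
    }
}
```

In this sketch a Change created with `isLatest == true` still compares equal after the round trip; only the `false` case diverges.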



##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java:
##########
@@ -116,10 +118,13 @@ public void process(final Record<KIn, Change<VIn>> record) {
             }
 
             // update the store with the new value
-            store.put(record.key(), newAgg, newTimestamp);
-            tupleForwarder.maybeForward(
-                record.withValue(new Change<>(newAgg, sendOldValues ? oldAgg : null))
-                    .withTimestamp(newTimestamp));
+            final long putReturnCode = store.put(record.key(), newAgg, newTimestamp);
+            // if not put to store, do not forward downstream either
+            if (putReturnCode != PUT_RETURN_CODE_NOT_PUT) {

Review Comment:
   Some processors, including this one, guarantee that their timestamps are in 
non-decreasing order (per key), which means `putReturnCode` will always equal 
`PUT_RETURN_CODE_IS_LATEST`. The extra comparisons here are therefore 
technically unnecessary, but it seems nice to keep them for consistency with 
other processors (and in case this guarantee ever changes, though that's 
unlikely). Happy to remove the redundancy if that's preferable.
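
As a rough illustration of why the guard is redundant under non-decreasing timestamps, here is a toy sketch. `ToyStore`, its retention rule, and the constant values are all assumptions for illustration; they are not the versioned store implementation or its real sentinel values. Only the guard in `process` mirrors the diff above.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class PutGuardSketch {
    // Hypothetical sentinel values, chosen only for this sketch.
    static final long PUT_RETURN_CODE_NOT_PUT = Long.MIN_VALUE;
    static final long PUT_RETURN_CODE_IS_LATEST = -1L;

    // Toy store: tracks the latest timestamp per key and rejects records
    // that are older than the latest timestamp minus a retention window.
    static final class ToyStore {
        private final Map<String, Long> latestTimestamp = new HashMap<>();
        private final long retentionMs;

        ToyStore(final long retentionMs) {
            this.retentionMs = retentionMs;
        }

        long put(final String key, final long timestamp) {
            final Long latest = latestTimestamp.get(key);
            if (latest == null || timestamp >= latest) {
                latestTimestamp.put(key, timestamp);
                return PUT_RETURN_CODE_IS_LATEST;
            }
            if (timestamp < latest - retentionMs) {
                return PUT_RETURN_CODE_NOT_PUT;
            }
            // Out of order but still retained: return the newer timestamp
            // as a stand-in for "valid until".
            return latest;
        }
    }

    // Same guard as in the diff: forward only if the record was actually put.
    static List<String> process(final ToyStore store, final String key, final long[] timestamps) {
        final List<String> forwarded = new ArrayList<>();
        for (final long ts : timestamps) {
            final long putReturnCode = store.put(key, ts);
            if (putReturnCode != PUT_RETURN_CODE_NOT_PUT) {
                final boolean isLatest = putReturnCode == PUT_RETURN_CODE_IS_LATEST;
                forwarded.add(key + "@" + ts + (isLatest ? " latest" : " not-latest"));
            }
        }
        return forwarded;
    }

    public static void main(final String[] args) {
        // Non-decreasing timestamps: every put is the latest, so the guard never fires.
        System.out.println(process(new ToyStore(10L), "k", new long[] {1L, 2L, 2L, 5L}));
        // Out-of-order timestamps: the record at 50 is dropped, the one at 95 is not latest.
        System.out.println(process(new ToyStore(10L), "k", new long[] {100L, 50L, 95L}));
    }
}
```

With non-decreasing input the toy store only ever returns the "is latest" code, which is the situation described in the comment; the guard matters only once out-of-order records can reach the store.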



##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/Change.java:
##########
@@ -22,10 +22,16 @@
 
     public final T newValue;
     public final T oldValue;
+    public final boolean isLatest;
 
     public Change(final T newValue, final T oldValue) {
+        this(newValue, oldValue, true);

Review Comment:
   This two-arg constructor is still being called from places such as the 
windowed aggregate processors and caching state stores. Versioned tables do 
not come into play there, so all Changes are considered the latest. If we 
think it'd be better to pass `true` explicitly in these places and remove the 
two-arg constructor, I can do this cleanup in a follow-up PR.


