fqaiser94 commented on a change in pull request #10747:
URL: https://github.com/apache/kafka/pull/10747#discussion_r637657657



##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java
##########
@@ -90,14 +90,19 @@ public void process(final K key, final Change<V> change) {
 
             // if the selected repartition key or value is null, skip
             // forward oldPair first, to be consistent with reduce and 
aggregate
-            if (oldPair != null && oldPair.key != null && oldPair.value != 
null) {
-                context().forward(oldPair.key, new Change<>(null, 
oldPair.value));
+            final boolean oldPairNotNull = oldPair != null && oldPair.key != 
null && oldPair.value != null;
+            final boolean newPairNotNull = newPair != null && newPair.key != 
null && newPair.value != null;
+            if (oldPairNotNull && newPairNotNull && oldPair.key == 
newPair.key) {

Review comment:
       As noted by Matthias on the mailing list thread, this fix depends on a 
correct implementation of `.equals()` method for the key type. Would we need to 
document this assumption somewhere for users?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to