This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new faf36357f3c MINOR: improve logging for FK-join (#14105)
faf36357f3c is described below

commit faf36357f3c87888087627d1d2220d6db29d98fb
Author: Matthias J. Sax <[email protected]>
AuthorDate: Fri Aug 4 21:06:53 2023 -0700

    MINOR: improve logging for FK-join (#14105)
    
    Reviewers: Colt McNealy <[email protected]>, Walker Carlson <[email protected]>
---
 .../internals/foreignkeyjoin/ResponseJoinProcessorSupplier.java      | 5 +++++
 1 file changed, 5 insertions(+)

diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ResponseJoinProcessorSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ResponseJoinProcessorSupplier.java
index 2fffa89b348..600b28078b9 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ResponseJoinProcessorSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ResponseJoinProcessorSupplier.java
@@ -29,6 +29,8 @@ import org.apache.kafka.streams.processor.api.ProcessorSupplier;
 import org.apache.kafka.streams.processor.api.Record;
 import org.apache.kafka.streams.state.ValueAndTimestamp;
 import org.apache.kafka.streams.state.internals.Murmur3;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.function.Supplier;
 
@@ -43,6 +45,7 @@ import java.util.function.Supplier;
  * @param <VR> Type of joined result of primary and foreign values
  */
 public class ResponseJoinProcessorSupplier<K, V, VO, VR> implements 
ProcessorSupplier<K, SubscriptionResponseWrapper<VO>, K, VR> {
+    private static final Logger LOG = 
LoggerFactory.getLogger(ResponseJoinProcessorSupplier.class);
     private final KTableValueGetterSupplier<K, V> valueGetterSupplier;
     private final Serializer<V> constructionTimeValueSerializer;
     private final Supplier<String> valueHashSerdePseudoTopicSupplier;
@@ -107,6 +110,8 @@ public class ResponseJoinProcessorSupplier<K, V, VO, VR> 
implements ProcessorSup
                         result = joiner.apply(currentValueWithTimestamp == 
null ? null : currentValueWithTimestamp.value(), 
record.value().getForeignValue());
                     }
                     context().forward(record.withValue(result));
+                } else {
+                    LOG.trace("Dropping FK-join response due to hash mismatch. 
Expected {}. Actual {}", messageHash, currentHash);
                 }
             }
         };

Reply via email to