This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new faf36357f3c MINOR: improve logging for FK-join (#14105)
faf36357f3c is described below
commit faf36357f3c87888087627d1d2220d6db29d98fb
Author: Matthias J. Sax <[email protected]>
AuthorDate: Fri Aug 4 21:06:53 2023 -0700
MINOR: improve logging for FK-join (#14105)
Reviewers: Colt McNealy <[email protected]>, Walker Carlson <[email protected]>
---
.../internals/foreignkeyjoin/ResponseJoinProcessorSupplier.java | 5 +++++
1 file changed, 5 insertions(+)
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ResponseJoinProcessorSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ResponseJoinProcessorSupplier.java
index 2fffa89b348..600b28078b9 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ResponseJoinProcessorSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ResponseJoinProcessorSupplier.java
@@ -29,6 +29,8 @@ import org.apache.kafka.streams.processor.api.ProcessorSupplier;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.apache.kafka.streams.state.internals.Murmur3;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.util.function.Supplier;
@@ -43,6 +45,7 @@ import java.util.function.Supplier;
* @param <VR> Type of joined result of primary and foreign values
*/
public class ResponseJoinProcessorSupplier<K, V, VO, VR> implements ProcessorSupplier<K, SubscriptionResponseWrapper<VO>, K, VR> {
+ private static final Logger LOG = LoggerFactory.getLogger(ResponseJoinProcessorSupplier.class);
private final KTableValueGetterSupplier<K, V> valueGetterSupplier;
private final Serializer<V> constructionTimeValueSerializer;
private final Supplier<String> valueHashSerdePseudoTopicSupplier;
@@ -107,6 +110,8 @@ public class ResponseJoinProcessorSupplier<K, V, VO, VR> implements ProcessorSup
result = joiner.apply(currentValueWithTimestamp == null ? null : currentValueWithTimestamp.value(), record.value().getForeignValue());
}
context().forward(record.withValue(result));
+ } else {
+ LOG.trace("Dropping FK-join response due to hash mismatch. Expected {}. Actual {}", messageHash, currentHash);
}
}
};