AyoubOm commented on code in PR #19303:
URL: https://github.com/apache/kafka/pull/19303#discussion_r2029469412
##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplier.java:
##########
@@ -143,28 +143,25 @@ private void leftJoinInstructions(final Record<KLeft,
Change<VLeft>> record) {
private void defaultJoinInstructions(final Record<KLeft,
Change<VLeft>> record) {
if (record.value().oldValue != null) {
- final KRight oldForeignKey = record.value().oldValue == null ?
null : foreignKeyExtractor.extract(record.key(), record.value().oldValue);
- if (oldForeignKey == null) {
+ final KRight oldForeignKey =
foreignKeyExtractor.extract(record.key(), record.value().oldValue);
Review Comment:
The check was unnecessary as this is inside the if block `if
(record.value().oldValue != null)`
##########
streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplierTest.java:
##########
@@ -316,6 +367,21 @@ public void innerJoinShouldPropagateDeletionOfPrimaryKey()
{
);
}
+ @Test
+ public void
innerJoinShouldNotPropagateDeletionOfPrimaryKeyWhenPreviousFKIsNull() {
+ final MockInternalProcessorContext<String,
SubscriptionWrapper<String>> context = new MockInternalProcessorContext<>();
+ innerJoinProcessor.init(context);
+ context.setRecordMetadata("topic", 0, 0);
+
+ innerJoinProcessor.process(new Record<>(pk, new Change<>(null, new
LeftValue(null)), 0));
+
+ assertThat(context.forwarded(), empty());
+
+ // test dropped-records sensors
+ assertEquals(1.0, getDroppedRecordsTotalMetric(context));
Review Comment:
I think the definition of dropped record here is those who did not impact
the join result (no output record and no removal of subscription). I believe it
would be useful to monitor this info, to keep track of records with no effect.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]