mjsax commented on code in PR #19303:
URL: https://github.com/apache/kafka/pull/19303#discussion_r2026203506
##########
streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplierTest.java:
##########
@@ -316,6 +367,21 @@ public void innerJoinShouldPropagateDeletionOfPrimaryKey()
{
);
}
+ @Test
+ public void
innerJoinShouldNotPropagateDeletionOfPrimaryKeyWhenPreviousFKIsNull() {
+ final MockInternalProcessorContext<String,
SubscriptionWrapper<String>> context = new MockInternalProcessorContext<>();
+ innerJoinProcessor.init(context);
+ context.setRecordMetadata("topic", 0, 0);
+
+ innerJoinProcessor.process(new Record<>(pk, new Change<>(null, new
LeftValue(null)), 0));
+
+ assertThat(context.forwarded(), empty());
+
+ // test dropped-records sensors
+ assertEquals(1.0, getDroppedRecordsTotalMetric(context));
Review Comment:
Wondering if we should really record this case as dropped value? 🤔
##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplier.java:
##########
@@ -143,28 +143,25 @@ private void leftJoinInstructions(final Record<KLeft,
Change<VLeft>> record) {
private void defaultJoinInstructions(final Record<KLeft,
Change<VLeft>> record) {
if (record.value().oldValue != null) {
- final KRight oldForeignKey = record.value().oldValue == null ?
null : foreignKeyExtractor.extract(record.key(), record.value().oldValue);
- if (oldForeignKey == null) {
+ final KRight oldForeignKey =
foreignKeyExtractor.extract(record.key(), record.value().oldValue);
Review Comment:
Why did you remove the `record.value().oldValue == null` check? Seems we
would need this check? If the old value is `null` it mean the row did not
exist, and the new record is an insert. Thus, there was also no "old FK" that
we would need to extract by calling the `foreignKeyExtractor` but we can set
`oldKF = null`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]