[GitHub] [kafka] vcrfxia commented on a diff in pull request #13609: KAFKA-14834: [11/N] Update table joins to identify out-of-order records with `isLatest`

2023-04-19 Thread via GitHub


vcrfxia commented on code in PR #13609:
URL: https://github.com/apache/kafka/pull/13609#discussion_r1170899662


##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java:
##
@@ -1098,7 +1101,7 @@ private  KTable doJoinOnForeignKey(final KTable forei
 //not be done needlessly.
 ((KTableImpl) foreignKeyTable).enableSendingOldValues(true);
 
-//Old values must be sent such that the ForeignJoinSubscriptionSendProcessorSupplier can propagate deletions to the correct node.
+//Old values must be sent such that the SubscriptionSendProcessorSupplier can propagate deletions to the correct node.

Review Comment:
   Sure thing. Here ya go: https://github.com/apache/kafka/pull/13610





-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] vcrfxia commented on a diff in pull request #13609: KAFKA-14834: [11/N] Update table joins to identify out-of-order records with `isLatest`

2023-04-19 Thread via GitHub


vcrfxia commented on code in PR #13609:
URL: https://github.com/apache/kafka/pull/13609#discussion_r1170894940


##
streams/src/test/java/org/apache/kafka/streams/integration/TableTableJoinIntegrationTest.java:
##
@@ -446,14 +482,18 @@ public void testInnerWithRightVersionedOnly() throws Exception {
 null,
 null,
 null,
-Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "D-d", null,  15L)),
-Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "E-d", null,  14L)),
 null,
 null,
-Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "F-d", null,  14L))
+null,
+null,
+Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "E-e", null,  15L)),
+Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, null, null,  14L)),
+null,
+null,
+Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "F-e", null,  14L))

Review Comment:
   Yes, you are correct. I've fixed this and also strengthened the check that 
the test uses (in the latest commit) so that this type of "error" now fails the 
test. This required updating some of the expected results for the table-table 
multi-join tests as well, independent of the versioned-join changes.






[GitHub] [kafka] vcrfxia commented on a diff in pull request #13609: KAFKA-14834: [11/N] Update table joins to identify out-of-order records with `isLatest`

2023-04-19 Thread via GitHub


vcrfxia commented on code in PR #13609:
URL: https://github.com/apache/kafka/pull/13609#discussion_r1170894062


##
streams/src/test/java/org/apache/kafka/streams/integration/TableTableJoinIntegrationTest.java:
##
@@ -215,14 +226,19 @@ public void testInnerWithVersionedStores() {
 null,
 null,
 null,
-Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "D-d", null,  15L)),
+null,
+null,
+null,
+null,
+null,

Review Comment:
   Yep, good eye. Fixed.






[GitHub] [kafka] vcrfxia commented on a diff in pull request #13609: KAFKA-14834: [11/N] Update table joins to identify out-of-order records with `isLatest`

2023-04-19 Thread via GitHub


vcrfxia commented on code in PR #13609:
URL: https://github.com/apache/kafka/pull/13609#discussion_r1170869917


##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/KTableKTableJoinNode.java:
##
@@ -97,6 +95,27 @@ public KTableKTableJoinMerger joinMerger() {
 return (KTableKTableJoinMerger) kChangeProcessorSupplier;
 }
 
+@Override
+public void enableVersionedSemantics(final boolean useVersionedSemantics, final String parentNodeName) {
+enableVersionedSemantics(thisProcessorParameters(), useVersionedSemantics, parentNodeName);
+enableVersionedSemantics(otherProcessorParameters(), useVersionedSemantics, parentNodeName);
+}
+
+@SuppressWarnings("unchecked")
+private void enableVersionedSemantics(final ProcessorParameters processorParameters,
+  final boolean useVersionedSemantics,
+  final String parentNodeName) {
+final ProcessorSupplier processorSupplier = processorParameters.processorSupplier();
+if (!(processorSupplier instanceof KTableKTableAbstractJoin)) {
+throw new IllegalStateException("Unexpected processor type for table-table join: " + processorSupplier.getClass().getName());
+}
+final KTableKTableAbstractJoin tableJoin = (KTableKTableAbstractJoin) processorSupplier;
+
+if (parentNodeName.equals(tableJoin.joinThisParentNodeName())) {

Review Comment:
   Good question -- the answer is that this join node has multiple parents, 
while all the other `VersionedSemanticsGraphNode` implementations have only a 
single parent. Because this join node has multiple parents and writes two 
separate processors (one for each side of the join) to the topology, when 
enabling versioned semantics we need a way to distinguish which side of the 
join we're enabling versioned semantics for. In other words, it's possible that 
only one side of the join is "versioned," in which case one of the two join 
processors should have versioned semantics enabled while the other should not. 
We determine which side of the join to enable versioned semantics for based on 
the parent node name: versioned semantics are enabled for the processor whose 
"joinThis" parent node is the one identified as versioned. (In the case of a 
self-join, both processors satisfy this check, and both have versioned 
semantics enabled.)
   
   For all other `VersionedSemanticsGraphNode` implementations, which have only 
a single parent, we could perform an analogous parent node name check if we 
wanted to, but the parent node name would always match, so the check would be 
redundant.
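To make the "which side gets versioned semantics" rule concrete, here is a minimal sketch under stated assumptions. `JoinNodeSketch` and `JoinProcessorSketch` are hypothetical stand-ins, not the real `KTableKTableJoinNode` or `KTableKTableAbstractJoin` classes; they only model the parent-name check described above, including the self-join case where both processors match.

```java
// Hypothetical, simplified stand-ins for the two processors that a table-table
// join node writes to the topology. These are NOT the real Kafka Streams classes.
final class JoinNodeSketch {

    static final class JoinProcessorSketch {
        // Name of the parent graph node whose records this processor receives
        // (its "joinThis" side); the other table is only looked up.
        final String joinThisParentNodeName;
        boolean useVersionedSemantics = false;

        JoinProcessorSketch(final String joinThisParentNodeName) {
            this.joinThisParentNodeName = joinThisParentNodeName;
        }
    }

    final JoinProcessorSketch thisProcessor;
    final JoinProcessorSketch otherProcessor;

    JoinNodeSketch(final String thisParentNodeName, final String otherParentNodeName) {
        this.thisProcessor = new JoinProcessorSketch(thisParentNodeName);
        this.otherProcessor = new JoinProcessorSketch(otherParentNodeName);
    }

    // Invoked once per parent, with whether that parent was identified as versioned.
    void enableVersionedSemantics(final boolean useVersionedSemantics, final String parentNodeName) {
        enableVersionedSemantics(thisProcessor, useVersionedSemantics, parentNodeName);
        enableVersionedSemantics(otherProcessor, useVersionedSemantics, parentNodeName);
    }

    private static void enableVersionedSemantics(final JoinProcessorSketch processor,
                                                 final boolean useVersionedSemantics,
                                                 final String parentNodeName) {
        // Only the processor whose "joinThis" input is this parent is updated;
        // for a self-join both processors share the parent name, so both match.
        if (parentNodeName.equals(processor.joinThisParentNodeName)) {
            processor.useVersionedSemantics = useVersionedSemantics;
        }
    }
}
```

For example, after `new JoinNodeSketch("left-table", "right-table")` and a call to `enableVersionedSemantics(true, "right-table")`, only `otherProcessor` has versioned semantics enabled; the node names here are made up for illustration.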






[GitHub] [kafka] vcrfxia commented on a diff in pull request #13609: KAFKA-14834: [11/N] Update table joins to identify out-of-order records with `isLatest`

2023-04-18 Thread via GitHub


vcrfxia commented on code in PR #13609:
URL: https://github.com/apache/kafka/pull/13609#discussion_r1170638371


##
streams/src/test/java/org/apache/kafka/streams/integration/AbstractJoinIntegrationTest.java:
##
@@ -87,7 +87,37 @@ public static Collection data() {
 
 StreamsBuilder builder;
 
-private final List> input = Arrays.asList(
+protected final List> input = Arrays.asList(
+new Input<>(INPUT_TOPIC_LEFT, null, 1),
+new Input<>(INPUT_TOPIC_RIGHT, null, 2),
+new Input<>(INPUT_TOPIC_LEFT, "A", 3),
+new Input<>(INPUT_TOPIC_RIGHT, "a", 4),
+new Input<>(INPUT_TOPIC_LEFT, "B", 5),
+new Input<>(INPUT_TOPIC_RIGHT, "b", 6),
+new Input<>(INPUT_TOPIC_LEFT, null, 7),
+new Input<>(INPUT_TOPIC_RIGHT, null, 8),
+new Input<>(INPUT_TOPIC_LEFT, "C", 9),
+new Input<>(INPUT_TOPIC_RIGHT, "c", 10),
+new Input<>(INPUT_TOPIC_RIGHT, null, 11),
+new Input<>(INPUT_TOPIC_LEFT, null, 12),
+new Input<>(INPUT_TOPIC_RIGHT, null, 13),
+new Input<>(INPUT_TOPIC_RIGHT, "d", 7), // out-of-order data with null as latest
+new Input<>(INPUT_TOPIC_LEFT, "D", 6),
+new Input<>(INPUT_TOPIC_LEFT, null, 2),
+new Input<>(INPUT_TOPIC_RIGHT, null, 3),
+new Input<>(INPUT_TOPIC_RIGHT, "e", 14),
+new Input<>(INPUT_TOPIC_LEFT, "E", 15),
+new Input<>(INPUT_TOPIC_LEFT, null, 10), // out-of-order data with non-null as latest
+new Input<>(INPUT_TOPIC_RIGHT, null, 9),
+new Input<>(INPUT_TOPIC_LEFT, "F", 4),
+new Input<>(INPUT_TOPIC_RIGHT, "f", 3)
+);
+
+// used for stream-stream join tests where out-of-order data does not meaningfully affect
+// the result, and the main `input` list results in too many result records/test noise.
+// also used for table-table multi-join tests, since out-of-order data with table-table
+// joins is already tested in non-multi-join settings.
+protected final List> inputWithoutOutOfOrderData = Arrays.asList(

Review Comment:
   As suggested in 
https://github.com/apache/kafka/pull/13497#discussion_r1163397015, I have split 
the input test data into two copies: a smaller one without out-of-order data 
(for use in stream-stream join and table-table multi-join tests) and a larger 
one with out-of-order data (for use elsewhere, including to validate versioned 
joins).
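As an illustration of what "out-of-order" means for the larger input list, the sketch below flags which records would not be the latest for their key. It assumes a record counts as latest when its timestamp is greater than or equal to the largest timestamp seen so far for its key; this is an approximation of `isLatest`, not the versioned store's actual implementation. Because the test inputs share a single key, tracking the maximum per topic suffices here. For example, `new Input<>(INPUT_TOPIC_RIGHT, "d", 7)` arrives after the right topic has already seen timestamp 13, so it is flagged. `OutOfOrderSketch` and its nested `Input` class are hypothetical stand-ins for the test's types.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: approximates isLatest as "timestamp >= max timestamp seen so
// far for the key". All test inputs share one key, so tracking per topic suffices.
final class OutOfOrderSketch {

    // Simplified stand-in for the test's Input<> type (topic, value, timestamp).
    static final class Input {
        final String topic;
        final String value;
        final long timestamp;

        Input(final String topic, final String value, final long timestamp) {
            this.topic = topic;
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    // Returns one flag per input: true if the record would be the latest for its topic.
    static List<Boolean> isLatestFlags(final List<Input> inputs) {
        final Map<String, Long> maxTimestampPerTopic = new HashMap<>();
        final List<Boolean> flags = new ArrayList<>();
        for (final Input input : inputs) {
            final long maxSoFar = maxTimestampPerTopic.getOrDefault(input.topic, Long.MIN_VALUE);
            final boolean isLatest = input.timestamp >= maxSoFar;
            if (isLatest) {
                // Only in-order records advance the per-topic high-water mark.
                maxTimestampPerTopic.put(input.topic, input.timestamp);
            }
            flags.add(isLatest);
        }
        return flags;
    }
}
```

Under this approximation, the two commented records in `input` are only the first out-of-order record in each of their runs; several later records with older timestamps (for example `"F"` at 4) are out-of-order as well.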






[GitHub] [kafka] vcrfxia commented on a diff in pull request #13609: KAFKA-14834: [11/N] Update table joins to identify out-of-order records with `isLatest`

2023-04-18 Thread via GitHub


vcrfxia commented on code in PR #13609:
URL: https://github.com/apache/kafka/pull/13609#discussion_r1170633123


##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java:
##
@@ -804,6 +806,7 @@ private  KTable doJoin(final KTable other,
 kTableKTableJoinNode.setOutputVersioned(isOutputVersioned);
 
 builder.addGraphNode(this.graphNode, kTableKTableJoinNode);
+builder.addGraphNode(((KTableImpl) other).graphNode, kTableKTableJoinNode);

Review Comment:
   I don't know why it's currently the case that primary-key table-table join 
nodes only have one parent, instead of two. Seems more correct to have two, and 
the GraphNode mechanism for determining whether the joining table is versioned 
or not will not work without this parent connection.
   
   I have verified that there is no change to the built topology, so AFAICT 
this addition is internal-only.
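A minimal sketch of why the second parent link matters, assuming a join node can only learn whether a joining table is versioned by inspecting its parent nodes. `GraphParentsSketch`, its `Node` class, and its `addGraphNode` helper are hypothetical; the helper only mirrors the parent-recording aspect of `builder.addGraphNode(parent, child)` and is not the real `GraphNode` or builder API.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch, not the real GraphNode API: a join node can only learn
// whether a joining table is versioned through its parent links.
final class GraphParentsSketch {

    static final class Node {
        final String name;
        final boolean versioned; // whether this (table) node is backed by a versioned store
        final List<Node> parents = new ArrayList<>();

        Node(final String name, final boolean versioned) {
            this.name = name;
            this.versioned = versioned;
        }
    }

    // Models only the parent link that builder.addGraphNode(parent, child) records.
    static void addGraphNode(final Node parent, final Node child) {
        child.parents.add(parent);
    }

    // Collects, per parent, whether that parent is versioned. Parents that were
    // never linked to the join node are simply invisible here.
    static Map<String, Boolean> versionedByParent(final Node joinNode) {
        final Map<String, Boolean> result = new LinkedHashMap<>();
        for (final Node parent : joinNode.parents) {
            result.put(parent.name, parent.versioned);
        }
        return result;
    }
}
```

With only the left table linked, the versioned right table never shows up in `versionedByParent`; adding the second `addGraphNode` call makes it visible, which corresponds to the connection the new line in `doJoin` adds.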



##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java:
##
@@ -1098,7 +1101,7 @@ private  KTable doJoinOnForeignKey(final KTable forei
 //not be done needlessly.
 ((KTableImpl) foreignKeyTable).enableSendingOldValues(true);
 
-//Old values must be sent such that the ForeignJoinSubscriptionSendProcessorSupplier can propagate deletions to the correct node.
+//Old values must be sent such that the SubscriptionSendProcessorSupplier can propagate deletions to the correct node.

Review Comment:
   This, and a few other similar renames in comments, are unrelated to this PR 
but included as cleanup from https://github.com/apache/kafka/pull/13589.


