[GitHub] [kafka] vcrfxia commented on a diff in pull request #13609: KAFKA-14834: [11/N] Update table joins to identify out-of-order records with `isLatest`
vcrfxia commented on code in PR #13609:
URL: https://github.com/apache/kafka/pull/13609#discussion_r1170899662

## streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java: ##
@@ -1098,7 +1101,7 @@ private KTable doJoinOnForeignKey(final KTable forei
//not be done needlessly.
((KTableImpl) foreignKeyTable).enableSendingOldValues(true);
-//Old values must be sent such that the ForeignJoinSubscriptionSendProcessorSupplier can propagate deletions to the correct node.
+//Old values must be sent such that the SubscriptionSendProcessorSupplier can propagate deletions to the correct node.

Review Comment:
Sure thing. Here ya go: https://github.com/apache/kafka/pull/13610

--
This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
vcrfxia commented on code in PR #13609:
URL: https://github.com/apache/kafka/pull/13609#discussion_r1170894940

## streams/src/test/java/org/apache/kafka/streams/integration/TableTableJoinIntegrationTest.java: ##
@@ -446,14 +482,18 @@ public void testInnerWithRightVersionedOnly() throws Exception {
null,
null,
null,
-Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "D-d", null, 15L)),
-Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "E-d", null, 14L)),
null,
null,
-Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "F-d", null, 14L))
+null,
+null,
+Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "E-e", null, 15L)),
+Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, null, null, 14L)),
+null,
+null,
+Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "F-e", null, 14L))

Review Comment:
Yes, you are correct. I've fixed this and also strengthened the check the test uses (in the latest commit), so this type of "error" now fails the test. In the process I had to update some of the expected results for the table-table multi-join tests, separately from the versioned joins.

vcrfxia commented on code in PR #13609:
URL: https://github.com/apache/kafka/pull/13609#discussion_r1170894062

## streams/src/test/java/org/apache/kafka/streams/integration/TableTableJoinIntegrationTest.java: ##
@@ -215,14 +226,19 @@ public void testInnerWithVersionedStores() {
null,
null,
null,
-Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "D-d", null, 15L)),
+null,
+null,
+null,
+null,
+null,

Review Comment:
Yep, good eye. Fixed.

vcrfxia commented on code in PR #13609:
URL: https://github.com/apache/kafka/pull/13609#discussion_r1170869917

## streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/KTableKTableJoinNode.java: ##
@@ -97,6 +95,27 @@ public KTableKTableJoinMerger joinMerger() {
return (KTableKTableJoinMerger) kChangeProcessorSupplier;
}
+@Override
+public void enableVersionedSemantics(final boolean useVersionedSemantics, final String parentNodeName) {
+enableVersionedSemantics(thisProcessorParameters(), useVersionedSemantics, parentNodeName);
+enableVersionedSemantics(otherProcessorParameters(), useVersionedSemantics, parentNodeName);
+}
+
+@SuppressWarnings("unchecked")
+private void enableVersionedSemantics(final ProcessorParameters processorParameters,
+final boolean useVersionedSemantics,
+final String parentNodeName) {
+final ProcessorSupplier processorSupplier = processorParameters.processorSupplier();
+if (!(processorSupplier instanceof KTableKTableAbstractJoin)) {
+throw new IllegalStateException("Unexpected processor type for table-table join: " + processorSupplier.getClass().getName());
+}
+final KTableKTableAbstractJoin tableJoin = (KTableKTableAbstractJoin) processorSupplier;
+
+if (parentNodeName.equals(tableJoin.joinThisParentNodeName())) {

Review Comment:
Good question -- the answer is that this join node has multiple parents, while all the other `VersionedSemanticsGraphNode` implementations have only a single parent. Because this join node has multiple parents and adds two separate processors to the topology (one for each side of the join), enabling versioned semantics requires a way to tell which side of the join the call is for. It's possible that only one side of the join is "versioned," in which case one of the two join processors should have versioned semantics enabled and the other should not.

The side is chosen by parent node name: versioned semantics are enabled for the processor whose "joinThis" parent is the node that has been identified as versioned. (In a self-join, both processors satisfy this check, so both have versioned semantics enabled.) The other `VersionedSemanticsGraphNode` implementations have only a single parent. They could perform an analogous parent node name check, but the name would always match, so the check would be redundant.

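The following minimal, self-contained Java sketch illustrates the parent-name dispatch described in the comment. `JoinSideProcessor` and `TableTableJoinNodeSketch` are hypothetical stand-ins for `KTableKTableAbstractJoin` and `KTableKTableJoinNode`, not Kafka APIs. The sketch also assumes that the elided body of the `if` in the diff simply turns versioned semantics on for the matching processor.

```java
public class VersionedJoinDispatchSketch {

    // Hypothetical stand-in for one side's join processor (not Kafka's KTableKTableAbstractJoin).
    static final class JoinSideProcessor {
        final String joinThisParentNodeName; // the parent whose updates drive this side of the join
        boolean useVersionedSemantics = false;

        JoinSideProcessor(final String joinThisParentNodeName) {
            this.joinThisParentNodeName = joinThisParentNodeName;
        }
    }

    // Hypothetical stand-in for the two-parent join graph node (not Kafka's KTableKTableJoinNode).
    static final class TableTableJoinNodeSketch {
        final JoinSideProcessor thisSide;
        final JoinSideProcessor otherSide;

        TableTableJoinNodeSketch(final String thisParentName, final String otherParentName) {
            this.thisSide = new JoinSideProcessor(thisParentName);
            this.otherSide = new JoinSideProcessor(otherParentName);
        }

        // Called with the name of a parent that has been identified as versioned.
        void enableVersionedSemantics(final boolean useVersionedSemantics, final String parentNodeName) {
            enableFor(thisSide, useVersionedSemantics, parentNodeName);
            enableFor(otherSide, useVersionedSemantics, parentNodeName);
        }

        private static void enableFor(final JoinSideProcessor processor,
                                      final boolean useVersionedSemantics,
                                      final String parentNodeName) {
            // Only a processor whose "joinThis" parent is the named node is updated;
            // in a self-join both processors share that parent, so both match.
            if (parentNodeName.equals(processor.joinThisParentNodeName)) {
                processor.useVersionedSemantics = useVersionedSemantics;
            }
        }
    }

    public static void main(final String[] args) {
        // Only the right-hand table is versioned.
        final TableTableJoinNodeSketch join = new TableTableJoinNodeSketch("left-table", "right-table");
        join.enableVersionedSemantics(true, "right-table");
        System.out.println(join.thisSide.useVersionedSemantics + " " + join.otherSide.useVersionedSemantics);
        // prints "false true"

        // Self-join: both sides read from the same table.
        final TableTableJoinNodeSketch selfJoin = new TableTableJoinNodeSketch("table", "table");
        selfJoin.enableVersionedSemantics(true, "table");
        System.out.println(selfJoin.thisSide.useVersionedSemantics + " " + selfJoin.otherSide.useVersionedSemantics);
        // prints "true true"
    }
}
```

In a one-sided versioned join, only the matching side is enabled. A self-join enables both sides, as the comment describes.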
vcrfxia commented on code in PR #13609:
URL: https://github.com/apache/kafka/pull/13609#discussion_r1170638371

## streams/src/test/java/org/apache/kafka/streams/integration/AbstractJoinIntegrationTest.java: ##
@@ -87,7 +87,37 @@ public static Collection data() {
StreamsBuilder builder;
-private final List> input = Arrays.asList(
+protected final List> input = Arrays.asList(
+new Input<>(INPUT_TOPIC_LEFT, null, 1),
+new Input<>(INPUT_TOPIC_RIGHT, null, 2),
+new Input<>(INPUT_TOPIC_LEFT, "A", 3),
+new Input<>(INPUT_TOPIC_RIGHT, "a", 4),
+new Input<>(INPUT_TOPIC_LEFT, "B", 5),
+new Input<>(INPUT_TOPIC_RIGHT, "b", 6),
+new Input<>(INPUT_TOPIC_LEFT, null, 7),
+new Input<>(INPUT_TOPIC_RIGHT, null, 8),
+new Input<>(INPUT_TOPIC_LEFT, "C", 9),
+new Input<>(INPUT_TOPIC_RIGHT, "c", 10),
+new Input<>(INPUT_TOPIC_RIGHT, null, 11),
+new Input<>(INPUT_TOPIC_LEFT, null, 12),
+new Input<>(INPUT_TOPIC_RIGHT, null, 13),
+new Input<>(INPUT_TOPIC_RIGHT, "d", 7), // out-of-order data with null as latest
+new Input<>(INPUT_TOPIC_LEFT, "D", 6),
+new Input<>(INPUT_TOPIC_LEFT, null, 2),
+new Input<>(INPUT_TOPIC_RIGHT, null, 3),
+new Input<>(INPUT_TOPIC_RIGHT, "e", 14),
+new Input<>(INPUT_TOPIC_LEFT, "E", 15),
+new Input<>(INPUT_TOPIC_LEFT, null, 10), // out-of-order data with non-null as latest
+new Input<>(INPUT_TOPIC_RIGHT, null, 9),
+new Input<>(INPUT_TOPIC_LEFT, "F", 4),
+new Input<>(INPUT_TOPIC_RIGHT, "f", 3)
+);
+
+// used for stream-stream join tests where out-of-order data does not meaningfully affect
+// the result, and the main `input` list results in too many result records/test noise.
+// also used for table-table multi-join tests, since out-of-order data with table-table
+// joins is already tested in non-multi-join settings.
+protected final List> inputWithoutOutOfOrderData = Arrays.asList(

Review Comment:
As suggested in https://github.com/apache/kafka/pull/13497#discussion_r1163397015, I have split the input test data into two lists. The smaller one has no out-of-order data and is used in the stream-stream join and table-table multi-join tests. The larger one includes out-of-order data and is used everywhere else, including to validate versioned joins.

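The out-of-order markers in the `input` list above can be checked mechanically. This minimal sketch rests on three assumptions. First, every record uses a single key, as the join tests do. Second, a record counts as the latest when its timestamp is no older than the newest timestamp already seen on its topic. Third, tombstones also advance that timestamp, consistent with the "null as latest" comment. `Input` here is a hypothetical simplified class, not the test's own generic `Input` type, and the topic names are placeholders.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class OutOfOrderInputSketch {

    // Hypothetical, simplified stand-in for the test's Input class: (topic, value, timestamp).
    static final class Input {
        final String topic;
        final String value; // null represents a tombstone
        final long timestamp;

        Input(final String topic, final String value, final long timestamp) {
            this.topic = topic;
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    static final String LEFT = "left";   // placeholder topic names
    static final String RIGHT = "right";

    // Same (topic, value, timestamp) sequence as the `input` list in the diff.
    static final List<Input> INPUT = Arrays.asList(
        new Input(LEFT, null, 1), new Input(RIGHT, null, 2),
        new Input(LEFT, "A", 3), new Input(RIGHT, "a", 4),
        new Input(LEFT, "B", 5), new Input(RIGHT, "b", 6),
        new Input(LEFT, null, 7), new Input(RIGHT, null, 8),
        new Input(LEFT, "C", 9), new Input(RIGHT, "c", 10),
        new Input(RIGHT, null, 11), new Input(LEFT, null, 12),
        new Input(RIGHT, null, 13),
        new Input(RIGHT, "d", 7),   // out-of-order, null is latest
        new Input(LEFT, "D", 6),
        new Input(LEFT, null, 2), new Input(RIGHT, null, 3),
        new Input(RIGHT, "e", 14), new Input(LEFT, "E", 15),
        new Input(LEFT, null, 10),  // out-of-order, non-null is latest
        new Input(RIGHT, null, 9),
        new Input(LEFT, "F", 4), new Input(RIGHT, "f", 3)
    );

    // Returns the indexes of records that are not the latest for their topic (and hence key).
    static List<Integer> outOfOrderIndexes(final List<Input> records) {
        final Map<String, Long> latestTimestampByTopic = new HashMap<>();
        final List<Integer> outOfOrder = new ArrayList<>();
        for (int i = 0; i < records.size(); i++) {
            final Input record = records.get(i);
            final Long latest = latestTimestampByTopic.get(record.topic);
            // A tie with the latest timestamp still counts as latest; tombstones advance it too.
            final boolean isLatest = latest == null || record.timestamp >= latest;
            if (isLatest) {
                latestTimestampByTopic.put(record.topic, record.timestamp);
            } else {
                outOfOrder.add(i);
            }
        }
        return outOfOrder;
    }

    public static void main(final String[] args) {
        System.out.println(outOfOrderIndexes(INPUT));
        // prints [13, 14, 15, 16, 19, 20, 21, 22]
    }
}
```

Under these assumptions, the sketch flags eight records. These are the two commented records plus every other record that follows them on the same topic with an older timestamp.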
vcrfxia commented on code in PR #13609:
URL: https://github.com/apache/kafka/pull/13609#discussion_r1170633123

## streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java: ##
@@ -804,6 +806,7 @@ private KTable doJoin(final KTable other,
kTableKTableJoinNode.setOutputVersioned(isOutputVersioned);
builder.addGraphNode(this.graphNode, kTableKTableJoinNode);
+builder.addGraphNode(((KTableImpl) other).graphNode, kTableKTableJoinNode);

Review Comment:
I don't know why primary-key table-table join nodes currently have only one parent instead of two. Two seems more correct, and the GraphNode mechanism for determining whether the joining table is versioned will not work without this parent connection. I have verified that the built topology does not change, so AFAICT this addition is internal-only.

## streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java: ##
@@ -1098,7 +1101,7 @@ private KTable doJoinOnForeignKey(final KTable forei
//not be done needlessly.
((KTableImpl) foreignKeyTable).enableSendingOldValues(true);
-//Old values must be sent such that the ForeignJoinSubscriptionSendProcessorSupplier can propagate deletions to the correct node.
+//Old values must be sent such that the SubscriptionSendProcessorSupplier can propagate deletions to the correct node.

Review Comment:
This and a few other similar renames in comments are unrelated to this PR. They are included as cleanup from https://github.com/apache/kafka/pull/13589.
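The extra `addGraphNode` call for primary-key joins can be illustrated with a hypothetical plain-Java sketch. It uses an invented `Node` class rather than Kafka's `GraphNode`. It rests on one assumption: a versioned table notifies only its direct children and passes its own name as the parent node name. Without the edge from the right-hand table, the join node is never told that its right side is versioned. Kafka's actual traversal may differ; the sketch only shows the dependency on the parent connection.

```java
import java.util.ArrayList;
import java.util.List;

public class JoinNodeParentsSketch {

    // Hypothetical stand-in for a topology graph node (not Kafka's GraphNode).
    static final class Node {
        final String name;
        final boolean versioned; // true for a table assumed to be backed by a versioned store
        final List<Node> children = new ArrayList<>();
        // Names received as "parentNodeName" when versioned semantics were enabled.
        final List<String> versionedParentNames = new ArrayList<>();

        Node(final String name, final boolean versioned) {
            this.name = name;
            this.versioned = versioned;
        }
    }

    // Hypothetical helper mirroring the shape of builder.addGraphNode(parent, child).
    static void addGraphNode(final Node parent, final Node child) {
        parent.children.add(child);
    }

    // A versioned table notifies its direct children only, passing its own name.
    static void propagateVersionedSemantics(final Node table) {
        if (!table.versioned) {
            return;
        }
        for (final Node child : table.children) {
            child.versionedParentNames.add(table.name);
        }
    }

    // Builds a left/right join where only the right table is versioned.
    static Node buildJoin(final boolean connectOtherSide) {
        final Node left = new Node("left-table", false);
        final Node right = new Node("right-table", true);
        final Node join = new Node("join", false);
        addGraphNode(left, join);
        if (connectOtherSide) {
            addGraphNode(right, join); // the second parent edge
        }
        propagateVersionedSemantics(left);
        propagateVersionedSemantics(right);
        return join;
    }

    public static void main(final String[] args) {
        System.out.println(buildJoin(false).versionedParentNames); // prints []
        System.out.println(buildJoin(true).versionedParentNames);  // prints [right-table]
    }
}
```

The sketch only adds a graph edge. Nothing in it models the built topology, which matches the comment's point that the change is internal to the graph.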