ableegoldman commented on code in PR #18195:
URL: https://github.com/apache/kafka/pull/18195#discussion_r1900247691
##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java:
##########
@@ -1235,26 +1241,24 @@ private <VR, KO, VO> KTable<K, VR>
doJoinOnForeignKey(final KTable<KO, VO> forei
final String subscriptionReceiveName = renamed.suffixWithOrElseGet(
"-subscription-receive", builder, SUBSCRIPTION_PROCESSOR);
- final StatefulProcessorNode<KO, SubscriptionWrapper<K>>
subscriptionReceiveNode =
- new StatefulProcessorNode<>(
+ final ProcessorGraphNode<KO, SubscriptionWrapper<K>>
subscriptionReceiveNode =
+ new ProcessorGraphNode<>(
subscriptionReceiveName,
new ProcessorParameters<>(
new
SubscriptionReceiveProcessorSupplier<>(subscriptionStoreFactory,
combinedKeySchema),
- subscriptionReceiveName),
- new String[]{subscriptionStoreName}
+ subscriptionReceiveName)
);
builder.addGraphNode(subscriptionSource, subscriptionReceiveNode);
final KTableValueGetterSupplier<KO, VO> foreignKeyValueGetter =
((KTableImpl<KO, VO, VO>) foreignKeyTable).valueGetterSupplier();
- final StatefulProcessorNode<CombinedKey<KO, K>,
Change<ValueAndTimestamp<SubscriptionWrapper<K>>>> subscriptionJoinNode =
- new StatefulProcessorNode<>(
+ final ProcessorToStateConnectorNode<CombinedKey<KO, K>,
Change<ValueAndTimestamp<SubscriptionWrapper<K>>>> subscriptionJoinNode =
Review Comment:
I had the same thought actually. I did take a quick look at it but
ultimately decided that whether or not it was possible, it would be too much
for this one PR. So I'm going to revisit this in a followup PR if it does
indeed make sense to do
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]