guozhangwang commented on code in PR #18195:
URL: https://github.com/apache/kafka/pull/18195#discussion_r1900245185
##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java:
##########
@@ -1235,26 +1241,24 @@ private <VR, KO, VO> KTable<K, VR>
doJoinOnForeignKey(final KTable<KO, VO> forei
final String subscriptionReceiveName = renamed.suffixWithOrElseGet(
"-subscription-receive", builder, SUBSCRIPTION_PROCESSOR);
- final StatefulProcessorNode<KO, SubscriptionWrapper<K>>
subscriptionReceiveNode =
- new StatefulProcessorNode<>(
+ final ProcessorGraphNode<KO, SubscriptionWrapper<K>>
subscriptionReceiveNode =
+ new ProcessorGraphNode<>(
subscriptionReceiveName,
new ProcessorParameters<>(
new
SubscriptionReceiveProcessorSupplier<>(subscriptionStoreFactory,
combinedKeySchema),
- subscriptionReceiveName),
- new String[]{subscriptionStoreName}
+ subscriptionReceiveName)
);
builder.addGraphNode(subscriptionSource, subscriptionReceiveNode);
final KTableValueGetterSupplier<KO, VO> foreignKeyValueGetter =
((KTableImpl<KO, VO, VO>) foreignKeyTable).valueGetterSupplier();
- final StatefulProcessorNode<CombinedKey<KO, K>,
Change<ValueAndTimestamp<SubscriptionWrapper<K>>>> subscriptionJoinNode =
- new StatefulProcessorNode<>(
+ final ProcessorToStateConnectorNode<CombinedKey<KO, K>,
Change<ValueAndTimestamp<SubscriptionWrapper<K>>>> subscriptionJoinNode =
Review Comment:
Originally I thought I would not need the newly introduced
`ProcessorToStateConnectorNode` for this case, and that it was only needed for
`process/transform` calls with a store list, but thinking about it again, we
probably do not have another way around it at the moment.
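To make the distinction concrete, here is a minimal, self-contained sketch of how I read the two node kinds. Every class below (`SketchTopology`, `SketchProcessorNode`, `SketchConnectorNode`) is a hypothetical stand-in, not a Kafka Streams class, and the behavior is an assumption inferred from the names and the diff: the plain node only registers its processor (the supplier appears to carry its own store factory), while the connector node additionally wires the processor to stores it does not own.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical stand-in for a topology builder; NOT the Kafka Streams class.
final class SketchTopology {
    final List<String> processors = new ArrayList<>();
    final List<String> connections = new ArrayList<>();

    void addProcessor(final String name) {
        processors.add(name);
    }

    void connect(final String processorName, final String storeName) {
        connections.add(processorName + "->" + storeName);
    }
}

// Hypothetical plain node: only registers the processor.
// Assumption: the processor supplier provides its own stores.
class SketchProcessorNode {
    final String name;

    SketchProcessorNode(final String name) {
        this.name = name;
    }

    void writeToTopology(final SketchTopology topology) {
        topology.addProcessor(name);
    }
}

// Hypothetical connector node: registers the processor, then connects it
// to stores that were added elsewhere (e.g. a user-supplied store list).
final class SketchConnectorNode extends SketchProcessorNode {
    private final Set<String> storeNames;

    SketchConnectorNode(final String name, final Set<String> storeNames) {
        super(name);
        this.storeNames = new LinkedHashSet<>(storeNames);
    }

    @Override
    void writeToTopology(final SketchTopology topology) {
        super.writeToTopology(topology);
        for (final String storeName : storeNames) {
            topology.connect(name, storeName);
        }
    }
}
```

Under that reading, the subscription join processor here needs the connector variant because it accesses a store that another node adds to the topology.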
##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/GroupedStreamAggregateBuilder.java:
##########
@@ -97,11 +97,10 @@ <KR, VR> KTable<KR, VR> build(final NamedInternal
functionName,
parentNode = repartitionNode;
}
- final StatefulProcessorNode<K, V> statefulProcessorNode =
- new StatefulProcessorNode<>(
+ final ProcessorGraphNode<K, V> statefulProcessorNode =
+ new ProcessorGraphNode<>(
aggFunctionName,
- new ProcessorParameters<>(aggregateSupplier, aggFunctionName),
- new String[] {storeFactory.storeName()}
Review Comment:
This is for line 57 above and independent of this PR: just a thought, could
we pass in `storeFactory.storeName()` in the future?
##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/ProcessorToStateConnectorNode.java:
##########
@@ -0,0 +1,74 @@
+/*
Review Comment:
Should we have a unit test class for this new node?
##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java:
##########
@@ -1225,7 +1225,8 @@ public <KOut, VOut> KStream<KOut, VOut> process(
}
final String name = new NamedInternal(named).name();
- final StatefulProcessorNode<? super K, ? super V> processNode = new
StatefulProcessorNode<>(
+ final ProcessorToStateConnectorNode<? super K, ? super V> processNode
= new
+ ProcessorToStateConnectorNode<>(
Review Comment:
nit: is the newline necessary?