guozhangwang commented on code in PR #18195:
URL: https://github.com/apache/kafka/pull/18195#discussion_r1900245185


##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java:
##########
@@ -1235,26 +1241,24 @@ private <VR, KO, VO> KTable<K, VR> doJoinOnForeignKey(final KTable<KO, VO> forei
 
         final String subscriptionReceiveName = renamed.suffixWithOrElseGet(
             "-subscription-receive", builder, SUBSCRIPTION_PROCESSOR);
-        final StatefulProcessorNode<KO, SubscriptionWrapper<K>> subscriptionReceiveNode =
-            new StatefulProcessorNode<>(
+        final ProcessorGraphNode<KO, SubscriptionWrapper<K>> subscriptionReceiveNode =
+            new ProcessorGraphNode<>(
                 subscriptionReceiveName,
                 new ProcessorParameters<>(
                     new SubscriptionReceiveProcessorSupplier<>(subscriptionStoreFactory, combinedKeySchema),
-                    subscriptionReceiveName),
-                new String[]{subscriptionStoreName}
+                    subscriptionReceiveName)
             );
         builder.addGraphNode(subscriptionSource, subscriptionReceiveNode);
 
         final KTableValueGetterSupplier<KO, VO> foreignKeyValueGetter = ((KTableImpl<KO, VO, VO>) foreignKeyTable).valueGetterSupplier();
-        final StatefulProcessorNode<CombinedKey<KO, K>, Change<ValueAndTimestamp<SubscriptionWrapper<K>>>> subscriptionJoinNode =
-            new StatefulProcessorNode<>(
+        final ProcessorToStateConnectorNode<CombinedKey<KO, K>, Change<ValueAndTimestamp<SubscriptionWrapper<K>>>> subscriptionJoinNode =

Review Comment:
   Originally I thought we would not need the newly introduced 
`ProcessorToStateConnectorNode` for this case, and that it was only needed for 
`process/transform` calls with a store list, but thinking about it again, we 
probably do not have another way around it at the moment.
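
   To make the distinction in this comment concrete, here is a minimal sketch 
of the two wiring styles as I read them from the diff. Every type and name 
below (`Topology`, `Supplier`, `writePlainNode`, `writeConnectorNode`, the 
store names) is a hypothetical stand-in, not the actual Kafka Streams classes: 
a plain node connects only the stores the supplier itself declares, while a 
connector node additionally connects stores that are passed in by name, such 
as stores owned by another table.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;

// Hypothetical, simplified stand-ins; none of these are the real Kafka Streams types.
public class StoreConnectionSketch {

    /** Stand-in for a topology builder that records processor-to-store connections. */
    static final class Topology {
        final List<String> connections = new ArrayList<>();

        void connect(final String processor, final String store) {
            connections.add(processor + "->" + store);
        }
    }

    /** Stand-in for a processor supplier that may declare the stores it owns. */
    interface Supplier {
        Set<String> stores();
    }

    /** Plain node: only the stores declared by the supplier are connected. */
    static void writePlainNode(final Topology topology, final String name, final Supplier supplier) {
        for (final String store : supplier.stores()) {
            topology.connect(name, store);
        }
    }

    /** Connector node: also connects stores that are passed in by name. */
    static void writeConnectorNode(final Topology topology,
                                   final String name,
                                   final Supplier supplier,
                                   final String... storeNames) {
        writePlainNode(topology, name, supplier);
        for (final String store : storeNames) {
            topology.connect(name, store);
        }
    }

    public static List<String> demo() {
        final Topology topology = new Topology();

        // Assumption: an aggregation supplier owns its store, so a plain node is enough.
        final Supplier aggregate = () -> Collections.singleton("agg-store");
        writePlainNode(topology, "aggregate", aggregate);

        // Assumption: a join processor reads a store owned by another table,
        // so the store name has to be connected explicitly.
        final Supplier join = Collections::emptySet;
        writeConnectorNode(topology, "subscription-join", join, "foreign-table-store");

        return topology.connections;
    }

    public static void main(final String[] args) {
        demo().forEach(System.out::println);
    }
}
```

   In this sketch the join processor cannot declare the foreign table's store 
as its own, which is the kind of situation where an explicit list of store 
names seems hard to avoid.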



##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/GroupedStreamAggregateBuilder.java:
##########
@@ -97,11 +97,10 @@ <KR, VR> KTable<KR, VR> build(final NamedInternal functionName,
             parentNode = repartitionNode;
         }
 
-        final StatefulProcessorNode<K, V> statefulProcessorNode =
-            new StatefulProcessorNode<>(
+        final ProcessorGraphNode<K, V> statefulProcessorNode =
+            new ProcessorGraphNode<>(
                 aggFunctionName,
-                new ProcessorParameters<>(aggregateSupplier, aggFunctionName),
-                new String[] {storeFactory.storeName()}

Review Comment:
   This is about line 57 above and independent of the PR: just a thought, could 
we pass in `storeFactory.storeName()` in the future?



##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/ProcessorToStateConnectorNode.java:
##########
@@ -0,0 +1,74 @@
+/*

Review Comment:
   Should we have a unit test class for this new node?



##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java:
##########
@@ -1225,7 +1225,8 @@ public <KOut, VOut> KStream<KOut, VOut> process(
         }
 
         final String name = new NamedInternal(named).name();
-        final StatefulProcessorNode<? super K, ? super V> processNode = new StatefulProcessorNode<>(
+        final ProcessorToStateConnectorNode<? super K, ? super V> processNode = new
+            ProcessorToStateConnectorNode<>(

Review Comment:
   nit: is the newline necessary?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.