lct45 commented on a change in pull request #9039:
URL: https://github.com/apache/kafka/pull/9039#discussion_r466425932



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/CogroupedStreamAggregateBuilder.java
##########
@@ -132,16 +135,19 @@
                                                                                      final boolean stateCreated,
                                                                                      final StoreBuilder<?> storeBuilder,
                                                                                      final Windows<W> windows,
+                                                                                     final SlidingWindows slidingWindows,
                                                                                      final SessionWindows sessionWindows,
                                                                                      final Merger<? super K, VOut> sessionMerger) {
 
         final ProcessorSupplier<K, ?> kStreamAggregate;
 
-        if (windows == null && sessionWindows == null) {
+        if (windows == null && slidingWindows == null && sessionWindows == null) {
             kStreamAggregate = new KStreamAggregate<>(storeBuilder.name(), initializer, aggregator);
-        } else if (windows != null && sessionWindows == null) {
+        } else if (windows != null && slidingWindows == null && sessionWindows == null) {
             kStreamAggregate = new KStreamWindowAggregate<>(windows, storeBuilder.name(), initializer, aggregator);
-        } else if (windows == null && sessionMerger != null) {
+        } else if (windows == null && slidingWindows != null && sessionWindows == null) {
+            kStreamAggregate = new KStreamSlidingWindowAggregate<>(slidingWindows, storeBuilder.name(), initializer, aggregator);
+        } else if (windows == null && slidingWindows == null && sessionMerger != null) {

Review comment:
       I'm happy to do a PR! Looking into it now, though, 
`getStatefulProcessorNode` is called by `build`, so I think to really separate 
it by type we'd need a different `build` _and_ `StatefulProcessorNode`. 
Otherwise we'd just be moving the null checks into `build` and then calling the 
correct `getStatefulProcessorNode`, which doesn't really fix anything. 
Thoughts? It's easy to create new `build` functions, but I figured that might 
count as the code duplication we're trying to avoid :)
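
To make the trade-off concrete, here is a rough sketch of the two options. 
The stub types, method names, and string labels are all hypothetical stand-ins 
for illustration, not the actual Kafka Streams classes or signatures:

```java
public class BuildDispatchSketch {

    // Stand-in marker types; NOT the real Kafka Streams window classes.
    static final class TimeWindowsStub { }
    static final class SlidingWindowsStub { }
    static final class SessionWindowsStub { }

    // Option A: keep a single entry point. The null checks have only moved
    // from the helper into this method, so nothing is really simplified.
    static String buildWithNullChecks(final TimeWindowsStub windows,
                                      final SlidingWindowsStub slidingWindows,
                                      final SessionWindowsStub sessionWindows) {
        if (windows == null && slidingWindows == null && sessionWindows == null) {
            return nodeForPlain();
        } else if (windows != null && slidingWindows == null && sessionWindows == null) {
            return nodeFor(windows);
        } else if (windows == null && slidingWindows != null && sessionWindows == null) {
            return nodeFor(slidingWindows);
        } else if (windows == null && slidingWindows == null && sessionWindows != null) {
            return nodeFor(sessionWindows);
        }
        throw new IllegalArgumentException("at most one window type may be set");
    }

    // Option B: one build overload per window type. No null checks are left,
    // but each overload repeats the same surrounding boilerplate.
    static String build() {
        return nodeForPlain();
    }

    static String build(final TimeWindowsStub windows) {
        return nodeFor(windows);
    }

    static String build(final SlidingWindowsStub slidingWindows) {
        return nodeFor(slidingWindows);
    }

    static String build(final SessionWindowsStub sessionWindows) {
        return nodeFor(sessionWindows);
    }

    // Hypothetical per-type helpers; each returns a label in place of a real node.
    private static String nodeForPlain() {
        return "plain";
    }

    private static String nodeFor(final TimeWindowsStub windows) {
        return "time-windowed";
    }

    private static String nodeFor(final SlidingWindowsStub slidingWindows) {
        return "sliding";
    }

    private static String nodeFor(final SessionWindowsStub sessionWindows) {
        return "session";
    }

    public static void main(final String[] args) {
        // Both routes pick the same helper for a sliding-window aggregation.
        System.out.println(buildWithNullChecks(null, new SlidingWindowsStub(), null));
        System.out.println(build(new SlidingWindowsStub()));
    }
}
```

Option B gets rid of the null checks entirely, at the cost of one nearly 
identical `build` per window type, which is the duplication concern above.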




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

