mjsax commented on a change in pull request #9039:
URL: https://github.com/apache/kafka/pull/9039#discussion_r466032331
##########
File path:
streams/src/main/java/org/apache/kafka/streams/kstream/internals/CogroupedStreamAggregateBuilder.java
##########
@@ -132,16 +135,19 @@
final boolean stateCreated,
final StoreBuilder<?> storeBuilder,
final Windows<W> windows,
+
final SlidingWindows slidingWindows,
final SessionWindows sessionWindows,
final Merger<? super K, VOut> sessionMerger) {
final ProcessorSupplier<K, ?> kStreamAggregate;
- if (windows == null && sessionWindows == null) {
+ if (windows == null && slidingWindows == null && sessionWindows ==
null) {
kStreamAggregate = new KStreamAggregate<>(storeBuilder.name(),
initializer, aggregator);
- } else if (windows != null && sessionWindows == null) {
+ } else if (windows != null && slidingWindows == null && sessionWindows
== null) {
kStreamAggregate = new KStreamWindowAggregate<>(windows,
storeBuilder.name(), initializer, aggregator);
- } else if (windows == null && sessionMerger != null) {
+ } else if (windows == null && slidingWindows != null && sessionWindows
== null) {
+ kStreamAggregate = new
KStreamSlidingWindowAggregate<>(slidingWindows, storeBuilder.name(),
initializer, aggregator);
+ } else if (windows == null && slidingWindows == null && sessionMerger
!= null) {
Review comment:
We pass in all parameter to sharing to code that creates the
`StatefulProcessorNode` -- not sure if it's the best way to structure the code
and I am happy to split it up into multiple methods call (as long as we avoid
code duplication). And yes, you are right, it's internal and the checks are
just for us to avoid programming errors. Users should never be exposed to it. I
personally tend to make a lot of mistakes and the more checks we have in place
the better IMHO :)
If @lct45 want's she can just do a side cleanup PR to fix it, and rebase
this PR after the cleanup PR was merged? Or we do it as follow up. Whatever
works best for you.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]