ableegoldman commented on a change in pull request #9039:
URL: https://github.com/apache/kafka/pull/9039#discussion_r465369865
##########
File path:
streams/src/main/java/org/apache/kafka/streams/kstream/internals/CogroupedStreamAggregateBuilder.java
##########
@@ -132,16 +135,19 @@
final boolean stateCreated,
final StoreBuilder<?> storeBuilder,
final Windows<W> windows,
+
final SlidingWindows slidingWindows,
final SessionWindows sessionWindows,
final Merger<? super K, VOut> sessionMerger) {
final ProcessorSupplier<K, ?> kStreamAggregate;
- if (windows == null && sessionWindows == null) {
+ if (windows == null && slidingWindows == null && sessionWindows ==
null) {
kStreamAggregate = new KStreamAggregate<>(storeBuilder.name(),
initializer, aggregator);
- } else if (windows != null && sessionWindows == null) {
+ } else if (windows != null && slidingWindows == null && sessionWindows
== null) {
kStreamAggregate = new KStreamWindowAggregate<>(windows,
storeBuilder.name(), initializer, aggregator);
- } else if (windows == null && sessionMerger != null) {
+ } else if (windows == null && slidingWindows != null && sessionWindows
== null) {
+ kStreamAggregate = new
KStreamSlidingWindowAggregate<>(slidingWindows, storeBuilder.name(),
initializer, aggregator);
+ } else if (windows == null && slidingWindows == null && sessionMerger
!= null) {
Review comment:
@mjsax why do we have a single method that accepts all three window
types and then checks them all individually to enforce that only one type of
window is actually "set"? Seems like we could enforce this implicitly by having
a separate method for time, session, and non-windowed aggregates and then just
calling the correct signature. ie `SessionWindowedCogroupedKStreamImpl` calls
`build(...SessionWindows, SessionMerger) and so on.
Maybe I'm missing something here because I wasn't following the cogroup KIP
that closely, but is this even exposed to the user in any way? My understanding
is that there's no way for this check to be violated by any kind of user input,
because this method is only ever called directly by Streams internal code with
`null` hardcoded for the unused window types. I think it's more of an internal
consistency check for Streams than an input validation for the user (and it
seems unnecessary: see above)
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]