showuon commented on a change in pull request #11124:
URL: https://github.com/apache/kafka/pull/11124#discussion_r682231711



##########
File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregateTest.java
##########
@@ -100,17 +99,79 @@
 
     private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String());
     private final String threadId = Thread.currentThread().getName();
+    private final String topic = "topic";
+    private final String defaultInOrderName = "InOrder";
+    private final String defaultReverseName = "Reverse";
+    private final long defaultWindowSize = 10L;
+    private final long defaultRetentionPeriod = 5000L;
+
+    private WindowBytesStoreSupplier getStoreSupplier(final boolean inOrderIterator,
+                                                      final String inOrderName,
+                                                      final String reverseName,
+                                                      final long windowSize) {
+        return inOrderIterator
+            ? new InOrderMemoryWindowStoreSupplier(inOrderName, defaultRetentionPeriod, windowSize, false)
+            : Stores.inMemoryWindowStore(reverseName, ofMillis(defaultRetentionPeriod), ofMillis(windowSize), false);
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void testAggregateSmallInputWithZeroTimeDifference() {
+        final StreamsBuilder builder = new StreamsBuilder();
+
+        // We use CachingWindowStore to store the aggregated values internally, and then use TimeWindow to represent the "windowed KTable"
+        // thus, the window size must be greater than 0 here
+        final WindowBytesStoreSupplier storeSupplier = getStoreSupplier(inOrderIterator, defaultInOrderName, defaultReverseName, 1L);

Review comment:
       @mjsax, answering your review comments here. Please check the above 
discussion for more info.
   
   > Why window-agnostic ? In general, I am not sure why we need to change the 
existing JavaDocs? What information do you think is missing or wrong?
   
   > You add (inclusive) and (exclusive) in SessionWindow but remove it here. 
Seems inconsistent?
   
   > Why do you remove this check? A TimeWindow should not allow this case.
   
   > Why do we need to remove this temporarily?
   
   --> The answer to all of the above questions is the same: we can't create a 
store supplier with a window size of 0 here because we use `TimeWindow` to 
represent the "windowed KTable" result. @ableegoldman and I both thought it 
doesn't make sense to use `TimeWindow` for that if `WindowStore` is used for 
both inclusive-exclusive and inclusive-inclusive windows; we should have a 
neutral time window for this case. That's why Sophie suggested a container 
class that does nothing but hold the start and end time, for use in 
window-agnostic cases like the `CachingWindowStore`. The idea was that the 
container class could take the name `TimeWindow`, and the existing 
`TimeWindow` would eventually be renamed to `InclusiveExclusiveWindow`. That's 
why I changed the JavaDoc, the start/end time check, and the test for it. 
   
   So, since we agreed that we won't rename the window, I'll revert those 
changes. But one question remains:  
   _We can't create a store supplier with a window size of 0 here because we 
use `TimeWindow` to represent the "windowed KTable" result._
   Do you agree that we should have a container class that does nothing but 
hold the start and end time, for use in window-agnostic cases like the 
`CachingWindowStore`? Or do you have any other suggestions?
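   
   To make the proposal concrete, here is a rough sketch of what such a container could look like. The class name `WindowBounds` and its methods are hypothetical and do not exist in Kafka Streams; the only point is that it accepts start == end, which the check in `TimeWindow` discussed above does not allow.

```java
// Hypothetical sketch, not an existing Kafka Streams class.
// A plain holder for window bounds that makes no claim about whether
// the end timestamp is inclusive or exclusive.
final class WindowBounds {
    private final long startMs;
    private final long endMs;

    WindowBounds(final long startMs, final long endMs) {
        if (startMs < 0) {
            throw new IllegalArgumentException("startMs must not be negative");
        }
        // Only reject end < start. start == end is allowed here, so a
        // window of size 0 can be represented.
        if (endMs < startMs) {
            throw new IllegalArgumentException("endMs must not be smaller than startMs");
        }
        this.startMs = startMs;
        this.endMs = endMs;
    }

    long start() {
        return startMs;
    }

    long end() {
        return endMs;
    }

    long size() {
        return endMs - startMs;
    }
}
```

   With something like this, a window-agnostic store could describe a window of size 0 as `new WindowBounds(5L, 5L)`, and the test above would not need to force the window size to 1.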
   
   Thank you.
   
   
   



