ableegoldman commented on a change in pull request #9039:
URL: https://github.com/apache/kafka/pull/9039#discussion_r466063583



##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
##########
@@ -459,6 +460,95 @@ public void shouldGroupByKey() throws Exception {
         )));
     }
 
+
+
+    @Test
+    public void shouldReduceSlidingWindows() throws Exception {
+        streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 0);
+        final long firstBatchTimestamp = 2000L;
+        final long timeDifference = 1000L;
+        produceMessages(firstBatchTimestamp);
+        final long secondBatchTimestamp = firstBatchTimestamp + timeDifference / 2;
+        produceMessages(secondBatchTimestamp);
+        final long thirdBatchTimestamp = secondBatchTimestamp + timeDifference - 100L;
+        produceMessages(thirdBatchTimestamp);
+
+        final Serde<Windowed<String>> windowedSerde = WindowedSerdes.timeWindowedSerdeFrom(String.class);
+        groupedStream
+                .windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(timeDifference), ofMillis(2000L)))
+                .reduce(reducer)
+                .toStream()
+                .to(outputTopic, Produced.with(windowedSerde, Serdes.String()));
+
+        startStreams();
+
+        final List<KeyValueTimestamp<Windowed<String>, String>> windowedOutput = receiveMessages(
+                new TimeWindowedDeserializer<>(),
+                new StringDeserializer(),
+                String.class,
+                25);
+
+        // read from ConsoleConsumer
+        final String resultFromConsoleConsumer = readWindowedKeyedMessagesViaConsoleConsumer(
+                new TimeWindowedDeserializer<String>(),
+                new StringDeserializer(),
+                String.class,
+                25,
+                true);
+
+        final Comparator<KeyValueTimestamp<Windowed<String>, String>> comparator =
+                Comparator.comparing((KeyValueTimestamp<Windowed<String>, String> o) -> o.key().key())
+                        .thenComparing(KeyValueTimestamp::value);
+
+        windowedOutput.sort(comparator);
+        final long firstBatchLeftWindow = firstBatchTimestamp - timeDifference;
+        final long firstBatchRightWindow = firstBatchTimestamp + 1;
+        final long secondBatchLeftWindow = secondBatchTimestamp - timeDifference;
+        final long secondBatchRightWindow = secondBatchTimestamp + 1;
+        final long thirdBatchLeftWindow = thirdBatchTimestamp - timeDifference;
+
+        final List<KeyValueTimestamp<Windowed<String>, String>> expectResult = Arrays.asList(
+                new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(firstBatchLeftWindow, Long.MAX_VALUE)), "A", firstBatchTimestamp),

Review comment:
   No, she's right: this problem is not resolved at all. You can pass
`windowSize` to the `TimeWindowedDeserializer` constructor all you want, but it
just gets ignored, because the deserializer object you instantiate is thrown
away. Whether you read records through a Java Consumer or the console consumer
(for some reason this test does both), the actual deserializer is always
constructed inside the consumer from the configs. There is a config for the
windowed inner class, which is properly set in
`TimeWindowedDeserializer#configure`, but there is no config for `windowSize`,
so there's no way to set it at the moment.
   
   tl;dr: there's no point in having serde constructors accept parameters; they
need to be set through `configure`.
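
   To illustrate the pattern, here is a minimal, self-contained sketch. It
does not use the real Kafka classes: `ConfigureSketch`,
`WindowedSketchDeserializer`, `createFromConfig`, and both config keys are
hypothetical names invented for the example, and the `Long.MAX_VALUE` default
is an assumption chosen to mirror the window end in the expected result above.
It only shows why a value passed to a constructor is lost when the client
builds its own instance from a class name and then calls `configure`.

```java
import java.util.HashMap;
import java.util.Map;

public class ConfigureSketch {

    // Hypothetical config keys, invented for this sketch; not real Kafka configs.
    public static final String DESERIALIZER_CLASS_CONFIG = "sketch.deserializer.class";
    public static final String WINDOW_SIZE_CONFIG = "sketch.window.size.ms";

    /** Hypothetical stand-in for a windowed deserializer; not the Kafka class. */
    public static class WindowedSketchDeserializer {
        // Assumed default, mirroring the Long.MAX_VALUE window end in the test.
        private long windowSize = Long.MAX_VALUE;

        public WindowedSketchDeserializer() { }

        // Only affects the instance the caller creates directly.
        public WindowedSketchDeserializer(final long windowSize) {
            this.windowSize = windowSize;
        }

        // The only hook a reflectively created instance ever sees.
        public void configure(final Map<String, ?> configs) {
            final Object value = configs.get(WINDOW_SIZE_CONFIG);
            if (value != null) {
                windowSize = Long.parseLong(value.toString());
            }
        }

        public long windowSize() {
            return windowSize;
        }
    }

    /** Mimics a client that builds the deserializer from configs alone. */
    public static WindowedSketchDeserializer createFromConfig(final Map<String, ?> configs) {
        try {
            final String className = (String) configs.get(DESERIALIZER_CLASS_CONFIG);
            // A fresh instance via the no-arg constructor; constructor arguments
            // given to any other instance cannot reach this one.
            final WindowedSketchDeserializer deserializer = (WindowedSketchDeserializer)
                Class.forName(className).getDeclaredConstructor().newInstance();
            deserializer.configure(configs);
            return deserializer;
        } catch (final ReflectiveOperationException e) {
            throw new IllegalStateException("Could not instantiate deserializer", e);
        }
    }

    public static void main(final String[] args) {
        // This instance carries windowSize = 1000, but the client never uses it.
        final WindowedSketchDeserializer ignored = new WindowedSketchDeserializer(1000L);

        final Map<String, Object> configs = new HashMap<>();
        configs.put(DESERIALIZER_CLASS_CONFIG, WindowedSketchDeserializer.class.getName());
        System.out.println(ignored.windowSize());
        System.out.println(createFromConfig(configs).windowSize());

        // Once a config key exists, the value survives reflective construction.
        configs.put(WINDOW_SIZE_CONFIG, "1000");
        System.out.println(createFromConfig(configs).windowSize());
    }
}
```

   In the sketch, the window size only reaches the client-built instance once
it travels through the config map, which is the same reason a `windowSize`
config would be needed here.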




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

