[ https://issues.apache.org/jira/browse/KAFKA-6486?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16404345#comment-16404345 ]

ASF GitHub Bot commented on KAFKA-6486:
---------------------------------------

mjsax closed pull request #4628: KAFKA-6486: Implemented LinkedHashMap in TimeWindows
URL: https://github.com/apache/kafka/pull/4628
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java
index f9090c594cb..c2b910df5b1 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java
@@ -19,7 +19,7 @@
 import org.apache.kafka.streams.kstream.internals.TimeWindow;
 import org.apache.kafka.streams.processor.TimestampExtractor;
 
-import java.util.HashMap;
+import java.util.LinkedHashMap;
 import java.util.Map;
 
 /**
@@ -105,7 +105,7 @@ public TimeWindows advanceBy(final long advanceMs) {
     @Override
     public Map<Long, TimeWindow> windowsFor(final long timestamp) {
         long windowStart = (Math.max(0, timestamp - sizeMs + advanceMs) / advanceMs) * advanceMs;
-        final Map<Long, TimeWindow> windows = new HashMap<>();
+        final Map<Long, TimeWindow> windows = new LinkedHashMap<>();
         while (windowStart <= timestamp) {
             final TimeWindow window = new TimeWindow(windowStart, windowStart + sizeMs);
             windows.put(windowStart, window);
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimeWindowTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimeWindowTest.java
index 09ac173b737..f260bee62ef 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimeWindowTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimeWindowTest.java
@@ -16,8 +16,12 @@
  */
 package org.apache.kafka.streams.kstream.internals;
 
+import org.apache.kafka.streams.kstream.TimeWindows;
 import org.junit.Test;
 
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
@@ -117,4 +121,15 @@ public void shouldNotOverlapIsOtherWindowIsAfterThisWindow() {
     public void cannotCompareTimeWindowWithDifferentWindowType() {
         window.overlap(sessionWindow);
     }
+
+    @Test
+    public void shouldReturnMatchedWindowsOrderedByTimestamp() {
+        final TimeWindows windows = TimeWindows.of(12L).advanceBy(5L);
+        final Map<Long, TimeWindow> matched = windows.windowsFor(21L);
+
+        final Long[] expected = matched.keySet().toArray(new Long[matched.size()]);
+        assertEquals(expected[0].longValue(), 10L);
+        assertEquals(expected[1].longValue(), 15L);
+        assertEquals(expected[2].longValue(), 20L);
+    }
 }
\ No newline at end of file
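To make the ordering issue concrete, here is a minimal stand-alone sketch that re-creates the `windowsFor()` loop from the diff without depending on Kafka. Assumptions: windows are modelled as `{start, end}` long arrays instead of `TimeWindow`, the hypothetical `preserveOrder` flag switches between the old `HashMap` and the new `LinkedHashMap`, and the loop increment `windowStart += advanceMs` (cut off by the diff context) is assumed. With the unit test's parameters (size 12, advance 5, timestamp 21), the first start is `(max(0, 21 - 12 + 5) / 5) * 5 = (14 / 5) * 5 = 10`, so the windows start at 10, 15 and 20.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical re-creation of TimeWindows#windowsFor, for illustration only.
// A window is represented as {start, end} rather than Kafka's TimeWindow.
final class WindowsForSketch {

    static Map<Long, long[]> windowsFor(final long timestamp, final long sizeMs,
                                        final long advanceMs, final boolean preserveOrder) {
        // Earliest window start that still contains `timestamp`,
        // rounded down to a multiple of advanceMs (same formula as the diff).
        long windowStart = (Math.max(0, timestamp - sizeMs + advanceMs) / advanceMs) * advanceMs;
        // preserveOrder = true mimics the fix (LinkedHashMap keeps insertion order);
        // false mimics the old code (HashMap iteration order is unspecified).
        final Map<Long, long[]> windows = preserveOrder
                ? new LinkedHashMap<Long, long[]>()
                : new HashMap<Long, long[]>();
        while (windowStart <= timestamp) {
            windows.put(windowStart, new long[] {windowStart, windowStart + sizeMs});
            windowStart += advanceMs; // assumed increment, not visible in the diff
        }
        return windows;
    }

    public static void main(final String[] args) {
        // Same parameters as shouldReturnMatchedWindowsOrderedByTimestamp.
        // prints "LinkedHashMap: [10, 15, 20]"
        System.out.println("LinkedHashMap: " + new ArrayList<>(windowsFor(21L, 12L, 5L, true).keySet()));
        // No ordering guarantee here; the output need not be ascending.
        System.out.println("HashMap:       " + new ArrayList<>(windowsFor(21L, 12L, 5L, false).keySet()));
    }
}
```

Because the loop always inserts windows in ascending start order, a `LinkedHashMap` is enough to make iteration ascending; a `HashMap` offers no such guarantee, since its order follows hash buckets rather than time.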


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> TimeWindows causes unordered calls to windowed aggregation functions
> --------------------------------------------------------------------
>
>                 Key: KAFKA-6486
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6486
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>    Affects Versions: 1.0.0
>            Reporter: Valentino Proietti
>            Priority: Minor
>             Fix For: 1.2.0
>
>         Attachments: KAFKA-6486.patch
>
>
> This is not a real bug, but it causes some weird behaviour, at least in my opinion.
> TimeWindows has a method called windowsFor() that uses and returns a HashMap:
>     @Override
>     public Map<Long, TimeWindow> windowsFor(final long timestamp) {
>         long windowStart = (Math.max(0, timestamp - sizeMs + advanceMs) / advanceMs) * advanceMs;
>         final Map<Long, TimeWindow> windows = new HashMap<>();
>         ....
> The HashMap does not preserve insertion order, so the subsequent calls to any
> Streams windowed aggregation function are not made in window-time order, as I
> would expect.
> A simple solution is to replace the HashMap with a LinkedHashMap, and that's
> what I did.
> Anyway, replacing it directly in your code could save hours of debugging spent
> understanding what's happening.
> Thank you 
>  
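For applications still on a version where `windowsFor()` returns a `HashMap` (the issue lists 1.0.0 as affected), one possible caller-side workaround, not something the issue or the PR proposes, is to copy the returned map into a `TreeMap` before iterating. The `SortedWindows` class and its `byWindowStart` helper are hypothetical, and window values are plain strings so the sketch runs without Kafka on the classpath.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical helper, not part of Kafka Streams: re-sorts a map keyed by
// window start so iteration is ascending regardless of the map type received.
final class SortedWindows {

    static <W> NavigableMap<Long, W> byWindowStart(final Map<Long, W> windows) {
        // TreeMap orders Long keys by their natural (ascending) order.
        return new TreeMap<>(windows);
    }

    public static void main(final String[] args) {
        // Simulates an unordered result such as the old HashMap-based windowsFor().
        final Map<Long, String> unordered = new HashMap<>();
        unordered.put(20L, "[20,32)");
        unordered.put(10L, "[10,22)");
        unordered.put(15L, "[15,27)");
        // prints [10, 15, 20]
        System.out.println(byWindowStart(unordered).keySet());
    }
}
```

A `TreeMap` pays for a sort on every call, whereas the `LinkedHashMap` change in the PR gets ascending order for free, because windows are already inserted by increasing start time.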



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
