[ https://issues.apache.org/jira/browse/KAFKA-6486?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16404345#comment-16404345 ]
ASF GitHub Bot commented on KAFKA-6486: --------------------------------------- mjsax closed pull request #4628: KAFKA-6486: Implemented LinkedHashMap in TimeWindows URL: https://github.com/apache/kafka/pull/4628 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java index f9090c594cb..c2b910df5b1 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java @@ -19,7 +19,7 @@ import org.apache.kafka.streams.kstream.internals.TimeWindow; import org.apache.kafka.streams.processor.TimestampExtractor; -import java.util.HashMap; +import java.util.LinkedHashMap; import java.util.Map; /** @@ -105,7 +105,7 @@ public TimeWindows advanceBy(final long advanceMs) { @Override public Map<Long, TimeWindow> windowsFor(final long timestamp) { long windowStart = (Math.max(0, timestamp - sizeMs + advanceMs) / advanceMs) * advanceMs; - final Map<Long, TimeWindow> windows = new HashMap<>(); + final Map<Long, TimeWindow> windows = new LinkedHashMap<>(); while (windowStart <= timestamp) { final TimeWindow window = new TimeWindow(windowStart, windowStart + sizeMs); windows.put(windowStart, window); diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimeWindowTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimeWindowTest.java index 09ac173b737..f260bee62ef 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimeWindowTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimeWindowTest.java @@ -16,8 +16,12 @@ */ package 
org.apache.kafka.streams.kstream.internals; +import org.apache.kafka.streams.kstream.TimeWindows; import org.junit.Test; +import java.util.Map; + +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; @@ -117,4 +121,15 @@ public void shouldNotOverlapIsOtherWindowIsAfterThisWindow() { public void cannotCompareTimeWindowWithDifferentWindowType() { window.overlap(sessionWindow); } + + @Test + public void shouldReturnMatchedWindowsOrderedByTimestamp() { + final TimeWindows windows = TimeWindows.of(12L).advanceBy(5L); + final Map<Long, TimeWindow> matched = windows.windowsFor(21L); + + final Long[] expected = matched.keySet().toArray(new Long[matched.size()]); + assertEquals(expected[0].longValue(), 10L); + assertEquals(expected[1].longValue(), 15L); + assertEquals(expected[2].longValue(), 20L); + } } \ No newline at end of file ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > TimeWindows causes unordered calls to windowed aggregation functions > -------------------------------------------------------------------- > > Key: KAFKA-6486 > URL: https://issues.apache.org/jira/browse/KAFKA-6486 > Project: Kafka > Issue Type: Improvement > Components: streams > Affects Versions: 1.0.0 > Reporter: Valentino Proietti > Priority: Minor > Fix For: 1.2.0 > > Attachments: KAFKA-6486.patch > > > This is not a real bug but it causes some weird behaviour, at least in my > opinion. 
> The TimeWindows class has a method called windowsFor() that uses and returns a > HashMap: > @Override > *public* Map<Long, TimeWindow> windowsFor(*final* *long* timestamp) { > *long* windowStart = (Math._max_(0, timestamp - sizeMs + advanceMs) / > advanceMs) * advanceMs; > *final* Map<Long, TimeWindow> windows = *new* HashMap<>(); > .... > The HashMap does not preserve the order of insertion, and this later results > in calls to any streams windowed aggregation functions that are not ordered > by window time, as I would expect them to be. > A simple solution is to replace the HashMap with a LinkedHashMap and that's > what I did. > Anyway, replacing it directly in your code can save hours of debugging to > understand what's happening. > Thank you > -- This message was sent by Atlassian JIRA (v7.6.3#76005)