This is an automated email from the ASF dual-hosted git repository. mjsax pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new 9c6c6bfa2bd KAFKA-13817 Always sync nextTimeToEmit with wall clock (#12166) 9c6c6bfa2bd is described below commit 9c6c6bfa2bdf97d546ebb7721bc19d15c955b746 Author: Qing <l.q.we...@gmail.com> AuthorDate: Wed Dec 28 20:32:54 2022 +0000 KAFKA-13817 Always sync nextTimeToEmit with wall clock (#12166) Reviewers: Matthias J. Sax <matth...@confluent.io>, Hao Li <h...@confluent.io> --- .../kstream/internals/KStreamKStreamJoin.java | 7 +- .../kstream/internals/KStreamKStreamJoinTest.java | 97 +++++++++++++++++++++- 2 files changed, 100 insertions(+), 4 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java index 397a86d24ad..d1cfa25e191 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java @@ -205,9 +205,10 @@ class KStreamKStreamJoin<K, V1, V2, VOut> implements ProcessorSupplier<K, V1, K, if (internalProcessorContext.currentSystemTimeMs() < sharedTimeTracker.nextTimeToEmit) { return; } - if (sharedTimeTracker.nextTimeToEmit == 0) { - sharedTimeTracker.nextTimeToEmit = internalProcessorContext.currentSystemTimeMs(); - } + + // Ensure `nextTimeToEmit` is synced with `currentSystemTimeMs`, if we dont set it everytime, + // they can get out of sync during a clock drift + sharedTimeTracker.nextTimeToEmit = internalProcessorContext.currentSystemTimeMs(); sharedTimeTracker.advanceNextTimeToEmit(); // reset to MAX_VALUE in case the store is empty diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java index 6864396d433..a5146a93333 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java @@ -19,6 +19,7 @@ package org.apache.kafka.streams.kstream.internals; import org.apache.kafka.common.serialization.IntegerSerializer; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.KeyValueTimestamp; import org.apache.kafka.streams.StreamsBuilder; @@ -31,17 +32,33 @@ import org.apache.kafka.streams.kstream.Consumed; import org.apache.kafka.streams.kstream.JoinWindows; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.StreamJoined; +import org.apache.kafka.streams.processor.StateStoreContext; +import org.apache.kafka.streams.processor.api.Processor; +import org.apache.kafka.streams.processor.api.Record; import org.apache.kafka.streams.processor.internals.InternalTopicConfig; import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder; import org.apache.kafka.common.utils.LogCaptureAppender; import org.apache.kafka.streams.TestInputTopic; import org.apache.kafka.streams.state.Stores; import org.apache.kafka.streams.state.WindowBytesStoreSupplier; +import org.apache.kafka.streams.state.WindowStore; +import org.apache.kafka.streams.state.KeyValueStore; +import org.apache.kafka.streams.state.internals.KeyValueStoreBuilder; +import org.apache.kafka.streams.state.internals.TimestampedKeyAndJoinSide; +import org.apache.kafka.streams.state.internals.InMemoryKeyValueBytesStoreSupplier; +import org.apache.kafka.streams.state.internals.TimestampedKeyAndJoinSideSerde; +import org.apache.kafka.streams.state.internals.WindowStoreBuilder; +import org.apache.kafka.streams.state.internals.LeftOrRightValueSerde; +import org.apache.kafka.streams.state.internals.LeftOrRightValue; +import org.apache.kafka.streams.state.internals.InMemoryWindowBytesStoreSupplier; import org.apache.kafka.test.MockApiProcessor; import org.apache.kafka.test.MockApiProcessorSupplier; import org.apache.kafka.test.MockValueJoiner; +import org.apache.kafka.test.MockInternalNewProcessorContext; import org.apache.kafka.test.StreamsTestUtils; +import org.apache.kafka.test.GenericInMemoryKeyValueStore; import org.junit.Test; +import org.mockito.Mockito; import java.time.Duration; import java.time.Instant; @@ -49,8 +66,15 @@ import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.HashSet; +import java.util.Iterator; +import java.util.List; import java.util.Properties; import java.util.Set; +import java.util.Optional; +import java.util.Spliterator; +import java.util.Spliterators; +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; import static java.time.Duration.ofHours; import static java.time.Duration.ofMillis; @@ -333,6 +357,77 @@ public class KStreamKStreamJoinTest { runJoin(streamJoined.withOtherStoreSupplier(otherStoreSupplier), joinWindows); } + @Test + public void shouldThrottleEmitNonJoinedOuterRecordsEvenWhenClockDrift() { + /** + * This test is testing something internal to [[KStreamKStreamJoin]], so we had to setup low-level api manually. + */ + final KStreamImplJoin.TimeTracker tracker = new KStreamImplJoin.TimeTracker(); + final KStreamKStreamJoin<String, String, String, String> join = new KStreamKStreamJoin<>( + false, + "other", + new JoinWindowsInternal(JoinWindows.ofTimeDifferenceWithNoGrace(ofMillis(1000))), + (key, v1, v2) -> v1 + v2, + true, + Optional.of("outer"), + tracker); + final Processor<String, String, String, String> joinProcessor = join.get(); + final MockInternalNewProcessorContext<String, String> procCtx = new MockInternalNewProcessorContext<>(); + final WindowStore<String, String> otherStore = new WindowStoreBuilder<>( + new InMemoryWindowBytesStoreSupplier( + "other", + 1000L, + 100, + false), + Serdes.String(), + Serdes.String(), + new MockTime()).build(); + + final KeyValueStore<TimestampedKeyAndJoinSide<String>, LeftOrRightValue<String, String>> outerStore = Mockito.spy( + new KeyValueStoreBuilder<>( + new InMemoryKeyValueBytesStoreSupplier("outer"), + new TimestampedKeyAndJoinSideSerde<>(Serdes.String()), + new LeftOrRightValueSerde<>(Serdes.String(), Serdes.String()), + new MockTime() + ).build()); + + final GenericInMemoryKeyValueStore<String, String> rootStore = new GenericInMemoryKeyValueStore<>("root"); + + otherStore.init((StateStoreContext) procCtx, rootStore); + procCtx.addStateStore(otherStore); + + outerStore.init((StateStoreContext) procCtx, rootStore); + procCtx.addStateStore(outerStore); + + joinProcessor.init(procCtx); + + final Record<String, String> record1 = new Record<>("key1", "value1", 10000L); + final Record<String, String> record2 = new Record<>("key2", "value2", 13000L); + final Record<String, String> record3 = new Record<>("key3", "value3", 15000L); + final Record<String, String> record4 = new Record<>("key4", "value4", 17000L); + + procCtx.setSystemTimeMs(1000L); + joinProcessor.process(record1); + + procCtx.setSystemTimeMs(2100L); + joinProcessor.process(record2); + + procCtx.setSystemTimeMs(2500L); + joinProcessor.process(record3); + // being throttled, so the older value still exists + assertEquals(2, iteratorToList(outerStore.all()).size()); + + procCtx.setSystemTimeMs(4000L); + joinProcessor.process(record4); + assertEquals(1, iteratorToList(outerStore.all()).size()); + } + + private <T> List<T> iteratorToList(final Iterator<T> iterator) { + return StreamSupport.stream( + Spliterators.spliteratorUnknownSize(iterator, Spliterator.ORDERED), false) + .collect(Collectors.toList()); + } + private void runJoin(final StreamJoined<String, Integer, Integer> streamJoined, final JoinWindows joinWindows) { final StreamsBuilder builder = new StreamsBuilder(); @@ -1808,7 +1903,7 @@ public class KStreamKStreamJoinTest { " <-- second-join-this-join, second-join-other-join\n" + " Sink: KSTREAM-SINK-0000000021 (topic: out-two)\n" + " <-- second-join-merge\n\n"; - + private final String expectedTopologyWithGeneratedRepartitionTopic = "Topologies:\n" + " Sub-topology: 0\n" + " Source: KSTREAM-SOURCE-0000000000 (topics: [topic])\n" +