This is an automated email from the ASF dual-hosted git repository.

bbejeck pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.1 by this push:
     new c6acaaa  KAFKA-8289: Fix Session Expiration and Suppression (#6654) (#6671)
c6acaaa is described below

commit c6acaaa469b20c385a7b5dbdcbb7a810a24881ab
Author: John Roesler <vvcep...@users.noreply.github.com>
AuthorDate: Sat May 4 06:26:10 2019 -0500

    KAFKA-8289: Fix Session Expiration and Suppression (#6654) (#6671)

    Session windows expired prematurely (off-by-one error), since the
    window end is inclusive, unlike other windows.

    Suppress duration for sessions incorrectly waited only the grace
    period, but session windows aren't closed until gracePeriod +
    sessionGap.

    cherry-pick of 6654 from trunk

    Reviewers: Bill Bejeck <b...@confluent.io>
---
 .../internals/KStreamSessionWindowAggregate.java   | 28 +++----
 .../internals/graph/GraphGraceSearchUtil.java      |  2 +-
 ...KStreamSessionWindowAggregateProcessorTest.java | 96 ++++++++++++++++++++--
 .../kstream/internals/SuppressScenarioTest.java    | 33 ++++----
 .../internals/graph/GraphGraceSearchUtilTest.java  | 10 +--
 5 files changed, 125 insertions(+), 44 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java
index 707ad91..368e2c1 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java
@@ -134,19 +134,7 @@ public class KStreamSessionWindowAggregate<K, V, Agg> implements KStreamAggProce
             }
         }
 
-        if (mergedWindow.end() > closeTime) {
-            if (!mergedWindow.equals(newSessionWindow)) {
-                for (final KeyValue<Windowed<K>, Agg> session : merged) {
-                    store.remove(session.key);
-                    tupleForwarder.maybeForward(session.key, null, session.value);
-                }
-            }
-
-            agg = aggregator.apply(key, value, agg);
-            final Windowed<K> sessionKey = new Windowed<>(key, mergedWindow);
-            store.put(sessionKey, agg);
-            tupleForwarder.maybeForward(sessionKey, agg, null);
-        } else {
+        if (mergedWindow.end() < closeTime) {
             LOG.debug(
                 "Skipping record for expired window. " +
" + "key=[{}] " + @@ -154,7 +142,7 @@ public class KStreamSessionWindowAggregate<K, V, Agg> implements KStreamAggProce "partition=[{}] " + "offset=[{}] " + "timestamp=[{}] " + - "window=[{},{}) " + + "window=[{},{}] " + "expiration=[{}] " + "streamTime=[{}]", key, @@ -168,6 +156,18 @@ public class KStreamSessionWindowAggregate<K, V, Agg> implements KStreamAggProce observedStreamTime ); lateRecordDropSensor.record(); + } else { + if (!mergedWindow.equals(newSessionWindow)) { + for (final KeyValue<Windowed<K>, Agg> session : merged) { + store.remove(session.key); + tupleForwarder.maybeForward(session.key, null, session.value); + } + } + + agg = aggregator.apply(key, value, agg); + final Windowed<K> sessionKey = new Windowed<>(key, mergedWindow); + store.put(sessionKey, agg); + tupleForwarder.maybeForward(sessionKey, agg, null); } } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/GraphGraceSearchUtil.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/GraphGraceSearchUtil.java index 306ddf5..2fb28dd 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/GraphGraceSearchUtil.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/GraphGraceSearchUtil.java @@ -78,7 +78,7 @@ public final class GraphGraceSearchUtil { } else if (processorSupplier instanceof KStreamSessionWindowAggregate) { final KStreamSessionWindowAggregate kStreamSessionWindowAggregate = (KStreamSessionWindowAggregate) processorSupplier; final SessionWindows windows = kStreamSessionWindowAggregate.windows(); - return windows.gracePeriodMs(); + return windows.gracePeriodMs() + windows.inactivityGap(); } else { return null; } diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java index c93dc60..bf61fd2 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java @@ -320,11 +320,11 @@ public class KStreamSessionWindowAggregateProcessorTest { } @Test - public void shouldLogAndMeterWhenSkippingLateRecord() { + public void shouldLogAndMeterWhenSkippingLateRecordWithZeroGrace() { LogCaptureAppender.setClassLoggerToDebug(KStreamSessionWindowAggregate.class); final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(); final Processor<String, String> processor = new KStreamSessionWindowAggregate<>( - SessionWindows.with(ofMillis(10L)).grace(ofMillis(10L)), + SessionWindows.with(ofMillis(10L)).grace(ofMillis(0L)), STORE_NAME, initializer, aggregator, @@ -334,14 +334,21 @@ public class KStreamSessionWindowAggregateProcessorTest { initStore(false); processor.init(context); - // dummy record to advance stream time - context.setRecordContext(new ProcessorRecordContext(20, -2, -3, "topic", null)); + // dummy record to establish stream time = 0 + context.setRecordContext(new ProcessorRecordContext(0, -2, -3, "topic", null)); processor.process("dummy", "dummy"); + // record arrives on time, should not be skipped context.setRecordContext(new ProcessorRecordContext(0, -2, -3, "topic", null)); - processor.process("A", "1"); + processor.process("OnTime1", "1"); + + // dummy record to advance stream time = 1 context.setRecordContext(new 
-        processor.process("A", "1");
+        processor.process("dummy", "dummy");
+
+        // record is late
+        context.setRecordContext(new ProcessorRecordContext(0, -2, -3, "topic", null));
+        processor.process("Late1", "1");
 
         LogCaptureAppender.unregister(appender);
 
         final MetricName dropMetric = new MetricName(
             "late-record-drop-total",
             "stream-processor-node-metrics",
             "The total number of occurrence of late-record-drop operations.",
             mkMap(
                 mkEntry("client-id", "test"),
                 mkEntry("task-id", "0_0"),
                 mkEntry("processor-node-id", "TESTING_NODE")
             )
         );
 
-        assertThat(metrics.metrics().get(dropMetric).metricValue(), is(2.0));
+        assertThat(metrics.metrics().get(dropMetric).metricValue(), is(1.0));
 
         final MetricName dropRate = new MetricName(
             "late-record-drop-rate",
@@ -373,9 +380,80 @@
             greaterThan(0.0));
         assertThat(
             appender.getMessages(),
-            hasItem("Skipping record for expired window. key=[A] topic=[topic] partition=[-3] offset=[-2] timestamp=[0] window=[0,0) expiration=[10] streamTime=[20]"));
+            hasItem("Skipping record for expired window. key=[Late1] topic=[topic] partition=[-3] offset=[-2] timestamp=[0] window=[0,0] expiration=[1] streamTime=[1]"));
+    }
+
+    @Test
+    public void shouldLogAndMeterWhenSkippingLateRecordWithNonzeroGrace() {
+        LogCaptureAppender.setClassLoggerToDebug(KStreamSessionWindowAggregate.class);
+        final LogCaptureAppender appender = LogCaptureAppender.createAndRegister();
+        final Processor<String, String> processor = new KStreamSessionWindowAggregate<>(
+            SessionWindows.with(ofMillis(10L)).grace(ofMillis(1L)),
+            STORE_NAME,
+            initializer,
+            aggregator,
+            sessionMerger
+        ).get();
+
+        initStore(false);
+        processor.init(context);
+
+        // dummy record to establish stream time = 0
+        context.setRecordContext(new ProcessorRecordContext(0, -2, -3, "topic", null));
+        processor.process("dummy", "dummy");
+
+        // record arrives on time, should not be skipped
+        context.setRecordContext(new ProcessorRecordContext(0, -2, -3, "topic", null));
+        processor.process("OnTime1", "1");
+
+        // dummy record to advance stream time = 1
+        context.setRecordContext(new ProcessorRecordContext(1, -2, -3, "topic", null));
+        processor.process("dummy", "dummy");
+
+        // delayed record arrives on time, should not be skipped
+        context.setRecordContext(new ProcessorRecordContext(0, -2, -3, "topic", null));
+        processor.process("OnTime2", "1");
+
+        // dummy record to advance stream time = 2
+        context.setRecordContext(new ProcessorRecordContext(2, -2, -3, "topic", null));
+        processor.process("dummy", "dummy");
+
+        // delayed record arrives late
+        context.setRecordContext(new ProcessorRecordContext(0, -2, -3, "topic", null));
+        processor.process("Late1", "1");
+
+
+        LogCaptureAppender.unregister(appender);
+
+        final MetricName dropMetric = new MetricName(
+            "late-record-drop-total",
+            "stream-processor-node-metrics",
+            "The total number of occurrence of late-record-drop operations.",
+            mkMap(
+                mkEntry("client-id", "test"),
+                mkEntry("task-id", "0_0"),
+                mkEntry("processor-node-id", "TESTING_NODE")
+            )
+        );
+
+        assertThat(metrics.metrics().get(dropMetric).metricValue(), is(1.0));
+
+        final MetricName dropRate = new MetricName(
+            "late-record-drop-rate",
+            "stream-processor-node-metrics",
+            "The average number of occurrence of late-record-drop operations.",
+            mkMap(
+                mkEntry("client-id", "test"),
+                mkEntry("task-id", "0_0"),
+                mkEntry("processor-node-id", "TESTING_NODE")
+            )
+        );
+
+        assertThat(
+            (Double) metrics.metrics().get(dropRate).metricValue(),
+            greaterThan(0.0));
         assertThat(
             appender.getMessages(),
-            hasItem("Skipping record for expired window. key=[A] topic=[topic] partition=[-3] offset=[-2] timestamp=[1] window=[1,1) expiration=[10] streamTime=[20]"));
+            hasItem("Skipping record for expired window. key=[Late1] topic=[topic] partition=[-3] offset=[-2] timestamp=[0] window=[0,0] expiration=[1] streamTime=[2]"));
     }
 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressScenarioTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressScenarioTest.java
index 65c51fc..361677d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressScenarioTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressScenarioTest.java
@@ -46,7 +46,6 @@ import org.apache.kafka.streams.test.ConsumerRecordFactory;
 import org.apache.kafka.streams.test.OutputVerifier;
 import org.junit.Test;
 
-import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.LinkedList;
@@ -310,7 +309,7 @@ public class SuppressScenarioTest {
                 .count();
         valueCounts
                 // this is a bit brittle, but I happen to know that the entries are a little over 100 bytes in size.
-                .suppress(untilTimeLimit(Duration.ofMillis(Long.MAX_VALUE), maxBytes(200L).emitEarlyWhenFull()))
+                .suppress(untilTimeLimit(ofMillis(Long.MAX_VALUE), maxBytes(200L).emitEarlyWhenFull()))
                 .toStream()
                 .to("output-suppressed", Produced.with(STRING_SERDE, Serdes.Long()));
         valueCounts
@@ -481,7 +480,7 @@ public class SuppressScenarioTest {
         final KTable<Windowed<String>, Long> valueCounts = builder
                 .stream("input", Consumed.with(STRING_SERDE, STRING_SERDE))
                 .groupBy((String k, String v) -> k, Grouped.with(STRING_SERDE, STRING_SERDE))
-                .windowedBy(SessionWindows.with(5L).grace(ofMillis(5L)))
+                .windowedBy(SessionWindows.with(ofMillis(5L)).grace(ofMillis(0L)))
                 .count(Materialized.<String, Long, SessionStore<Bytes, byte[]>>as("counts").withCachingDisabled());
         valueCounts
                 .suppress(untilWindowCloses(unbounded()))
@@ -502,34 +501,38 @@
         try (final TopologyTestDriver driver = new TopologyTestDriver(topology, config)) {
             // first window
             driver.pipeInput(recordFactory.create("input", "k1", "v1", 0L));
+            driver.pipeInput(recordFactory.create("input", "k1", "v1", 5L));
+            // arbitrarily disordered records are admitted, because the *window* is not closed until stream-time > window-end + grace
             driver.pipeInput(recordFactory.create("input", "k1", "v1", 1L));
-            // new window
-            driver.pipeInput(recordFactory.create("input", "k1", "v1", 7L));
+            // any record in the same partition advances stream time (note the key is different)
+            driver.pipeInput(recordFactory.create("input", "k2", "v1", 6L));
             // late event for first window - this should get dropped from all streams, since the first window is now closed.
-            driver.pipeInput(recordFactory.create("input", "k1", "v1", 1L));
+            driver.pipeInput(recordFactory.create("input", "k1", "v1", 5L));
             // just pushing stream time forward to flush the other events through.
             driver.pipeInput(recordFactory.create("input", "k1", "v1", 30L));
             verify(
                 drainProducerRecords(driver, "output-raw", STRING_DESERIALIZER, LONG_DESERIALIZER),
                 asList(
                     new KeyValueTimestamp<>("[k1@0/0]", 1L, 0L),
-                    new KeyValueTimestamp<>("[k1@0/0]", null, 1L),
-                    new KeyValueTimestamp<>("[k1@0/1]", 2L, 1L),
-                    new KeyValueTimestamp<>("[k1@7/7]", 1L, 7L),
+                    new KeyValueTimestamp<>("[k1@0/0]", null, 5L),
+                    new KeyValueTimestamp<>("[k1@0/5]", 2L, 5L),
+                    new KeyValueTimestamp<>("[k1@0/5]", null, 1L),
+                    new KeyValueTimestamp<>("[k1@0/5]", 3L, 1L),
+                    new KeyValueTimestamp<>("[k2@6/6]", 1L, 6L),
                     new KeyValueTimestamp<>("[k1@30/30]", 1L, 30L)
                 )
             );
             verify(
                 drainProducerRecords(driver, "output-suppressed", STRING_DESERIALIZER, LONG_DESERIALIZER),
                 asList(
-                    new KeyValueTimestamp<>("[k1@0/1]", 2L, 1L),
-                    new KeyValueTimestamp<>("[k1@7/7]", 1L, 7L)
+                    new KeyValueTimestamp<>("[k1@0/5]", 3L, 1L),
+                    new KeyValueTimestamp<>("[k2@6/6]", 1L, 6L)
                 )
             );
         }
     }
 
-    private <K, V> void verify(final List<ProducerRecord<K, V>> results, final List<KeyValueTimestamp<K, V>> expectedResults) {
+    private static <K, V> void verify(final List<ProducerRecord<K, V>> results, final List<KeyValueTimestamp<K, V>> expectedResults) {
         if (results.size() != expectedResults.size()) {
             throw new AssertionError(printRecords(results) + " != " + expectedResults);
         }
@@ -544,7 +547,7 @@
         }
     }
 
-    private <K, V> List<ProducerRecord<K, V>> drainProducerRecords(final TopologyTestDriver driver, final String topic, final Deserializer<K> keyDeserializer, final Deserializer<V> valueDeserializer) {
+    private static <K, V> List<ProducerRecord<K, V>> drainProducerRecords(final TopologyTestDriver driver, final String topic, final Deserializer<K> keyDeserializer, final Deserializer<V> valueDeserializer) {
         final List<ProducerRecord<K, V>> result = new LinkedList<>();
         for (ProducerRecord<K, V> next = driver.readOutput(topic, keyDeserializer, valueDeserializer);
              next != null;
@@ -554,11 +557,11 @@
         return new ArrayList<>(result);
     }
 
-    private <K, V> String printRecords(final List<ProducerRecord<K, V>> result) {
+    private static <K, V> String printRecords(final List<ProducerRecord<K, V>> result) {
         final StringBuilder resultStr = new StringBuilder();
         resultStr.append("[\n");
         for (final ProducerRecord<?, ?> record : result) {
-            resultStr.append(" ").append(record.toString()).append("\n");
+            resultStr.append(" ").append(record).append("\n");
         }
         resultStr.append("]");
         return resultStr.toString();
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/GraphGraceSearchUtilTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/GraphGraceSearchUtilTest.java
index 5e426d9..45fe845 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/GraphGraceSearchUtilTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/GraphGraceSearchUtilTest.java
@@ -121,11 +121,11 @@
         );
 
         final long extracted = GraphGraceSearchUtil.findAndVerifyWindowGrace(node);
-        assertThat(extracted, is(windows.gracePeriodMs()));
+        assertThat(extracted, is(windows.gracePeriodMs() + windows.inactivityGap()));
     }
 
     @Test
-    public void shouldExtractGraceFromAncestorThroughStatefulParent() {
+    public void shouldExtractGraceFromSessionAncestorThroughStatefulParent() {
         final SessionWindows windows = SessionWindows.with(ofMillis(10L)).grace(ofMillis(1234L));
         final StatefulProcessorNode<String, Long> graceGrandparent = new StatefulProcessorNode<>(
             "asdf",
@@ -160,11 +160,11 @@
         statefulParent.addChild(node);
 
         final long extracted = GraphGraceSearchUtil.findAndVerifyWindowGrace(node);
-        assertThat(extracted, is(windows.gracePeriodMs()));
+        assertThat(extracted, is(windows.gracePeriodMs() + windows.inactivityGap()));
     }
 
     @Test
-    public void shouldExtractGraceFromAncestorThroughStatelessParent() {
+    public void shouldExtractGraceFromSessionAncestorThroughStatelessParent() {
         final SessionWindows windows = SessionWindows.with(ofMillis(10L)).grace(ofMillis(1234L));
         final StatefulProcessorNode<String, Long> graceGrandparent = new StatefulProcessorNode<>(
             "asdf",
@@ -189,7 +189,7 @@
         statelessParent.addChild(node);
 
         final long extracted = GraphGraceSearchUtil.findAndVerifyWindowGrace(node);
-        assertThat(extracted, is(windows.gracePeriodMs()));
+        assertThat(extracted, is(windows.gracePeriodMs() + windows.inactivityGap()));
     }
 
     @Test
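
Taken together, the two fixes in this commit reduce to a little arithmetic. The standalone sketch below is illustrative only: SessionWindowArithmetic, isExpired, and suppressWaitMs are hypothetical names invented for this note, not Kafka API. It mirrors the corrected expiry check in KStreamSessionWindowAggregate, where a session window's inclusive end must fall strictly before closeTime = observedStreamTime - gracePeriod before a record is dropped, and the corrected traversal result in GraphGraceSearchUtil, where suppression must wait gracePeriodMs() + inactivityGap() rather than the grace period alone.

    // Hypothetical standalone sketch, for illustration only; not a Kafka class.
    public final class SessionWindowArithmetic {

        // Session window ends are inclusive, so a window ending exactly at
        // closeTime is still on time; only end < closeTime means expired.
        // closeTime is derived the same way as in KStreamSessionWindowAggregate:
        // observed stream time minus the grace period.
        static boolean isExpired(final long windowEnd, final long streamTime, final long graceMs) {
            final long closeTime = streamTime - graceMs;
            return windowEnd < closeTime;
        }

        // A session window cannot close until the inactivity gap has elapsed
        // (a record landing inside the gap would extend the session) plus the
        // grace period, hence gracePeriodMs() + inactivityGap() above.
        static long suppressWaitMs(final long inactivityGapMs, final long graceMs) {
            return inactivityGapMs + graceMs;
        }

        public static void main(final String[] args) {
            // Mirrors shouldLogAndMeterWhenSkippingLateRecordWithZeroGrace:
            // grace = 0, stream time = 1, window = [0,0] -> expired, dropped.
            System.out.println(isExpired(0L, 1L, 0L));  // true
            // With grace = 1 the same window is still open (the OnTime2 case).
            System.out.println(isExpired(0L, 1L, 1L));  // false
            // Mirrors GraphGraceSearchUtilTest: gap = 10 ms, grace = 1234 ms.
            System.out.println(suppressWaitMs(10L, 1234L));  // 1244
        }
    }

The same arithmetic explains the SuppressScenarioTest changes above: with a 5 ms inactivity gap and zero grace, untilWindowCloses(unbounded()) holds a session result until stream time passes window-end + gap + grace, so disordered records at timestamps 1 and 5 still merge into [k1@0/5] and only the final flush emits the suppressed output.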