nicktelford commented on code in PR #16041: URL: https://github.com/apache/kafka/pull/16041#discussion_r1617008040
########## streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java: ########## @@ -490,6 +490,37 @@ public void shouldTimeIteratorDuration() { assertThat((double) iteratorDurationMaxMetric.metricValue(), equalTo(3.0 * TimeUnit.MILLISECONDS.toNanos(1))); } + @Test + public void shouldTrackOldestOpenIteratorTimestamp() { + when(inner.all()).thenReturn(KeyValueIterators.emptyIterator()); + init(); + + final KafkaMetric oldestIteratorTimestampMetric = metric("oldest-iterator-open-since-ms"); + assertThat(oldestIteratorTimestampMetric, not(nullValue())); + + assertThat(oldestIteratorTimestampMetric.metricValue(), nullValue()); + + final KeyValueIterator<String, String> second; + final long secondTimestamp; + try (final KeyValueIterator<String, String> first = metered.all()) { + final long oldestTimestamp = mockTime.milliseconds(); + assertThat((Long) oldestIteratorTimestampMetric.metricValue(), equalTo(oldestTimestamp)); + mockTime.sleep(100); + + // open a second iterator before closing the first to test that we still produce the first iterator's timestamp + second = metered.all(); + secondTimestamp = mockTime.milliseconds(); + assertThat((Long) oldestIteratorTimestampMetric.metricValue(), equalTo(oldestTimestamp)); + mockTime.sleep(100); + } + + // now that the first iterator is closed, check that the timestamp has advanced to the still open second iterator + assertThat((Long) oldestIteratorTimestampMetric.metricValue(), equalTo(secondTimestamp)); + second.close(); Review Comment: Done. I assumed you were suggesting this in all the tests, so I updated them all. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org