mjsax commented on code in PR #16041: URL: https://github.com/apache/kafka/pull/16041#discussion_r1612395523
##########
streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/StateStoreMetrics.java:
##########
@@ -451,6 +455,23 @@ public static void addNumOpenIteratorsGauge(final String taskId,
     }

+    public static void addOldestOpenIteratorGauge(final String taskId,
+                                                  final String storeType,
+                                                  final String storeName,
+                                                  final StreamsMetricsImpl streamsMetrics,
+                                                  final Gauge<Long> oldestOpenIteratorGauge) {
+        streamsMetrics.addStoreLevelMutableMetric(
+            taskId,
+            storeType,
+            storeName,
+            OLDEST_ITERATOR_OPEN_SINCE_MS,
+            OLDEST_ITERATOR_OPEN_SINCE_MS_DESCRIPTION,
+            RecordingLevel.INFO,
+            oldestOpenIteratorGauge
+        );
+

Review Comment:
   nit: remove blank line

##########
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java:
##########
@@ -169,6 +172,10 @@ private void registerMetrics() {
         iteratorDurationSensor = StateStoreMetrics.iteratorDurationSensor(taskId.toString(), metricsScope, name(), streamsMetrics);
         StateStoreMetrics.addNumOpenIteratorsGauge(taskId.toString(), metricsScope, name(), streamsMetrics,
                 (config, now) -> numOpenIterators.get());
+        StateStoreMetrics.addOldestOpenIteratorGauge(taskId.toString(), metricsScope, name(), streamsMetrics,
+                (config, now) -> openIterators.isEmpty() ? null :
+                        openIterators.stream().mapToLong(MeteredIterator::startTimestamp).min().getAsLong()

Review Comment:
   I don't want to over-engineer (given that we can safely assume that the `openIterators` set should be small), but I'm wondering if this is the best implementation. In the end, we only want to track the creation timestamps, not the iterators themselves. For the creation timestamps we could just maintain a list of longs: we would `return list.first()` here, and always append to the end of the list when a new iterator is created. Only "remove" would be more expensive, but we could use a sorted tree for the list, so that remove would be O(log n) instead of O(n). In that case, we also wouldn't need the `MeteredIterator` helper interface. Thoughts?

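The sorted-structure idea in the comment above could look roughly like the following. This is a minimal sketch under stated assumptions, not code from the PR: `OpenIteratorTimestamps` and its methods are hypothetical names. It keeps a per-timestamp count in a `TreeMap`, since two iterators may be created in the same millisecond and a plain sorted set of longs would collapse them into one entry. The `synchronized` methods assume the gauge may be read from a different thread than the one using the store.

```java
import java.util.TreeMap;

/** Hypothetical helper (not part of Kafka): tracks creation timestamps of open iterators. */
final class OpenIteratorTimestamps {
    // creation timestamp (ms) -> number of currently open iterators created at that timestamp
    private final TreeMap<Long, Integer> counts = new TreeMap<>();

    /** Record a newly opened iterator: O(log n). */
    public synchronized void add(final long createTimestampMs) {
        counts.merge(createTimestampMs, 1, Integer::sum);
    }

    /** Record a closed iterator: O(log n). Unknown timestamps are ignored. */
    public synchronized void remove(final long createTimestampMs) {
        // Returning null from the remapping function removes the entry entirely.
        counts.computeIfPresent(createTimestampMs, (ts, n) -> n == 1 ? null : n - 1);
    }

    /** Oldest creation timestamp among open iterators, or null if none are open. */
    public synchronized Long oldest() {
        return counts.isEmpty() ? null : counts.firstKey();
    }
}
```

With a helper like this, the gauge lambda would reduce to a single call such as `(config, now) -> timestamps.oldest()`, where `timestamps` is an assumed field holding the helper.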
##########
streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java:
##########
@@ -490,6 +490,31 @@ public void shouldTimeIteratorDuration() {
         assertThat((double) iteratorDurationMaxMetric.metricValue(), equalTo(3.0 * TimeUnit.MILLISECONDS.toNanos(1)));
     }

+    @Test
+    public void shouldTrackOldestOpenIteratorTimestamp() {
+        when(inner.all()).thenReturn(KeyValueIterators.emptyIterator());
+        init();
+
+        final KafkaMetric oldestIteratorTimestampMetric = metric("oldest-iterator-open-since-ms");
+        assertThat(oldestIteratorTimestampMetric, not(nullValue()));
+
+        assertThat(oldestIteratorTimestampMetric.metricValue(), nullValue());
+
+        try (final KeyValueIterator<String, String> first = metered.all()) {
+            final long oldestTimestamp = mockTime.milliseconds();
+            assertThat((Long) oldestIteratorTimestampMetric.metricValue(), equalTo(oldestTimestamp));
+            mockTime.sleep(100);
+
+            // open a second iterator before closing the first to test that we still produce the first iterator's timestamp
+            try (final KeyValueIterator<String, String> second = metered.all()) {
+                assertThat((Long) oldestIteratorTimestampMetric.metricValue(), equalTo(oldestTimestamp));
+                mockTime.sleep(100);
+            }

Review Comment:
   It would be better not to close the second iterator here, but to close the first one first, to see if the metric advances to the second iterator's creation timestamp. That would need some rewriting of the test: try-with-resources can't express that close order, but we can still use try-finally. It might actually be best to open something like 5 iterators and close them in some non-linear order (including closing the oldest one 2 or 3 times) to verify correct behavior.
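The close-order check suggested above can be modelled in a self-contained way. The sketch below is not the Kafka test and uses no Kafka classes: `CloseOrderScenario`, `Iter`, and `oldestOpenSinceMs` are hypothetical stand-ins for the metered store, its iterators, and the gauge, and the `nowMs` field stands in for `MockTime`. It reads "closing the oldest one 2 or 3 times" as closing whichever iterator is currently the oldest at several points in the sequence, which is an interpretation rather than something the comment spells out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

/** Hypothetical model of the scenario; stand-in types only, no Kafka dependencies. */
final class CloseOrderScenario {
    // creation timestamp (ms) -> number of open iterators created at that timestamp
    private final TreeMap<Long, Integer> open = new TreeMap<>();
    private long nowMs = 0L; // stand-in for MockTime

    final class Iter implements AutoCloseable {
        final long createdMs = nowMs;
        private boolean closed = false;

        Iter() {
            open.merge(createdMs, 1, Integer::sum);
        }

        @Override
        public void close() {
            if (closed) {
                return; // idempotent, so the finally block below is safe
            }
            closed = true;
            open.computeIfPresent(createdMs, (ts, n) -> n == 1 ? null : n - 1);
        }
    }

    Iter openIterator() {
        final Iter it = new Iter();
        nowMs += 100; // advance the clock, like mockTime.sleep(100)
        return it;
    }

    /** Stand-in for the gauge: oldest creation timestamp, or null if nothing is open. */
    Long oldestOpenSinceMs() {
        return open.isEmpty() ? null : open.firstKey();
    }

    /** Opens five iterators, closes them out of order, and records the gauge after each close. */
    static List<Long> run() {
        final CloseOrderScenario store = new CloseOrderScenario();
        final List<Iter> iterators = new ArrayList<>();
        final List<Long> observed = new ArrayList<>();
        try {
            for (int i = 0; i < 5; i++) {
                iterators.add(store.openIterator()); // created at 0, 100, 200, 300, 400
            }
            // Indices 0, 1 and 3 are each the oldest open iterator at the time they are closed.
            final int[] closeOrder = {2, 0, 1, 4, 3};
            for (final int idx : closeOrder) {
                iterators.get(idx).close();
                observed.add(store.oldestOpenSinceMs());
            }
        } finally {
            // try-finally instead of try-with-resources, so the close order above is free
            for (final Iter it : iterators) {
                it.close();
            }
        }
        return observed;
    }
}
```

In this model the gauge should read 0, 100, 300, 300 and then null after the five closes: it only advances when the currently oldest iterator is closed, and it skips past timestamps whose iterators were already closed earlier.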