mjsax commented on code in PR #16041:
URL: https://github.com/apache/kafka/pull/16041#discussion_r1612395523


##########
streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/StateStoreMetrics.java:
##########
@@ -451,6 +455,23 @@ public static void addNumOpenIteratorsGauge(final String 
taskId,
 
     }
 
+    public static void addOldestOpenIteratorGauge(final String taskId,
+                                                  final String storeType,
+                                                  final String storeName,
+                                                  final StreamsMetricsImpl 
streamsMetrics,
+                                                  final Gauge<Long> 
oldestOpenIteratorGauge) {
+        streamsMetrics.addStoreLevelMutableMetric(
+                taskId,
+                storeType,
+                storeName,
+                OLDEST_ITERATOR_OPEN_SINCE_MS,
+                OLDEST_ITERATOR_OPEN_SINCE_MS_DESCRIPTION,
+                RecordingLevel.INFO,
+                oldestOpenIteratorGauge
+        );
+

Review Comment:
   nit: remove blank line



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java:
##########
@@ -169,6 +172,10 @@ private void registerMetrics() {
         iteratorDurationSensor = 
StateStoreMetrics.iteratorDurationSensor(taskId.toString(), metricsScope, 
name(), streamsMetrics);
         StateStoreMetrics.addNumOpenIteratorsGauge(taskId.toString(), 
metricsScope, name(), streamsMetrics,
                 (config, now) -> numOpenIterators.get());
+        StateStoreMetrics.addOldestOpenIteratorGauge(taskId.toString(), 
metricsScope, name(), streamsMetrics,
+                (config, now) -> openIterators.isEmpty() ? null :
+                    
openIterators.stream().mapToLong(MeteredIterator::startTimestamp).min().getAsLong()

Review Comment:
   I don't want to over-engineer (given that we can safely assume that the 
`openIterator` set should be small), but wondering if this is the best 
implementation?
   
   In the end, we only want to track the create ts, not the iterators 
themselves. And for create ts we could just maintain a list if longs, and we 
would `return list.first()` here, and always append to the end of the list when 
a new iterator is created? Only "remove" would be more expensive, but we could 
use a sorted tree for the list, and thus remove would be O(log n) not O(n)).
   
   For this case, we also don't need the `MeteredIterator` helper interface.
   
   Thoughts?



##########
streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java:
##########
@@ -490,6 +490,31 @@ public void shouldTimeIteratorDuration() {
         assertThat((double) iteratorDurationMaxMetric.metricValue(), 
equalTo(3.0 * TimeUnit.MILLISECONDS.toNanos(1)));
     }
 
+    @Test
+    public void shouldTrackOldestOpenIteratorTimestamp() {
+        when(inner.all()).thenReturn(KeyValueIterators.emptyIterator());
+        init();
+
+        final KafkaMetric oldestIteratorTimestampMetric = 
metric("oldest-iterator-open-since-ms");
+        assertThat(oldestIteratorTimestampMetric, not(nullValue()));
+
+        assertThat(oldestIteratorTimestampMetric.metricValue(), nullValue());
+
+        try (final KeyValueIterator<String, String> first = metered.all()) {
+            final long oldestTimestamp = mockTime.milliseconds();
+            assertThat((Long) oldestIteratorTimestampMetric.metricValue(), 
equalTo(oldestTimestamp));
+            mockTime.sleep(100);
+
+            // open a second iterator before closing the first to test that we 
still produce the first iterator's timestamp
+            try (final KeyValueIterator<String, String> second = 
metered.all()) {
+                assertThat((Long) oldestIteratorTimestampMetric.metricValue(), 
equalTo(oldestTimestamp));
+                mockTime.sleep(100);
+            }

Review Comment:
   It would be better to not close the second iterator here, but close the 
first one first, to see if the metric advances to the second's iterator create 
ts -- would need some rewriting of the test; try-with-resource won't allow for 
proper nesting, but we can still use try-finally.
   
   Might actually be best, to open like 5 iterators and close them in some 
non-linear order (including closing the oldest one like 2 or 3 times) to verify 
correct behavior.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to