cadonna commented on a change in pull request #9177: URL: https://github.com/apache/kafka/pull/9177#discussion_r472200360
##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
##########
@@ -415,9 +416,40 @@ public final Sensor storeLevelSensor(final String threadId,
         }
     }
 
-    public final void removeAllStoreLevelSensors(final String threadId,
-                                                 final String taskId,
-                                                 final String storeName) {
+    public <T> void addStoreLevelMutableMetric(final String threadId,
+                                               final String taskId,
+                                               final String metricsScope,
+                                               final String storeName,
+                                               final String name,
+                                               final String description,
+                                               final RecordingLevel recordingLevel,
+                                               final Gauge<T> valueProvider) {
+        final MetricName metricName = metrics.metricName(
+            name,
+            STATE_STORE_LEVEL_GROUP,
+            description,
+            storeLevelTagMap(threadId, taskId, metricsScope, storeName)
+        );
+        if (metrics.metric(metricName) == null) {
+            final MetricConfig metricConfig = new MetricConfig().recordLevel(recordingLevel);
+            final String key = storeSensorPrefix(threadId, taskId, storeName);
+            synchronized (storeLevelMetrics) {
+                metrics.addMetric(metricName, metricConfig, valueProvider);
+                storeLevelMetrics.computeIfAbsent(key, ignored -> new LinkedList<>()).push(metricName);
+            }
+        }
+    }
+
+    public final void removeAllStoreLevelSensorsAndMetrics(final String threadId,
+                                                           final String taskId,
+                                                           final String storeName) {
+        removeAllStoreLevelMetrics(threadId, taskId, storeName);
+        removeAllStoreLevelSensors(threadId, taskId, storeName);
+    }

Review comment:
Do you have performance concerns due to the two monitors? Or what is the main reason for using a single monitor here? By using a single monitor here and in `addStoreLevelMutableMetric()` and `storeLevelSensor()`, we do not ensure that no metrics are added to the metrics map during removal of all metrics because each time `Sensor#add()` is called a metric is added without synchronizing on the monitor of `storeLevelSensors`. Single operations on the metrics map are synchronized (through `ConcurrentMap`), but not multiple operations.
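To make the point about single versus multiple operations concrete, here is a minimal sketch, not the actual Kafka code. `StoreMetricsRegistry` is a hypothetical stand-in for `StreamsMetricsImpl`, a `ConcurrentHashMap` stands in for the metrics registry, and `Supplier` stands in for `Gauge`. It only shows that a check-then-add sequence and the bulk removal are atomic with respect to each other when both run under the same monitor.

```java
import java.util.Deque;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

// Hypothetical, simplified stand-in for StreamsMetricsImpl (assumption, not Kafka's class).
final class StoreMetricsRegistry {
    // Stand-in for the metrics registry: each single operation is thread-safe on its own.
    private final Map<String, Supplier<?>> metrics = new ConcurrentHashMap<>();
    // Metric names registered per store; only accessed while holding its own monitor.
    private final Map<String, Deque<String>> storeLevelMetrics = new HashMap<>();

    <T> void addStoreLevelMutableMetric(final String storeKey,
                                        final String metricName,
                                        final Supplier<T> valueProvider) {
        synchronized (storeLevelMetrics) {
            // The existence check and the add form one compound operation,
            // so both happen under the same monitor as the removal below.
            if (!metrics.containsKey(metricName)) {
                metrics.put(metricName, valueProvider);
                storeLevelMetrics.computeIfAbsent(storeKey, ignored -> new LinkedList<>()).push(metricName);
            }
        }
    }

    void removeAllStoreLevelMetrics(final String storeKey) {
        synchronized (storeLevelMetrics) {
            final Deque<String> names = storeLevelMetrics.remove(storeKey);
            if (names != null) {
                for (final String name : names) {
                    metrics.remove(name);
                }
            }
        }
    }

    int metricCount() {
        return metrics.size();
    }
}
```

Note that this guards only callers that go through these two methods. A writer that adds to the registry without taking the monitor, as the comment describes for `Sensor#add()`, is not excluded by it.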
##########
File path: streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
##########
@@ -609,6 +611,37 @@ public void shouldVerifyThatMetricsGetMeasurementsFromRocksDB() {
         assertThat((double) bytesWrittenTotal.metricValue(), greaterThan(0d));
     }
 
+    @Test
+    public void shouldVerifyThatMetricsRecordedFromPropertiesGetMeasurementsFromRocksDB() {
+        final TaskId taskId = new TaskId(0, 0);
+
+        final Metrics metrics = new Metrics(new MetricConfig().recordLevel(RecordingLevel.INFO));
+        final StreamsMetricsImpl streamsMetrics =
+            new StreamsMetricsImpl(metrics, "test-application", StreamsConfig.METRICS_LATEST, time);
+
+        context = EasyMock.niceMock(InternalMockProcessorContext.class);
+        EasyMock.expect(context.metrics()).andStubReturn(streamsMetrics);
+        EasyMock.expect(context.taskId()).andStubReturn(taskId);
+        EasyMock.expect(context.appConfigs())
+            .andStubReturn(new StreamsConfig(StreamsTestUtils.getStreamsConfig()).originals());
+        EasyMock.expect(context.stateDir()).andStubReturn(dir);
+        EasyMock.replay(context);
+
+        rocksDBStore.init(context, rocksDBStore);
+        final byte[] key = "hello".getBytes();
+        final byte[] value = "world".getBytes();
+        rocksDBStore.put(Bytes.wrap(key), value);
+
+        final Metric numberOfEntriesActiveMemTable = metrics.metric(new MetricName(
+            "num-entries-active-mem-table",
+            StreamsMetricsImpl.STATE_STORE_LEVEL_GROUP,
+            "description is not verified",
+            streamsMetrics.storeLevelTagMap(Thread.currentThread().getName(), taskId.toString(), METRICS_SCOPE, DB_NAME)
+        ));
+        assertThat(numberOfEntriesActiveMemTable, notNullValue());
+        assertThat((BigInteger) numberOfEntriesActiveMemTable.metricValue(), greaterThan(BigInteger.valueOf(0)));

Review comment:
Yes, but in this test I merely test whether the metric is updated. The correctness of the computation is verified in `RocksDBMetricsRecorderGaugesTest`. I will improve this test to verify that the metric is zero before the put and greater than zero after the put.
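The planned before/after check could look roughly like the following sketch. It does not use RocksDB or the Kafka test harness: `InMemoryStore` and `numEntriesGauge()` are hypothetical names introduced only for illustration, and `Supplier<BigInteger>` stands in for a `Gauge` that is evaluated on every read.

```java
import java.math.BigInteger;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

// Illustrative only: a hypothetical in-memory store, not RocksDBStore.
final class GaugeBeforeAfterSketch {

    static final class InMemoryStore {
        private final Map<String, String> entries = new HashMap<>();

        void put(final String key, final String value) {
            entries.put(key, value);
        }

        // The gauge is computed lazily on each call, so it reflects the current store state.
        Supplier<BigInteger> numEntriesGauge() {
            return () -> BigInteger.valueOf(entries.size());
        }
    }

    public static void main(final String[] args) {
        final InMemoryStore store = new InMemoryStore();
        final Supplier<BigInteger> gauge = store.numEntriesGauge();

        // Before the put, the gauge is expected to report zero.
        final BigInteger before = gauge.get();
        if (before.signum() != 0) {
            throw new AssertionError("expected zero before put, got " + before);
        }

        store.put("hello", "world");

        // After the put, the same gauge is expected to report a value greater than zero.
        final BigInteger after = gauge.get();
        if (after.compareTo(BigInteger.ZERO) <= 0) {
            throw new AssertionError("expected a positive value after put, got " + after);
        }
    }
}
```

Asserting on both sides of the put shows that the metric reacts to the write, which a single greater-than-zero assertion after the put cannot distinguish from a gauge that always returns a positive constant.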
##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
##########
@@ -415,9 +416,40 @@ public final Sensor storeLevelSensor(final String threadId,
         }
     }
 
-    public final void removeAllStoreLevelSensors(final String threadId,
-                                                 final String taskId,
-                                                 final String storeName) {
+    public <T> void addStoreLevelMutableMetric(final String threadId,
+                                               final String taskId,
+                                               final String metricsScope,
+                                               final String storeName,
+                                               final String name,
+                                               final String description,
+                                               final RecordingLevel recordingLevel,
+                                               final Gauge<T> valueProvider) {
+        final MetricName metricName = metrics.metricName(
+            name,
+            STATE_STORE_LEVEL_GROUP,
+            description,
+            storeLevelTagMap(threadId, taskId, metricsScope, storeName)
+        );
+        if (metrics.metric(metricName) == null) {
+            final MetricConfig metricConfig = new MetricConfig().recordLevel(recordingLevel);
+            final String key = storeSensorPrefix(threadId, taskId, storeName);
+            synchronized (storeLevelMetrics) {

Review comment:
I will do that to ensure that `removeAllStoreLevelMetrics()` completes before we do the check.