cadonna commented on a change in pull request #9177:
URL: https://github.com/apache/kafka/pull/9177#discussion_r472200360



##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
##########
@@ -415,9 +416,40 @@ public final Sensor storeLevelSensor(final String threadId,
         }
     }
 
-    public final void removeAllStoreLevelSensors(final String threadId,
-                                                 final String taskId,
-                                                 final String storeName) {
+    public <T> void addStoreLevelMutableMetric(final String threadId,
+                                               final String taskId,
+                                               final String metricsScope,
+                                               final String storeName,
+                                               final String name,
+                                               final String description,
+                                               final RecordingLevel 
recordingLevel,
+                                               final Gauge<T> valueProvider) {
+        final MetricName metricName = metrics.metricName(
+            name,
+            STATE_STORE_LEVEL_GROUP,
+            description,
+            storeLevelTagMap(threadId, taskId, metricsScope, storeName)
+        );
+        if (metrics.metric(metricName) == null) {
+            final MetricConfig metricConfig = new 
MetricConfig().recordLevel(recordingLevel);
+            final String key = storeSensorPrefix(threadId, taskId, storeName);
+            synchronized (storeLevelMetrics) {
+                metrics.addMetric(metricName, metricConfig, valueProvider);
+                storeLevelMetrics.computeIfAbsent(key, ignored -> new 
LinkedList<>()).push(metricName);
+            }
+        }
+    }
+
+    public final void removeAllStoreLevelSensorsAndMetrics(final String 
threadId,
+                                                           final String taskId,
+                                                           final String 
storeName) {
+        removeAllStoreLevelMetrics(threadId, taskId, storeName);
+        removeAllStoreLevelSensors(threadId, taskId, storeName);
+    }

Review comment:
       Do you have performance concerns due to the two monitors? Or what is the 
main reason for using a single monitor here? By using a single monitor here and 
in `addStoreLevelMutableMetric()` and `storeLevelSensor()`, we do not ensure 
that no metrics are added to the metrics map during removal of all metrics 
because each time  `Sensor#add()` is called a metric is added without 
synchronizing on the monitor of `storeLevelSensors`. Single operations on the 
metrics map are synchronized (through `ConcurrentMap`), but not multiple 
operations.
   
    

##########
File path: 
streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
##########
@@ -609,6 +611,37 @@ public void 
shouldVerifyThatMetricsGetMeasurementsFromRocksDB() {
         assertThat((double) bytesWrittenTotal.metricValue(), greaterThan(0d));
     }
 
+    @Test
+    public void 
shouldVerifyThatMetricsRecordedFromPropertiesGetMeasurementsFromRocksDB() {
+        final TaskId taskId = new TaskId(0, 0);
+
+        final Metrics metrics = new Metrics(new 
MetricConfig().recordLevel(RecordingLevel.INFO));
+        final StreamsMetricsImpl streamsMetrics =
+            new StreamsMetricsImpl(metrics, "test-application", 
StreamsConfig.METRICS_LATEST, time);
+
+        context = EasyMock.niceMock(InternalMockProcessorContext.class);
+        EasyMock.expect(context.metrics()).andStubReturn(streamsMetrics);
+        EasyMock.expect(context.taskId()).andStubReturn(taskId);
+        EasyMock.expect(context.appConfigs())
+                .andStubReturn(new 
StreamsConfig(StreamsTestUtils.getStreamsConfig()).originals());
+        EasyMock.expect(context.stateDir()).andStubReturn(dir);
+        EasyMock.replay(context);
+
+        rocksDBStore.init(context, rocksDBStore);
+        final byte[] key = "hello".getBytes();
+        final byte[] value = "world".getBytes();
+        rocksDBStore.put(Bytes.wrap(key), value);
+
+        final Metric numberOfEntriesActiveMemTable = metrics.metric(new 
MetricName(
+            "num-entries-active-mem-table",
+            StreamsMetricsImpl.STATE_STORE_LEVEL_GROUP,
+            "description is not verified",
+            streamsMetrics.storeLevelTagMap(Thread.currentThread().getName(), 
taskId.toString(), METRICS_SCOPE, DB_NAME)
+        ));
+        assertThat(numberOfEntriesActiveMemTable, notNullValue());
+        assertThat((BigInteger) numberOfEntriesActiveMemTable.metricValue(), 
greaterThan(BigInteger.valueOf(0)));

Review comment:
       Yes, but in this test I merely test whether the metric is updated. The 
correctness of the computation is verified in 
`RocksDBMetricsRecorderGaugesTest`. I will improve this test to verify that the 
metric is zero before the put and greater than zero after the put.

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
##########
@@ -415,9 +416,40 @@ public final Sensor storeLevelSensor(final String threadId,
         }
     }
 
-    public final void removeAllStoreLevelSensors(final String threadId,
-                                                 final String taskId,
-                                                 final String storeName) {
+    public <T> void addStoreLevelMutableMetric(final String threadId,
+                                               final String taskId,
+                                               final String metricsScope,
+                                               final String storeName,
+                                               final String name,
+                                               final String description,
+                                               final RecordingLevel 
recordingLevel,
+                                               final Gauge<T> valueProvider) {
+        final MetricName metricName = metrics.metricName(
+            name,
+            STATE_STORE_LEVEL_GROUP,
+            description,
+            storeLevelTagMap(threadId, taskId, metricsScope, storeName)
+        );
+        if (metrics.metric(metricName) == null) {
+            final MetricConfig metricConfig = new 
MetricConfig().recordLevel(recordingLevel);
+            final String key = storeSensorPrefix(threadId, taskId, storeName);
+            synchronized (storeLevelMetrics) {

Review comment:
       I will do that to ensure that the `removeAllStoreLevelMetrics()` 
completes before we do the check. 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to