cadonna commented on a change in pull request #11149:
URL: https://github.com/apache/kafka/pull/11149#discussion_r693747330



##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/ThreadMetricsTest.java
##########
@@ -387,4 +393,51 @@ public void shouldGetCloseTaskSensor() {
 
         assertThat(sensor, is(expectedSensor));
     }
+
+    @Test
+    public void shouldAddThreadStartTimeMetric() {
+        // When:
+        ThreadMetrics.addThreadStartTimeMetric(
+            "bongo",
+            streamsMetrics,
+            123L
+        );
+
+        // Then:
+        verify(streamsMetrics).addThreadLevelImmutableMetric(
+            "thread-start-time",
+            "The time that the thread was started",
+            "bongo",
+            123L
+        );
+    }
+
+    @Test
+    public void shouldAddTotalBlockedTimeMetric() {
+        // Given:
+        final StreamThreadTotalBlockedTime blockedTime = mock(StreamThreadTotalBlockedTime.class);
+        when(blockedTime.compute()).thenReturn(123.45);

Review comment:
       Could you please use a variable with a meaningful name like `startTime` 
instead of a literal and re-use the variable in the verification?

##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/ThreadMetricsTest.java
##########
@@ -387,4 +393,51 @@ public void shouldGetCloseTaskSensor() {
 
         assertThat(sensor, is(expectedSensor));
     }
+
+    @Test
+    public void shouldAddThreadStartTimeMetric() {
+        // When:
+        ThreadMetrics.addThreadStartTimeMetric(
+            "bongo",
+            streamsMetrics,
+            123L

Review comment:
       Could you please use a variable with a meaningful name like `startTime` 
instead of a literal and re-use the variable in the verification?
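
       A rough sketch of the shape I have in mind, written without Mockito so that it runs on its own. `RecordingMetrics` and `StartTimeExample` are hypothetical stand-ins for the mocked `StreamsMetricsImpl` and for `ThreadMetrics`; only the metric name and description strings are taken from the test above.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the mocked StreamsMetricsImpl: it only records calls.
final class RecordingMetrics {
    final List<String> calls = new ArrayList<>();

    void addThreadLevelImmutableMetric(final String name,
                                       final String description,
                                       final String threadId,
                                       final long value) {
        calls.add(name + "|" + description + "|" + threadId + "|" + value);
    }
}

final class StartTimeExample {
    // Assumed behaviour of ThreadMetrics.addThreadStartTimeMetric, for illustration only.
    static void addThreadStartTimeMetric(final String threadId,
                                         final RecordingMetrics metrics,
                                         final long startTime) {
        metrics.addThreadLevelImmutableMetric(
            "thread-start-time",
            "The time that the thread was started",
            threadId,
            startTime
        );
    }

    static boolean run() {
        // Given: each value is declared once, under a meaningful name
        final String threadId = "bongo";
        final long startTime = 123L;
        final RecordingMetrics metrics = new RecordingMetrics();

        // When:
        addThreadStartTimeMetric(threadId, metrics, startTime);

        // Then: the verification re-uses the same variables instead of repeating literals
        final String expectedCall =
            "thread-start-time|The time that the thread was started|" + threadId + "|" + startTime;
        return metrics.calls.size() == 1 && metrics.calls.get(0).equals(expectedCall);
    }

    public static void main(final String[] args) {
        System.out.println(run());
    }
}
```

       With the values named once, a change to the input cannot silently drift away from the expectation.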

##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImplTest.java
##########
@@ -1209,4 +1211,46 @@ public void shouldNotMeasureLatencyBecauseSensorHasNoMetrics() {
 
         verify(sensor);
     }
+
+    @Test
+    public void shouldAddThreadLevelMutableMetric() {
+        final StreamsMetricsImpl streamsMetrics
+            = new StreamsMetricsImpl(metrics, THREAD_ID1, VERSION, time);
+
+        streamsMetrics.addThreadLevelMutableMetric(
+            "foobar",
+            "test metric",
+            "t1",
+            (c, t) -> 123
+        );
+
+        final MetricName name = metrics.metricName(
+            "foobar",
+            THREAD_LEVEL_GROUP,
+            Collections.singletonMap("thread-id", "t1")
+        );
+        assertThat(metrics.metric(name), notNullValue());
+        assertThat(metrics.metric(name).metricValue(), equalTo(123));
+    }
+
+    @Test
+    public void shouldCleanupThreadLevelMutableMetric() {
+        final StreamsMetricsImpl streamsMetrics
+            = new StreamsMetricsImpl(metrics, THREAD_ID1, VERSION, time);
+        streamsMetrics.addThreadLevelMutableMetric(
+            "foobar",
+            "test metric",
+            "t1",
+            (c, t) -> 123

Review comment:
       Could you please use variables with meaningful names instead of 
literals and re-use the variables in the verification?

##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreatorTest.java
##########
@@ -117,6 +120,15 @@ public void shouldNoOpCloseTaskProducerIfEosDisabled() {
         assertThat(mockClientSupplier.producers.get(0).closed(), is(false));
     }
 
+    @Test
+    public void shouldReturnBlockedTimeWhenThreadProducer() {
+        createTasks();
+        final MockProducer<?, ?> producer = mockClientSupplier.producers.get(0);
+        addMetric(producer, "flush-time-total", 123.0);

Review comment:
       Could you please use a variable with a meaningful name like 
`blockedTime` instead of a literal and re-use the variable in the verification?

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsProducer.java
##########
@@ -67,22 +69,35 @@
     private final Map<String, Object> eosV2ProducerConfigs;
     private final KafkaClientSupplier clientSupplier;
     private final StreamThread.ProcessingMode processingMode;
+    private final Time time;
 
     private Producer<byte[], byte[]> producer;
     private boolean transactionInFlight = false;
     private boolean transactionInitialized = false;
+    private double oldProducerTotalBlockedTime = 0;
 
     public StreamsProducer(final StreamsConfig config,
                            final String threadId,
                            final KafkaClientSupplier clientSupplier,
                            final TaskId taskId,
                            final UUID processId,
                            final LogContext logContext) {
+        this(config, threadId, clientSupplier, taskId, processId, logContext, Time.SYSTEM);
+    }

Review comment:
       I think we should not keep this constructor. We risk ending up with 
different time objects for threads/tasks and their producers, which could lead 
to inconsistent time between these components. If removing the constructor 
makes this PR too large (and I suspect it will), I recommend doing the 
constructor change as a separate refactoring and getting that merged before 
this PR.
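
       To illustrate the concern, here is a minimal sketch in which the clock can only be passed in, so a thread and its producer always read the same time. `Clock`, `ManualClock`, `SketchProducer`, and `SketchThread` are hypothetical names for this sketch, not Kafka classes; `Clock` merely plays the role of the `Time` object in the diff above.

```java
// Hypothetical stand-in for the Time object that is passed around in Streams.
interface Clock {
    long milliseconds();
}

// A manually advanced clock, as a test would use.
final class ManualClock implements Clock {
    private long now;

    ManualClock(final long start) {
        this.now = start;
    }

    @Override
    public long milliseconds() {
        return now;
    }

    void sleep(final long ms) {
        now += ms;
    }
}

final class SketchProducer {
    private final Clock time;

    // The only constructor: there is no overload that falls back to a system clock.
    SketchProducer(final Clock time) {
        this.time = time;
    }

    long now() {
        return time.milliseconds();
    }
}

final class SketchThread {
    private final Clock time;
    private final SketchProducer producer;

    SketchThread(final Clock time) {
        this.time = time;
        // The thread hands its own clock to the producer it creates.
        this.producer = new SketchProducer(time);
    }

    long now() {
        return time.milliseconds();
    }

    long producerNow() {
        return producer.now();
    }
}

final class SharedClockExample {
    static boolean clocksAgree() {
        final long startMs = 1000L;
        final long elapsedMs = 50L;
        final ManualClock clock = new ManualClock(startMs);
        final SketchThread thread = new SketchThread(clock);

        clock.sleep(elapsedMs);

        // Both components observe the same advanced time.
        return thread.now() == startMs + elapsedMs
            && thread.producerNow() == thread.now();
    }

    public static void main(final String[] args) {
        System.out.println(clocksAgree());
    }
}
```

       If `SketchProducer` also had a constructor that defaulted to a system clock, a test that advances the manual clock would see the thread move forward while the producer kept reading wall-clock time.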

##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImplTest.java
##########
@@ -1209,4 +1211,46 @@ public void shouldNotMeasureLatencyBecauseSensorHasNoMetrics() {
 
         verify(sensor);
     }
+
+    @Test
+    public void shouldAddThreadLevelMutableMetric() {
+        final StreamsMetricsImpl streamsMetrics
+            = new StreamsMetricsImpl(metrics, THREAD_ID1, VERSION, time);
+
+        streamsMetrics.addThreadLevelMutableMetric(
+            "foobar",
+            "test metric",
+            "t1",
+            (c, t) -> 123

Review comment:
       Could you please use variables with meaningful names instead of 
literals and re-use the variables in the verification?

##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsProducerTest.java
##########
@@ -1121,4 +1130,65 @@ public void shouldResetTransactionInitializedOnResetProducer() {
         verify(mockedProducer);
     }
 
+    @Test
+    public void shouldComputeTotalBlockedTime() {
+        setProducerMetrics(nonEosMockProducer, 1, 2, 3, 4, 5, 6, 7);

Review comment:
       Could you please use variables with meaningful names instead of 
literals and re-use the variables in the verification?

##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsProducerTest.java
##########
@@ -1121,4 +1130,65 @@ public void shouldResetTransactionInitializedOnResetProducer() {
         verify(mockedProducer);
     }
 
+    @Test
+    public void shouldComputeTotalBlockedTime() {
+        setProducerMetrics(nonEosMockProducer, 1, 2, 3, 4, 5, 6, 7);
+
+        final double expectedTotalBlocked = 1 + 2 + 3 + 4 + 5 + 6 + 7;
+        assertThat(nonEosStreamsProducer.totalBlockedTime(), closeTo(expectedTotalBlocked, 0.01));
+    }
+
+    @Test
+    public void shouldComputeTotalBlockedTimeAfterReset() {
+        setProducerMetrics(eosBetaMockProducer, 1, 2, 3, 4, 5, 6, 7);
+        final double expectedTotalBlocked = 1 + 2 + 3 + 4 + 5 + 6 + 7;

Review comment:
       Could you please use variables with meaningful names instead of 
literals and re-use the variables in the verification?
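
       For example, roughly like the sketch below. The seven variable names are only my guess at what the positional arguments of `setProducerMetrics` stand for, and `totalBlockedTime(double...)` is a hypothetical stand-in for the producer under test, so please adapt both to the real code.

```java
final class BlockedTimeExample {
    // Hypothetical stand-in for the code under test: sums the blocking-time metrics.
    static double totalBlockedTime(final double... metricValues) {
        double total = 0;
        for (final double value : metricValues) {
            total += value;
        }
        return total;
    }

    static boolean run() {
        // Given: one named variable per metric (names are assumptions)
        final double bufferPoolWaitTime = 1;
        final double flushTime = 2;
        final double txnInitTime = 3;
        final double txnBeginTime = 4;
        final double txnSendOffsetsTime = 5;
        final double txnCommitTime = 6;
        final double txnAbortTime = 7;

        // When:
        final double actualTotalBlocked = totalBlockedTime(
            bufferPoolWaitTime, flushTime, txnInitTime, txnBeginTime,
            txnSendOffsetsTime, txnCommitTime, txnAbortTime
        );

        // Then: the expectation is built from the same variables, not from repeated literals
        final double expectedTotalBlocked = bufferPoolWaitTime + flushTime + txnInitTime
            + txnBeginTime + txnSendOffsetsTime + txnCommitTime + txnAbortTime;
        return Math.abs(actualTotalBlocked - expectedTotalBlocked) < 0.01;
    }

    public static void main(final String[] args) {
        System.out.println(run());
    }
}
```

       Named inputs also make it obvious which metric is which when the expected total has to change.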




