cadonna commented on a change in pull request #11149: URL: https://github.com/apache/kafka/pull/11149#discussion_r693747330
########## File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/ThreadMetricsTest.java ########## @@ -387,4 +393,51 @@ public void shouldGetCloseTaskSensor() { assertThat(sensor, is(expectedSensor)); } + + @Test + public void shouldAddThreadStartTimeMetric() { + // When: + ThreadMetrics.addThreadStartTimeMetric( + "bongo", + streamsMetrics, + 123L + ); + + // Then: + verify(streamsMetrics).addThreadLevelImmutableMetric( + "thread-start-time", + "The time that the thread was started", + "bongo", + 123L + ); + } + + @Test + public void shouldAddTotalBlockedTimeMetric() { + // Given: + final StreamThreadTotalBlockedTime blockedTime = mock(StreamThreadTotalBlockedTime.class); + when(blockedTime.compute()).thenReturn(123.45); Review comment: Could you please use a variable with a meaningful name like `startTime` instead of a literal and re-use the variable in the verification? ########## File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/ThreadMetricsTest.java ########## @@ -387,4 +393,51 @@ public void shouldGetCloseTaskSensor() { assertThat(sensor, is(expectedSensor)); } + + @Test + public void shouldAddThreadStartTimeMetric() { + // When: + ThreadMetrics.addThreadStartTimeMetric( + "bongo", + streamsMetrics, + 123L Review comment: Could you please use a variable with a meaningful name like `startTime` instead of a literal and re-use the variable in the verification? ########## File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImplTest.java ########## @@ -1209,4 +1211,46 @@ public void shouldNotMeasureLatencyBecauseSensorHasNoMetrics() { verify(sensor); } + + @Test + public void shouldAddThreadLevelMutableMetric() { + final StreamsMetricsImpl streamsMetrics + = new StreamsMetricsImpl(metrics, THREAD_ID1, VERSION, time); + + streamsMetrics.addThreadLevelMutableMetric( + "foobar", + "test metric", + "t1", + (c, t) -> 123 + ); + + final MetricName name = metrics.metricName( + "foobar", + THREAD_LEVEL_GROUP, + Collections.singletonMap("thread-id", "t1") + ); + assertThat(metrics.metric(name), notNullValue()); + assertThat(metrics.metric(name).metricValue(), equalTo(123)); + } + + @Test + public void shouldCleanupThreadLevelMutableMetric() { + final StreamsMetricsImpl streamsMetrics + = new StreamsMetricsImpl(metrics, THREAD_ID1, VERSION, time); + streamsMetrics.addThreadLevelMutableMetric( + "foobar", + "test metric", + "t1", + (c, t) -> 123 Review comment: Could you please use variables with a meaningful name instead of a literal and re-use the variable in the verification? ########## File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreatorTest.java ########## @@ -117,6 +120,15 @@ public void shouldNoOpCloseTaskProducerIfEosDisabled() { assertThat(mockClientSupplier.producers.get(0).closed(), is(false)); } + @Test + public void shouldReturnBlockedTimeWhenThreadProducer() { + createTasks(); + final MockProducer<?, ?> producer = mockClientSupplier.producers.get(0); + addMetric(producer, "flush-time-total", 123.0); Review comment: Could you please use a variable with a meaningful name like `blockedTime` instead of a literal and re-use the variable in the verification? ########## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsProducer.java ########## @@ -67,22 +69,35 @@ private final Map<String, Object> eosV2ProducerConfigs; private final KafkaClientSupplier clientSupplier; private final StreamThread.ProcessingMode processingMode; + private final Time time; private Producer<byte[], byte[]> producer; private boolean transactionInFlight = false; private boolean transactionInitialized = false; + private double oldProducerTotalBlockedTime = 0; public StreamsProducer(final StreamsConfig config, final String threadId, final KafkaClientSupplier clientSupplier, final TaskId taskId, final UUID processId, final LogContext logContext) { + this(config, threadId, clientSupplier, taskId, processId, logContext, Time.SYSTEM); + } Review comment: I think, we should not keep this constructor. It seems to me that we risk to have different time objects for thread/tasks and their producers which has the potential to lead to inconsistent time between these components. If the removal of the constructor makes this PR too large (and I suspect it will), I recommend to make a separate refactoring for this constructor change and get that merged before this PR. ########## File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImplTest.java ########## @@ -1209,4 +1211,46 @@ public void shouldNotMeasureLatencyBecauseSensorHasNoMetrics() { verify(sensor); } + + @Test + public void shouldAddThreadLevelMutableMetric() { + final StreamsMetricsImpl streamsMetrics + = new StreamsMetricsImpl(metrics, THREAD_ID1, VERSION, time); + + streamsMetrics.addThreadLevelMutableMetric( + "foobar", + "test metric", + "t1", + (c, t) -> 123 Review comment: Could you please use variables with a meaningful name instead of a literal and re-use the variable in the verification? ########## File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsProducerTest.java ########## @@ -1121,4 +1130,65 @@ public void shouldResetTransactionInitializedOnResetProducer() { verify(mockedProducer); } + @Test + public void shouldComputeTotalBlockedTime() { + setProducerMetrics(nonEosMockProducer, 1, 2, 3, 4, 5, 6, 7); Review comment: Could you please use variables with a meaningful names instead of literals and re-use the variables in the verification? ########## File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsProducerTest.java ########## @@ -1121,4 +1130,65 @@ public void shouldResetTransactionInitializedOnResetProducer() { verify(mockedProducer); } + @Test + public void shouldComputeTotalBlockedTime() { + setProducerMetrics(nonEosMockProducer, 1, 2, 3, 4, 5, 6, 7); + + final double expectedTotalBlocked = 1 + 2 + 3 + 4 + 5 + 6 + 7; + assertThat(nonEosStreamsProducer.totalBlockedTime(), closeTo(expectedTotalBlocked, 0.01)); + } + + @Test + public void shouldComputeTotalBlockedTimeAfterReset() { + setProducerMetrics(eosBetaMockProducer, 1, 2, 3, 4, 5, 6, 7); + final double expectedTotalBlocked = 1 + 2 + 3 + 4 + 5 + 6 + 7; Review comment: Could you please use variables with a meaningful names instead of literals and re-use the variables in the verification? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org