This is an automated email from the ASF dual-hosted git repository. mjsax pushed a commit to branch 2.8 in repository https://gitbox.apache.org/repos/asf/kafka.git
commit ab17937abcd6aee6192679baf7a0b4f1e0a15fe1 Author: Matthias J. Sax <[email protected]> AuthorDate: Thu Feb 11 16:49:05 2021 -0800 KAFKA-12272: Fix commit-interval metrics (#10102) Reviewer: A. Sophie Blee-Goldman <[email protected]> --- .../streams/processor/internals/StreamThread.java | 5 +- .../processor/internals/StreamThreadTest.java | 116 ++++++++++++++++++++- 2 files changed, 118 insertions(+), 3 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java index 3ea40d5..3dc0c02 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java @@ -776,9 +776,10 @@ public class StreamThread extends Thread { log.debug("{} punctuators ran.", punctuated); + final long beforeCommitMs = now; final int committed = maybeCommit(); totalCommittedSinceLastSummary += committed; - final long commitLatency = advanceNowAndComputeLatency(); + final long commitLatency = Math.max(now - beforeCommitMs, 0); totalCommitLatency += commitLatency; if (committed > 0) { commitSensor.record(commitLatency / (double) committed, now); @@ -1016,7 +1017,7 @@ public class StreamThread extends Thread { if (committed == -1) { log.debug("Unable to commit as we are in the middle of a rebalance, will try again when it completes."); } else { - advanceNowAndComputeLatency(); + now = time.milliseconds(); lastCommitMs = now; } } else { diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java index 2f95a20..e4d083b 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java @@ -117,6 +117,7 @@ import static org.easymock.EasyMock.anyObject; import static org.easymock.EasyMock.expect; import static org.easymock.EasyMock.expectLastCall; import static org.easymock.EasyMock.mock; +import static org.easymock.EasyMock.niceMock; import static org.easymock.EasyMock.verify; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.not; @@ -836,6 +837,119 @@ public class StreamThreadTest { } @Test + public void shouldRecordCommitLatency() { + final Consumer<byte[], byte[]> consumer = EasyMock.createNiceMock(Consumer.class); + final ConsumerGroupMetadata consumerGroupMetadata = mock(ConsumerGroupMetadata.class); + expect(consumer.groupMetadata()).andStubReturn(consumerGroupMetadata); + expect(consumerGroupMetadata.groupInstanceId()).andReturn(Optional.empty()); + expect(consumer.poll(anyObject())).andStubReturn(new ConsumerRecords<>(Collections.emptyMap())); + final Task task = niceMock(Task.class); + expect(task.id()).andStubReturn(task1); + expect(task.inputPartitions()).andStubReturn(Collections.singleton(t1p1)); + final ActiveTaskCreator activeTaskCreator = mock(ActiveTaskCreator.class); + expect(activeTaskCreator.createTasks(anyObject(), anyObject())).andStubReturn(Collections.singleton(task)); + expect(activeTaskCreator.producerClientIds()).andStubReturn(Collections.singleton("producerClientId")); + EasyMock.replay(consumer, consumerGroupMetadata, task, activeTaskCreator); + + final StreamsMetricsImpl streamsMetrics = + new StreamsMetricsImpl(metrics, CLIENT_ID, StreamsConfig.METRICS_LATEST, mockTime); + + final TaskManager taskManager = new TaskManager( + null, + null, + null, + null, + null, + activeTaskCreator, + null, + internalTopologyBuilder, + null, + null, + null + ) { + @Override + int commit(final Collection<Task> tasksToCommit) { + mockTime.sleep(10L); + return 1; + } + }; + taskManager.setMainConsumer(consumer); + + final StreamThread thread = new StreamThread( + mockTime, + config, + null, + consumer, + consumer, + changelogReader, + null, + taskManager, + streamsMetrics, + internalTopologyBuilder, + CLIENT_ID, + new LogContext(""), + new AtomicInteger(), + new AtomicLong(Long.MAX_VALUE), + null, + HANDLER, + null + ); + thread.updateThreadMetadata("adminClientId"); + thread.setState(StreamThread.State.STARTING); + + final Map<TaskId, Set<TopicPartition>> activeTasks = new HashMap<>(); + activeTasks.put(task1, Collections.singleton(t1p1)); + thread.taskManager().handleAssignment(activeTasks, emptyMap()); + thread.rebalanceListener().onPartitionsAssigned(Collections.singleton(t1p1)); + + assertTrue( + Double.isNaN( + (Double) streamsMetrics.metrics().get(new MetricName( + "commit-latency-max", + "stream-thread-metrics", + "", + Collections.singletonMap("thread-id", CLIENT_ID)) + ).metricValue() + ) + ); + assertTrue( + Double.isNaN( + (Double) streamsMetrics.metrics().get(new MetricName( + "commit-latency-avg", + "stream-thread-metrics", + "", + Collections.singletonMap("thread-id", CLIENT_ID)) + ).metricValue() + ) + ); + + thread.runOnce(); + + assertThat( + streamsMetrics.metrics().get( + new MetricName( + "commit-latency-max", + "stream-thread-metrics", + "", + Collections.singletonMap("thread-id", CLIENT_ID) + ) + ).metricValue(), + equalTo(10.0) + ); + assertThat( + streamsMetrics.metrics().get( + new MetricName( + "commit-latency-avg", + "stream-thread-metrics", + "", + Collections.singletonMap("thread-id", CLIENT_ID) + ) + ).metricValue(), + equalTo(10.0) + ); + } + + @Test public void shouldInjectSharedProducerForAllTasksUsingClientSupplierOnCreateIfEosDisabled() { internalTopologyBuilder.addSource(null, "source1", null, null, null, topic1); internalStreamsBuilder.buildAndOptimizeTopology(); @@ -2621,7 +2735,7 @@ public class StreamThreadTest { internalTopologyBuilder.addSource(null, "source1", null, null, null, topic1); internalTopologyBuilder.addProcessor( "processor1", - (ProcessorSupplier<byte[], byte[], ?, ?>) () -> new MockApiProcessor<>(), + (ProcessorSupplier<byte[], byte[], ?, ?>) MockApiProcessor::new, "source1" ); }
