This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 27a3c752161 KAFKA-17488: Cleanup (test) code for Kafka Streams "metric 
version" (#17182)
27a3c752161 is described below

commit 27a3c752161d97db096dc0e8d905c7224b4aee2b
Author: Joao Pedro Fonseca Dantas <67479090+fonsd...@users.noreply.github.com>
AuthorDate: Thu Sep 26 21:04:01 2024 -0300

    KAFKA-17488: Cleanup (test) code for Kafka Streams "metric version" (#17182)
    
    This PR simplifies StreamsMetricsImpl to avoid passing in the unused "metric 
version" parameter.
    
    Reviewers: Matthias J. Sax <matth...@confluent.io>
---
 .../org/apache/kafka/streams/KafkaStreams.java     |  1 -
 .../internals/metrics/StreamsMetricsImpl.java      |  2 -
 ...KStreamSessionWindowAggregateProcessorTest.java |  2 +-
 .../processor/internals/ActiveTaskCreatorTest.java |  2 +-
 .../internals/GlobalStreamThreadTest.java          |  4 +-
 .../processor/internals/MockStreamsMetrics.java    |  3 +-
 .../processor/internals/ProcessorNodeTest.java     |  6 +-
 .../processor/internals/RecordQueueTest.java       |  3 +-
 .../processor/internals/SourceNodeTest.java        |  3 +-
 .../processor/internals/StandbyTaskTest.java       |  2 +-
 .../processor/internals/StreamTaskTest.java        | 44 ++++++------
 .../processor/internals/StreamThreadTest.java      | 40 +++++------
 .../internals/metrics/StreamsMetricsImplTest.java  | 82 +++++++++++-----------
 ...ctDualSchemaRocksDBSegmentedBytesStoreTest.java |  8 +--
 .../AbstractRocksDBSegmentedBytesStoreTest.java    |  8 +--
 .../ChangeLoggingKeyValueBytesStoreTest.java       |  2 +-
 .../internals/GlobalStateStoreProviderTest.java    |  2 +-
 .../state/internals/KeyValueSegmentTest.java       |  3 +-
 .../state/internals/MeteredKeyValueStoreTest.java  |  3 +-
 .../state/internals/MeteredSessionStoreTest.java   |  3 +-
 .../MeteredTimestampedKeyValueStoreTest.java       |  2 +-
 .../MeteredTimestampedWindowStoreTest.java         |  4 +-
 .../MeteredVersionedKeyValueStoreTest.java         |  3 +-
 .../state/internals/MeteredWindowStoreTest.java    |  2 +-
 .../streams/state/internals/RocksDBStoreTest.java  |  6 +-
 .../RocksDBTimeOrderedKeyValueBufferTest.java      |  3 +-
 .../state/internals/TimestampedSegmentTest.java    |  3 +-
 .../metrics/RocksDBMetricsRecorderGaugesTest.java  |  7 +-
 .../metrics/RocksDBMetricsRecorderTest.java        |  3 +-
 .../kafka/test/InternalMockProcessorContext.java   | 10 ++-
 .../apache/kafka/streams/TopologyTestDriver.java   |  1 -
 .../streams/processor/MockProcessorContext.java    |  1 -
 .../processor/api/MockProcessorContext.java        |  1 -
 .../kafka/streams/MockProcessorContextTest.java    |  1 -
 34 files changed, 120 insertions(+), 150 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java 
b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index ff5d80e5556..089d0b28206 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -1002,7 +1002,6 @@ public class KafkaStreams implements AutoCloseable {
         streamsMetrics = new StreamsMetricsImpl(
             metrics,
             clientId,
-            
applicationConfigs.getString(StreamsConfig.BUILT_IN_METRICS_VERSION_CONFIG),
             time
         );
 
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
index e7a5c3202a0..f5660419b15 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
@@ -162,10 +162,8 @@ public class StreamsMetricsImpl implements StreamsMetrics {
 
     public StreamsMetricsImpl(final Metrics metrics,
                               final String clientId,
-                              final String builtInMetricsVersion,
                               final Time time) {
         Objects.requireNonNull(metrics, "Metrics cannot be null");
-        Objects.requireNonNull(builtInMetricsVersion, "Built-in metrics 
version cannot be null");
         this.metrics = metrics;
         this.clientId = clientId;
         version = Version.LATEST;
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
index 9d060a5882c..5d931e9fdf2 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
@@ -80,7 +80,7 @@ public class KStreamSessionWindowAggregateProcessorTest {
 
     private final MockTime time = new MockTime();
     private final Metrics metrics = new Metrics();
-    private final StreamsMetricsImpl streamsMetrics = new 
StreamsMetricsImpl(metrics, "test", StreamsConfig.METRICS_LATEST, time);
+    private final StreamsMetricsImpl streamsMetrics = new 
StreamsMetricsImpl(metrics, "test", time);
     private final String threadId = Thread.currentThread().getName();
     private final Initializer<Long> initializer = () -> 0L;
     private final Aggregator<String, String, Long> aggregator = (aggKey, 
value, aggregate) -> aggregate + 1;
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreatorTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreatorTest.java
index 7bb7a9eb751..c1f58b3972d 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreatorTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreatorTest.java
@@ -72,7 +72,7 @@ public class ActiveTaskCreatorTest {
     private ChangelogReader changeLogReader;
 
     private final MockClientSupplier mockClientSupplier = new 
MockClientSupplier();
-    private final StreamsMetricsImpl streamsMetrics = new 
StreamsMetricsImpl(new Metrics(), "clientId", StreamsConfig.METRICS_LATEST, new 
MockTime());
+    private final StreamsMetricsImpl streamsMetrics = new 
StreamsMetricsImpl(new Metrics(), "clientId", new MockTime());
     private final Map<String, Object> properties = mkMap(
         mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, "appId"),
         mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234")
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java
index 73b610227a5..d7d2971db32 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java
@@ -134,7 +134,7 @@ public class GlobalStreamThreadTest {
             mockConsumer,
             new StateDirectory(config, time, true, false),
             0,
-            new StreamsMetricsImpl(new Metrics(), "test-client", 
StreamsConfig.METRICS_LATEST, time),
+            new StreamsMetricsImpl(new Metrics(), "test-client", time),
             time,
             "clientId",
             stateRestoreListener,
@@ -173,7 +173,7 @@ public class GlobalStreamThreadTest {
             mockConsumer,
             new StateDirectory(config, time, true, false),
             0,
-            new StreamsMetricsImpl(new Metrics(), "test-client", 
StreamsConfig.METRICS_LATEST, time),
+            new StreamsMetricsImpl(new Metrics(), "test-client", time),
             time,
             "clientId",
             stateRestoreListener,
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/MockStreamsMetrics.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/MockStreamsMetrics.java
index bb0303c9493..4ed68ef8150 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/MockStreamsMetrics.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/MockStreamsMetrics.java
@@ -18,12 +18,11 @@ package org.apache.kafka.streams.processor.internals;
 
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.utils.MockTime;
-import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
 
 public class MockStreamsMetrics extends StreamsMetricsImpl {
 
     public MockStreamsMetrics(final Metrics metrics) {
-        super(metrics, "test", StreamsConfig.METRICS_LATEST, new MockTime());
+        super(metrics, "test", new MockTime());
     }
 }
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java
index 7f4e2d08491..5eb5a39ddba 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java
@@ -215,7 +215,7 @@ public class ProcessorNodeTest {
     public void testMetricsWithBuiltInMetricsVersionLatest() {
         final Metrics metrics = new Metrics();
         final StreamsMetricsImpl streamsMetrics =
-            new StreamsMetricsImpl(metrics, "test-client", 
StreamsConfig.METRICS_LATEST, new MockTime());
+            new StreamsMetricsImpl(metrics, "test-client", new MockTime());
         final InternalMockProcessorContext<Object, Object> context = new 
InternalMockProcessorContext<>(streamsMetrics);
         final ProcessorNode<Object, Object, Object, Object> node =
             new ProcessorNode<>(NAME, new NoOpProcessor(), 
Collections.emptySet());
@@ -299,7 +299,7 @@ public class ProcessorNodeTest {
     public void testTopologyLevelClassCastExceptionDirect() {
         final Metrics metrics = new Metrics();
         final StreamsMetricsImpl streamsMetrics =
-            new StreamsMetricsImpl(metrics, "test-client", 
StreamsConfig.METRICS_LATEST, new MockTime());
+            new StreamsMetricsImpl(metrics, "test-client", new MockTime());
         final InternalMockProcessorContext<Object, Object> context = new 
InternalMockProcessorContext<>(streamsMetrics);
         final ProcessorNode<Object, Object, Object, Object> node =
             new ProcessorNode<>("pname", new ClassCastProcessor(), 
Collections.emptySet());
@@ -319,7 +319,7 @@ public class ProcessorNodeTest {
         final InternalProcessorContext<Object, Object> 
internalProcessorContext = mock(InternalProcessorContext.class, 
withSettings().strictness(Strictness.LENIENT));
 
         when(internalProcessorContext.taskId()).thenReturn(TASK_ID);
-        when(internalProcessorContext.metrics()).thenReturn(new 
StreamsMetricsImpl(new Metrics(), "test-client", StreamsConfig.METRICS_LATEST, 
new MockTime()));
+        when(internalProcessorContext.metrics()).thenReturn(new 
StreamsMetricsImpl(new Metrics(), "test-client", new MockTime()));
         when(internalProcessorContext.topic()).thenReturn(TOPIC);
         when(internalProcessorContext.partition()).thenReturn(PARTITION);
         when(internalProcessorContext.offset()).thenReturn(OFFSET);
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java
index dd9efe581ff..076aeef7939 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java
@@ -32,7 +32,6 @@ import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.MockTime;
-import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler;
 import org.apache.kafka.streams.errors.LogAndFailExceptionHandler;
 import org.apache.kafka.streams.errors.StreamsException;
@@ -73,7 +72,7 @@ public class RecordQueueTest {
 
     private final Metrics metrics = new Metrics();
     private final StreamsMetricsImpl streamsMetrics =
-        new StreamsMetricsImpl(metrics, "mock", StreamsConfig.METRICS_LATEST, 
new MockTime());
+        new StreamsMetricsImpl(metrics, "mock", new MockTime());
 
     @SuppressWarnings("rawtypes")
     final InternalMockProcessorContext context = new 
InternalMockProcessorContext<>(
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/SourceNodeTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/SourceNodeTest.java
index a242aa8ccfc..90d88359042 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/SourceNodeTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/SourceNodeTest.java
@@ -24,7 +24,6 @@ import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.metrics.SensorAccessor;
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.utils.MockTime;
-import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.kstream.internals.WrappingNullableUtils;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
@@ -98,7 +97,7 @@ public class SourceNodeTest {
     public void shouldExposeProcessMetrics() {
         final Metrics metrics = new Metrics();
         final StreamsMetricsImpl streamsMetrics =
-            new StreamsMetricsImpl(metrics, "test-client", 
StreamsConfig.METRICS_LATEST, new MockTime());
+            new StreamsMetricsImpl(metrics, "test-client", new MockTime());
         final InternalMockProcessorContext<String, String> context = new 
InternalMockProcessorContext<>(streamsMetrics);
         final SourceNode<String, String> node =
             new SourceNode<>(context.currentNode().name(), new 
TheDeserializer(), new TheDeserializer());
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
index d9122761daa..0ccdc8b39c9 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
@@ -114,7 +114,7 @@ public class StandbyTaskTest {
 
     private final MockTime time = new MockTime();
     private final Metrics metrics = new Metrics(new 
MetricConfig().recordLevel(Sensor.RecordingLevel.DEBUG), time);
-    private final StreamsMetricsImpl streamsMetrics = new 
StreamsMetricsImpl(metrics, threadName, StreamsConfig.METRICS_LATEST, time);
+    private final StreamsMetricsImpl streamsMetrics = new 
StreamsMetricsImpl(metrics, threadName, time);
 
     private File baseDir;
     private StreamsConfig config;
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index 029b6a73f4e..acda88715d5 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -53,7 +53,6 @@ import 
org.apache.kafka.streams.errors.LogAndContinueProcessingExceptionHandler;
 import org.apache.kafka.streams.errors.LogAndFailExceptionHandler;
 import org.apache.kafka.streams.errors.LogAndFailProcessingExceptionHandler;
 import org.apache.kafka.streams.errors.ProcessingExceptionHandler;
-import 
org.apache.kafka.streams.errors.ProcessingExceptionHandler.ProcessingHandlerResponse;
 import org.apache.kafka.streams.errors.ProcessorStateException;
 import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.errors.TaskCorruptedException;
@@ -609,7 +608,7 @@ public class StreamTaskTest {
     public void shouldProcessRecordsAfterPrepareCommitWhenEosDisabled() {
         when(stateManager.taskId()).thenReturn(taskId);
         when(stateManager.taskType()).thenReturn(TaskType.ACTIVE);
-        task = createSingleSourceStateless(createConfig(), 
StreamsConfig.METRICS_LATEST);
+        task = createSingleSourceStateless(createConfig());
 
         assertFalse(task.process(time.milliseconds()));
 
@@ -632,7 +631,7 @@ public class StreamTaskTest {
     public void shouldNotProcessRecordsAfterPrepareCommitWhenEosV2Enabled() {
         when(stateManager.taskId()).thenReturn(taskId);
         when(stateManager.taskType()).thenReturn(TaskType.ACTIVE);
-        task = 
createSingleSourceStateless(createConfig(StreamsConfig.EXACTLY_ONCE_V2, "0"), 
StreamsConfig.METRICS_LATEST);
+        task = 
createSingleSourceStateless(createConfig(StreamsConfig.EXACTLY_ONCE_V2, "0"));
 
         assertFalse(task.process(time.milliseconds()));
 
@@ -656,7 +655,7 @@ public class StreamTaskTest {
     public void shouldRecordBufferedRecords() {
         when(stateManager.taskId()).thenReturn(taskId);
         when(stateManager.taskType()).thenReturn(TaskType.ACTIVE);
-        task = createSingleSourceStateless(createConfig(AT_LEAST_ONCE, "0"), 
StreamsConfig.METRICS_LATEST);
+        task = createSingleSourceStateless(createConfig(AT_LEAST_ONCE, "0"));
 
         final KafkaMetric metric = getMetric("active-buffer", "%s-count", 
task.id().toString());
 
@@ -734,13 +733,13 @@ public class StreamTaskTest {
         final String sourceNodeName = evenKeyForwardingSourceNode.name();
         final String terminalNodeName = processorStreamTime.name();
 
-        final Metric sourceAvg = getProcessorMetric("record-e2e-latency", 
"%s-avg", task.id().toString(), sourceNodeName, StreamsConfig.METRICS_LATEST);
-        final Metric sourceMin = getProcessorMetric("record-e2e-latency", 
"%s-min", task.id().toString(), sourceNodeName, StreamsConfig.METRICS_LATEST);
-        final Metric sourceMax = getProcessorMetric("record-e2e-latency", 
"%s-max", task.id().toString(), sourceNodeName, StreamsConfig.METRICS_LATEST);
+        final Metric sourceAvg = getProcessorMetric("record-e2e-latency", 
"%s-avg", task.id().toString(), sourceNodeName);
+        final Metric sourceMin = getProcessorMetric("record-e2e-latency", 
"%s-min", task.id().toString(), sourceNodeName);
+        final Metric sourceMax = getProcessorMetric("record-e2e-latency", 
"%s-max", task.id().toString(), sourceNodeName);
 
-        final Metric terminalAvg = getProcessorMetric("record-e2e-latency", 
"%s-avg", task.id().toString(), terminalNodeName, StreamsConfig.METRICS_LATEST);
-        final Metric terminalMin = getProcessorMetric("record-e2e-latency", 
"%s-min", task.id().toString(), terminalNodeName, StreamsConfig.METRICS_LATEST);
-        final Metric terminalMax = getProcessorMetric("record-e2e-latency", 
"%s-max", task.id().toString(), terminalNodeName, StreamsConfig.METRICS_LATEST);
+        final Metric terminalAvg = getProcessorMetric("record-e2e-latency", 
"%s-avg", task.id().toString(), terminalNodeName);
+        final Metric terminalMin = getProcessorMetric("record-e2e-latency", 
"%s-min", task.id().toString(), terminalNodeName);
+        final Metric terminalMax = getProcessorMetric("record-e2e-latency", 
"%s-max", task.id().toString(), terminalNodeName);
 
         // e2e latency = 10
         task.addRecords(partition1, 
singletonList(getConsumerRecordWithOffsetAsTimestamp(0, 0L)));
@@ -802,7 +801,7 @@ public class StreamTaskTest {
     public void shouldRecordRestoredRecords() {
         when(stateManager.taskId()).thenReturn(taskId);
         when(stateManager.taskType()).thenReturn(TaskType.ACTIVE);
-        task = createSingleSourceStateless(createConfig(AT_LEAST_ONCE, "0"), 
StreamsConfig.METRICS_LATEST);
+        task = createSingleSourceStateless(createConfig(AT_LEAST_ONCE, "0"));
 
         final KafkaMetric totalMetric = getMetric("restore", "%s-total", 
task.id().toString());
         final KafkaMetric rateMetric = getMetric("restore", "%s-rate", 
task.id().toString());
@@ -927,7 +926,6 @@ public class StreamTaskTest {
     }
 
     private void testMetricsForBuiltInMetricsVersionLatest() {
-        final String builtInMetricsVersion = StreamsConfig.METRICS_LATEST;
         assertNull(getMetric("commit", "%s-latency-avg", "all"));
         assertNull(getMetric("commit", "%s-latency-max", "all"));
         assertNull(getMetric("commit", "%s-rate", "all"));
@@ -960,8 +958,7 @@ public class StreamTaskTest {
     private Metric getProcessorMetric(final String operation,
                                       final String nameFormat,
                                       final String taskId,
-                                      final String processorNodeId,
-                                      final String builtInMetricsVersion) {
+                                      final String processorNodeId) {
 
         return getMetricByNameFilterByTags(
             metrics.metrics(),
@@ -1213,7 +1210,7 @@ public class StreamTaskTest {
     public void shouldRespectCommitNeeded() {
         when(stateManager.taskId()).thenReturn(taskId);
         when(stateManager.taskType()).thenReturn(TaskType.ACTIVE);
-        task = createSingleSourceStateless(createConfig(AT_LEAST_ONCE, "0"), 
StreamsConfig.METRICS_LATEST);
+        task = createSingleSourceStateless(createConfig(AT_LEAST_ONCE, "0"));
         task.initializeIfNeeded();
         task.completeRestoration(noOpResetter -> { });
 
@@ -1255,7 +1252,7 @@ public class StreamTaskTest {
     public void 
shouldCommitNextOffsetAndProcessorMetadataFromQueueIfAvailable() {
         when(stateManager.taskId()).thenReturn(taskId);
         when(stateManager.taskType()).thenReturn(TaskType.ACTIVE);
-        task = createSingleSourceStateless(createConfig(AT_LEAST_ONCE, "0"), 
StreamsConfig.METRICS_LATEST);
+        task = createSingleSourceStateless(createConfig(AT_LEAST_ONCE, "0"));
         task.initializeIfNeeded();
         task.completeRestoration(noOpResetter -> { });
 
@@ -2311,7 +2308,7 @@ public class StreamTaskTest {
     public void shouldClearCommitStatusesInCloseDirty() {
         when(stateManager.taskId()).thenReturn(taskId);
         when(stateManager.taskType()).thenReturn(TaskType.ACTIVE);
-        task = createSingleSourceStateless(createConfig(AT_LEAST_ONCE, "0"), 
StreamsConfig.METRICS_LATEST);
+        task = createSingleSourceStateless(createConfig(AT_LEAST_ONCE, "0"));
         task.initializeIfNeeded();
         task.completeRestoration(noOpResetter -> { });
 
@@ -2361,7 +2358,7 @@ public class StreamTaskTest {
     public void shouldThrowIfCleanClosingDirtyTask() {
         when(stateManager.taskId()).thenReturn(taskId);
         when(stateManager.taskType()).thenReturn(TaskType.ACTIVE);
-        task = createSingleSourceStateless(createConfig(AT_LEAST_ONCE, "0"), 
StreamsConfig.METRICS_LATEST);
+        task = createSingleSourceStateless(createConfig(AT_LEAST_ONCE, "0"));
         task.initializeIfNeeded();
         task.completeRestoration(noOpResetter -> { });
 
@@ -2452,7 +2449,7 @@ public class StreamTaskTest {
                 streamsMetrics,
                 null
         );
-        final StreamsMetricsImpl metrics = new 
StreamsMetricsImpl(this.metrics, "test", StreamsConfig.METRICS_LATEST, time);
+        final StreamsMetricsImpl metrics = new 
StreamsMetricsImpl(this.metrics, "test", time);
 
         // The processor topology is missing the topics
         final ProcessorTopology topology = withSources(emptyList(), mkMap());
@@ -2981,8 +2978,7 @@ public class StreamTaskTest {
         );
     }
 
-    private StreamTask createSingleSourceStateless(final StreamsConfig config,
-                                                   final String 
builtInMetricsVersion) {
+    private StreamTask createSingleSourceStateless(final StreamsConfig config) 
{
         final ProcessorTopology topology = withSources(
             asList(source1, processorStreamTime, processorSystemTime),
             mkMap(mkEntry(topic1, source1))
@@ -3005,7 +3001,7 @@ public class StreamTaskTest {
             topology,
             consumer,
             new TopologyConfig(null,  config, new 
Properties()).getTaskConfig(),
-            new StreamsMetricsImpl(metrics, "test", builtInMetricsVersion, 
time),
+            new StreamsMetricsImpl(metrics, "test", time),
             stateDirectory,
             cache,
             time,
@@ -3042,7 +3038,7 @@ public class StreamTaskTest {
             topology,
             consumer,
             new TopologyConfig(null,  config, new 
Properties()).getTaskConfig(),
-            new StreamsMetricsImpl(metrics, "test", 
StreamsConfig.METRICS_LATEST, time),
+            new StreamsMetricsImpl(metrics, "test", time),
             stateDirectory,
             cache,
             time,
@@ -3078,7 +3074,7 @@ public class StreamTaskTest {
             topology,
             consumer,
             new TopologyConfig(null,  config, new 
Properties()).getTaskConfig(),
-            new StreamsMetricsImpl(metrics, "test", 
StreamsConfig.METRICS_LATEST, time),
+            new StreamsMetricsImpl(metrics, "test", time),
             stateDirectory,
             cache,
             time,
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index 53a61f09b83..ae1baad47f9 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -307,7 +307,6 @@ public class StreamThreadTest {
         final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(
             metrics,
             APPLICATION_ID,
-            config.getString(StreamsConfig.BUILT_IN_METRICS_VERSION_CONFIG),
             time
         );
 
@@ -715,7 +714,6 @@ public class StreamThreadTest {
         final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(
             metrics,
             APPLICATION_ID,
-            config.getString(StreamsConfig.BUILT_IN_METRICS_VERSION_CONFIG),
             mockTime
         );
 
@@ -780,7 +778,6 @@ public class StreamThreadTest {
         final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(
             metrics,
             APPLICATION_ID,
-            config.getString(StreamsConfig.BUILT_IN_METRICS_VERSION_CONFIG),
             mockTime
         );
 
@@ -1145,7 +1142,7 @@ public class StreamThreadTest {
 
         final StreamsConfig config = new StreamsConfig(configProps(false, 
stateUpdaterEnabled, processingThreadsEnabled));
         final StreamsMetricsImpl streamsMetrics =
-            new StreamsMetricsImpl(metrics, CLIENT_ID, 
StreamsConfig.METRICS_LATEST, mockTime);
+            new StreamsMetricsImpl(metrics, CLIENT_ID, mockTime);
         final TopologyMetadata topologyMetadata = new 
TopologyMetadata(internalTopologyBuilder, config);
         topologyMetadata.buildAndRewriteTopology();
 
@@ -1368,7 +1365,7 @@ public class StreamThreadTest {
 
         final StreamsConfig config = new StreamsConfig(configProps(false, 
stateUpdaterEnabled, processingThreadsEnabled));
         final StreamsMetricsImpl streamsMetrics =
-            new StreamsMetricsImpl(metrics, CLIENT_ID, 
StreamsConfig.METRICS_LATEST, mockTime);
+            new StreamsMetricsImpl(metrics, CLIENT_ID, mockTime);
         final TopologyMetadata topologyMetadata = new 
TopologyMetadata(internalTopologyBuilder, config);
         topologyMetadata.buildAndRewriteTopology();
         thread = buildStreamThread(consumer, taskManager, config, 
topologyMetadata)
@@ -1422,7 +1419,7 @@ public class StreamThreadTest {
         }
 
         final StreamsMetricsImpl streamsMetrics =
-            new StreamsMetricsImpl(metrics, CLIENT_ID, 
StreamsConfig.METRICS_LATEST, mockTime);
+            new StreamsMetricsImpl(metrics, CLIENT_ID, mockTime);
 
         final Properties props = configProps(false, stateUpdaterEnabled, 
processingThreadsEnabled);
         final StreamsConfig config = new StreamsConfig(props);
@@ -1468,7 +1465,7 @@ public class StreamThreadTest {
 
         final StreamsConfig config = new StreamsConfig(configProps(false, 
stateUpdaterEnabled, processingThreadsEnabled));
         final StreamsMetricsImpl streamsMetrics =
-            new StreamsMetricsImpl(metrics, CLIENT_ID, 
StreamsConfig.METRICS_LATEST, mockTime);
+            new StreamsMetricsImpl(metrics, CLIENT_ID, mockTime);
         final TopologyMetadata topologyMetadata = new 
TopologyMetadata(internalTopologyBuilder, config);
         topologyMetadata.buildAndRewriteTopology();
         thread = buildStreamThread(consumer, taskManager, config, 
topologyMetadata)
@@ -1488,7 +1485,7 @@ public class StreamThreadTest {
 
         final StreamsConfig config = new StreamsConfig(configProps(false, 
stateUpdaterEnabled, processingThreadsEnabled));
         final StreamsMetricsImpl streamsMetrics =
-            new StreamsMetricsImpl(metrics, CLIENT_ID, 
StreamsConfig.METRICS_LATEST, mockTime);
+            new StreamsMetricsImpl(metrics, CLIENT_ID, mockTime);
         final TopologyMetadata topologyMetadata = new 
TopologyMetadata(internalTopologyBuilder, config);
         topologyMetadata.buildAndRewriteTopology();
         thread = buildStreamThread(consumer, taskManager, config, 
topologyMetadata)
@@ -1889,7 +1886,6 @@ public class StreamThreadTest {
         final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(
             metrics,
             APPLICATION_ID,
-            config.getString(StreamsConfig.BUILT_IN_METRICS_VERSION_CONFIG),
             mockTime
         );
 
@@ -2592,7 +2588,7 @@ public class StreamThreadTest {
         doThrow(new TaskMigratedException("Task lost exception", new 
RuntimeException())).when(taskManager).handleLostAll();
 
         final StreamsMetricsImpl streamsMetrics =
-            new StreamsMetricsImpl(metrics, CLIENT_ID, 
StreamsConfig.METRICS_LATEST, mockTime);
+            new StreamsMetricsImpl(metrics, CLIENT_ID, mockTime);
         final TopologyMetadata topologyMetadata = new 
TopologyMetadata(internalTopologyBuilder, config);
         topologyMetadata.buildAndRewriteTopology();
         thread = buildStreamThread(consumer, taskManager, config, 
topologyMetadata)
@@ -2622,7 +2618,7 @@ public class StreamThreadTest {
         doThrow(new TaskMigratedException("Revocation non fatal exception", 
new RuntimeException())).when(taskManager).handleRevocation(assignedPartitions);
 
         final StreamsMetricsImpl streamsMetrics =
-            new StreamsMetricsImpl(metrics, CLIENT_ID, 
StreamsConfig.METRICS_LATEST, mockTime);
+            new StreamsMetricsImpl(metrics, CLIENT_ID, mockTime);
         final TopologyMetadata topologyMetadata = new 
TopologyMetadata(internalTopologyBuilder, config);
         topologyMetadata.buildAndRewriteTopology();
         thread = buildStreamThread(consumer, taskManager, config, 
topologyMetadata)
@@ -2654,7 +2650,7 @@ public class StreamThreadTest {
         when(taskManager.handleCorruption(corruptedTasks)).thenReturn(true);
 
         final StreamsMetricsImpl streamsMetrics =
-            new StreamsMetricsImpl(metrics, CLIENT_ID, 
StreamsConfig.METRICS_LATEST, mockTime);
+            new StreamsMetricsImpl(metrics, CLIENT_ID, mockTime);
         final TopologyMetadata topologyMetadata = new 
TopologyMetadata(internalTopologyBuilder, config);
         topologyMetadata.buildAndRewriteTopology();
         thread = new StreamThread(
@@ -2712,7 +2708,7 @@ public class StreamThreadTest {
         doThrow(new 
TimeoutException()).when(taskManager).handleCorruption(corruptedTasks);
 
         final StreamsMetricsImpl streamsMetrics =
-            new StreamsMetricsImpl(metrics, CLIENT_ID, 
StreamsConfig.METRICS_LATEST, mockTime);
+            new StreamsMetricsImpl(metrics, CLIENT_ID, mockTime);
         final TopologyMetadata topologyMetadata = new 
TopologyMetadata(internalTopologyBuilder, config);
         topologyMetadata.buildAndRewriteTopology();
         thread = new StreamThread(
@@ -2779,7 +2775,7 @@ public class StreamThreadTest {
         doNothing().when(taskManager).handleLostAll();
 
         final StreamsMetricsImpl streamsMetrics =
-            new StreamsMetricsImpl(metrics, CLIENT_ID, 
StreamsConfig.METRICS_LATEST, mockTime);
+            new StreamsMetricsImpl(metrics, CLIENT_ID, mockTime);
         final TopologyMetadata topologyMetadata = new 
TopologyMetadata(internalTopologyBuilder, config);
         topologyMetadata.buildAndRewriteTopology();
         thread = new StreamThread(
@@ -2842,7 +2838,7 @@ public class StreamThreadTest {
         doNothing().when(consumer).enforceRebalance("Active tasks corrupted");
 
         final StreamsMetricsImpl streamsMetrics =
-            new StreamsMetricsImpl(metrics, CLIENT_ID, 
StreamsConfig.METRICS_LATEST, mockTime);
+            new StreamsMetricsImpl(metrics, CLIENT_ID, mockTime);
         final TopologyMetadata topologyMetadata = new 
TopologyMetadata(internalTopologyBuilder, config);
         topologyMetadata.buildAndRewriteTopology();
         thread = new StreamThread(
@@ -2902,7 +2898,7 @@ public class StreamThreadTest {
         when(taskManager.handleCorruption(corruptedTasks)).thenReturn(false);
 
         final StreamsMetricsImpl streamsMetrics =
-            new StreamsMetricsImpl(metrics, CLIENT_ID, 
StreamsConfig.METRICS_LATEST, mockTime);
+            new StreamsMetricsImpl(metrics, CLIENT_ID, mockTime);
         final TopologyMetadata topologyMetadata = new 
TopologyMetadata(internalTopologyBuilder, config);
         topologyMetadata.buildAndRewriteTopology();
         thread = new StreamThread(
@@ -3086,7 +3082,7 @@ public class StreamThreadTest {
         when(taskManager.producerMetrics()).thenReturn(dummyProducerMetrics);
 
         final StreamsMetricsImpl streamsMetrics =
-            new StreamsMetricsImpl(metrics, CLIENT_ID, 
StreamsConfig.METRICS_LATEST, mockTime);
+            new StreamsMetricsImpl(metrics, CLIENT_ID, mockTime);
         final TopologyMetadata topologyMetadata = new 
TopologyMetadata(internalTopologyBuilder, config);
         topologyMetadata.buildAndRewriteTopology();
         thread = buildStreamThread(consumer, taskManager, config, 
topologyMetadata);
@@ -3111,7 +3107,7 @@ public class StreamThreadTest {
         final TaskManager taskManager = mock(TaskManager.class);
 
         final StreamsMetricsImpl streamsMetrics =
-            new StreamsMetricsImpl(metrics, CLIENT_ID, 
StreamsConfig.METRICS_LATEST, mockTime);
+            new StreamsMetricsImpl(metrics, CLIENT_ID, mockTime);
         final TopologyMetadata topologyMetadata = new 
TopologyMetadata(internalTopologyBuilder, config);
         topologyMetadata.buildAndRewriteTopology();
         thread = new StreamThread(
@@ -3167,7 +3163,7 @@ public class StreamThreadTest {
         when(consumerGroupMetadata.groupInstanceId()).thenReturn(Optional.empty());
         final TaskManager taskManager = mock(TaskManager.class);
         final StreamsMetricsImpl streamsMetrics =
-            new StreamsMetricsImpl(metrics, CLIENT_ID, StreamsConfig.METRICS_LATEST, mockTime);
+            new StreamsMetricsImpl(metrics, CLIENT_ID, mockTime);
         final TopologyMetadata topologyMetadata = new TopologyMetadata(internalTopologyBuilder, config);
         topologyMetadata.buildAndRewriteTopology();
         thread = new StreamThread(
@@ -3583,7 +3579,7 @@ public class StreamThreadTest {
             "",
             taskManager,
             null,
-            new StreamsMetricsImpl(metrics, CLIENT_ID, StreamsConfig.METRICS_LATEST, mockTime),
+            new StreamsMetricsImpl(metrics, CLIENT_ID, mockTime),
             topologyMetadata,
             "thread-id",
             new LogContext(),
@@ -3633,7 +3629,7 @@ public class StreamThreadTest {
         final LogContext logContext = new LogContext("test");
         final Logger log = logContext.logger(StreamThreadTest.class);
         final StreamsMetricsImpl streamsMetrics =
-            new StreamsMetricsImpl(metrics, CLIENT_ID, 
StreamsConfig.METRICS_LATEST, mockTime);
+            new StreamsMetricsImpl(metrics, CLIENT_ID, mockTime);
         final StandbyTaskCreator standbyTaskCreator = new StandbyTaskCreator(
             new TopologyMetadata(internalTopologyBuilder, config),
             config,
@@ -3692,7 +3688,7 @@ public class StreamThreadTest {
                                            final StreamsConfig config,
                                            final TopologyMetadata 
topologyMetadata) {
         final StreamsMetricsImpl streamsMetrics =
-            new StreamsMetricsImpl(metrics, CLIENT_ID, 
StreamsConfig.METRICS_LATEST, mockTime);
+            new StreamsMetricsImpl(metrics, CLIENT_ID, mockTime);
 
         return new StreamThread(
             mockTime,
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImplTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImplTest.java
index 7a61e6b8e0b..218948cafc0 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImplTest.java
@@ -26,7 +26,6 @@ import org.apache.kafka.common.metrics.Sensor.RecordingLevel;
 import org.apache.kafka.common.metrics.stats.Rate;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.ImmutableMetricValue;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.Version;
 import org.apache.kafka.test.StreamsTestUtils;
@@ -93,7 +92,6 @@ public class StreamsMetricsImplTest {
     private static final String SENSOR_NAME_1 = "sensor1";
     private static final String SENSOR_NAME_2 = "sensor2";
     private static final String INTERNAL_PREFIX = "internal";
-    private static final String VERSION = StreamsConfig.METRICS_LATEST;
     private static final String CLIENT_ID = "test-client";
     private static final String THREAD_ID1 = "test-thread-1";
     private static final String TASK_ID1 = "test-task-1";
@@ -139,7 +137,7 @@ public class StreamsMetricsImplTest {
     private final MetricName metricName2 =
         new MetricName(METRIC_NAME1, CLIENT_LEVEL_GROUP, DESCRIPTION2, clientLevelTags);
     private final MockTime time = new MockTime(0);
-    private final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, VERSION, time);
+    private final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, time);
 
     private static MetricConfig eqMetricConfig(final MetricConfig metricConfig) {
         final StringBuffer message = new StringBuffer();
@@ -254,7 +252,7 @@ public class StreamsMetricsImplTest {
         final Metrics metrics = mock(Metrics.class);
         final RecordingLevel recordingLevel = RecordingLevel.INFO;
         setupGetNewSensorTest(metrics, recordingLevel);
-        final StreamsMetricsImpl streamsMetrics = new 
StreamsMetricsImpl(metrics, CLIENT_ID, VERSION, time);
+        final StreamsMetricsImpl streamsMetrics = new 
StreamsMetricsImpl(metrics, CLIENT_ID, time);
 
         final Sensor actualSensor = 
streamsMetrics.threadLevelSensor(THREAD_ID1, SENSOR_NAME_1, recordingLevel);
 
@@ -266,7 +264,7 @@ public class StreamsMetricsImplTest {
         final Metrics metrics = mock(Metrics.class);
         final RecordingLevel recordingLevel = RecordingLevel.INFO;
         setupGetExistingSensorTest(metrics);
-        final StreamsMetricsImpl streamsMetrics = new 
StreamsMetricsImpl(metrics, CLIENT_ID, VERSION, time);
+        final StreamsMetricsImpl streamsMetrics = new 
StreamsMetricsImpl(metrics, CLIENT_ID, time);
 
         final Sensor actualSensor = 
streamsMetrics.threadLevelSensor(THREAD_ID1, SENSOR_NAME_1, recordingLevel);
 
@@ -278,7 +276,7 @@ public class StreamsMetricsImplTest {
         final Metrics metrics = mock(Metrics.class);
         final RecordingLevel recordingLevel = RecordingLevel.INFO;
         setupGetNewSensorTest(metrics, recordingLevel);
-        final StreamsMetricsImpl streamsMetrics = new 
StreamsMetricsImpl(metrics, CLIENT_ID, VERSION, time);
+        final StreamsMetricsImpl streamsMetrics = new 
StreamsMetricsImpl(metrics, CLIENT_ID, time);
 
         final Sensor actualSensor = streamsMetrics.taskLevelSensor(
             THREAD_ID1,
@@ -295,7 +293,7 @@ public class StreamsMetricsImplTest {
         final Metrics metrics = mock(Metrics.class);
         final RecordingLevel recordingLevel = RecordingLevel.INFO;
         setupGetExistingSensorTest(metrics);
-        final StreamsMetricsImpl streamsMetrics = new 
StreamsMetricsImpl(metrics, CLIENT_ID, VERSION, time);
+        final StreamsMetricsImpl streamsMetrics = new 
StreamsMetricsImpl(metrics, CLIENT_ID, time);
 
         final Sensor actualSensor = streamsMetrics.taskLevelSensor(
             THREAD_ID1,
@@ -312,7 +310,7 @@ public class StreamsMetricsImplTest {
         final Metrics metrics = mock(Metrics.class);
         final RecordingLevel recordingLevel = RecordingLevel.INFO;
         setupGetNewSensorTest(metrics, recordingLevel);
-        final StreamsMetricsImpl streamsMetrics = new 
StreamsMetricsImpl(metrics, CLIENT_ID, VERSION, time);
+        final StreamsMetricsImpl streamsMetrics = new 
StreamsMetricsImpl(metrics, CLIENT_ID, time);
 
         final Sensor actualSensor = streamsMetrics.topicLevelSensor(
             THREAD_ID1,
@@ -331,7 +329,7 @@ public class StreamsMetricsImplTest {
         final Metrics metrics = mock(Metrics.class);
         final RecordingLevel recordingLevel = RecordingLevel.INFO;
         setupGetExistingSensorTest(metrics);
-        final StreamsMetricsImpl streamsMetrics = new 
StreamsMetricsImpl(metrics, CLIENT_ID, VERSION, time);
+        final StreamsMetricsImpl streamsMetrics = new 
StreamsMetricsImpl(metrics, CLIENT_ID, time);
 
         final Sensor actualSensor = streamsMetrics.topicLevelSensor(
             THREAD_ID1,
@@ -350,7 +348,7 @@ public class StreamsMetricsImplTest {
         final Metrics metrics = mock(Metrics.class);
         final RecordingLevel recordingLevel = RecordingLevel.INFO;
         final ArgumentCaptor<String> sensorKeys = 
setupGetNewSensorTest(metrics, recordingLevel);
-        final StreamsMetricsImpl streamsMetrics = new 
StreamsMetricsImpl(metrics, CLIENT_ID, VERSION, time);
+        final StreamsMetricsImpl streamsMetrics = new 
StreamsMetricsImpl(metrics, CLIENT_ID, time);
 
         final Sensor actualSensor = streamsMetrics.storeLevelSensor(
             TASK_ID1,
@@ -368,7 +366,7 @@ public class StreamsMetricsImplTest {
         final Metrics metrics = mock(Metrics.class);
         final RecordingLevel recordingLevel = RecordingLevel.INFO;
         setupGetExistingSensorTest(metrics);
-        final StreamsMetricsImpl streamsMetrics = new 
StreamsMetricsImpl(metrics, CLIENT_ID, VERSION, time);
+        final StreamsMetricsImpl streamsMetrics = new 
StreamsMetricsImpl(metrics, CLIENT_ID, time);
 
         final Sensor actualSensor = streamsMetrics.storeLevelSensor(
             TASK_ID1,
@@ -384,7 +382,7 @@ public class StreamsMetricsImplTest {
     public void shouldUseSameStoreLevelSensorKeyWithTwoDifferentSensorNames() {
         final Metrics metrics = mock(Metrics.class);
         final ArgumentCaptor<String> sensorKeys = setUpSensorKeyTests(metrics);
-        final StreamsMetricsImpl streamsMetrics = new 
StreamsMetricsImpl(metrics, CLIENT_ID, VERSION, time);
+        final StreamsMetricsImpl streamsMetrics = new 
StreamsMetricsImpl(metrics, CLIENT_ID, time);
 
         streamsMetrics.storeLevelSensor(TASK_ID1, STORE_NAME1, SENSOR_NAME_1, 
INFO_RECORDING_LEVEL);
         streamsMetrics.storeLevelSensor(TASK_ID1, STORE_NAME1, SENSOR_NAME_2, 
INFO_RECORDING_LEVEL);
@@ -396,7 +394,7 @@ public class StreamsMetricsImplTest {
     public void shouldNotUseSameStoreLevelSensorKeyWithDifferentTaskIds() {
         final Metrics metrics = mock(Metrics.class);
         final ArgumentCaptor<String> sensorKeys = setUpSensorKeyTests(metrics);
-        final StreamsMetricsImpl streamsMetrics = new 
StreamsMetricsImpl(metrics, CLIENT_ID, VERSION, time);
+        final StreamsMetricsImpl streamsMetrics = new 
StreamsMetricsImpl(metrics, CLIENT_ID, time);
 
         streamsMetrics.storeLevelSensor(TASK_ID1, STORE_NAME1, SENSOR_NAME_1, 
INFO_RECORDING_LEVEL);
         streamsMetrics.storeLevelSensor(TASK_ID2, STORE_NAME1, SENSOR_NAME_1, 
INFO_RECORDING_LEVEL);
@@ -408,7 +406,7 @@ public class StreamsMetricsImplTest {
     public void shouldNotUseSameStoreLevelSensorKeyWithDifferentStoreNames() {
         final Metrics metrics = mock(Metrics.class);
         final ArgumentCaptor<String> sensorKeys = setUpSensorKeyTests(metrics);
-        final StreamsMetricsImpl streamsMetrics = new 
StreamsMetricsImpl(metrics, CLIENT_ID, VERSION, time);
+        final StreamsMetricsImpl streamsMetrics = new 
StreamsMetricsImpl(metrics, CLIENT_ID, time);
 
         streamsMetrics.storeLevelSensor(TASK_ID1, STORE_NAME1, SENSOR_NAME_1, 
INFO_RECORDING_LEVEL);
         streamsMetrics.storeLevelSensor(TASK_ID1, STORE_NAME2, SENSOR_NAME_1, 
INFO_RECORDING_LEVEL);
@@ -420,7 +418,7 @@ public class StreamsMetricsImplTest {
     public void shouldNotUseSameStoreLevelSensorKeyWithDifferentThreadIds() 
throws InterruptedException {
         final Metrics metrics = mock(Metrics.class);
         final ArgumentCaptor<String> sensorKeys = setUpSensorKeyTests(metrics);
-        final StreamsMetricsImpl streamsMetrics = new 
StreamsMetricsImpl(metrics, CLIENT_ID, VERSION, time);
+        final StreamsMetricsImpl streamsMetrics = new 
StreamsMetricsImpl(metrics, CLIENT_ID, time);
 
         streamsMetrics.storeLevelSensor(TASK_ID1, STORE_NAME1, SENSOR_NAME_1, 
INFO_RECORDING_LEVEL);
         final Thread otherThread =
@@ -435,7 +433,7 @@ public class StreamsMetricsImplTest {
     public void shouldUseSameStoreLevelSensorKeyWithSameSensorNames() {
         final Metrics metrics = mock(Metrics.class);
         final ArgumentCaptor<String> sensorKeys = setUpSensorKeyTests(metrics);
-        final StreamsMetricsImpl streamsMetrics = new 
StreamsMetricsImpl(metrics, CLIENT_ID, VERSION, time);
+        final StreamsMetricsImpl streamsMetrics = new 
StreamsMetricsImpl(metrics, CLIENT_ID, time);
 
         streamsMetrics.storeLevelSensor(TASK_ID1, STORE_NAME1, SENSOR_NAME_1, 
INFO_RECORDING_LEVEL);
         streamsMetrics.storeLevelSensor(TASK_ID1, STORE_NAME1, SENSOR_NAME_1, 
INFO_RECORDING_LEVEL);
@@ -459,7 +457,7 @@ public class StreamsMetricsImplTest {
             .thenReturn(metricName);
         when(metrics.metric(metricName)).thenReturn(null);
         when(metrics.addMetricIfAbsent(eq(metricName), 
eqMetricConfig(metricConfig), eq(VALUE_PROVIDER))).thenReturn(null);
-        final StreamsMetricsImpl streamsMetrics = new 
StreamsMetricsImpl(metrics, CLIENT_ID, VERSION, time);
+        final StreamsMetricsImpl streamsMetrics = new 
StreamsMetricsImpl(metrics, CLIENT_ID, time);
 
         streamsMetrics.addStoreLevelMutableMetric(
             TASK_ID1,
@@ -491,7 +489,7 @@ public class StreamsMetricsImplTest {
         when(metrics.metricName(METRIC_NAME1, STATE_STORE_LEVEL_GROUP, 
DESCRIPTION1, STORE_LEVEL_TAG_MAP))
             .thenReturn(metricName);
         when(metrics.metric(metricName)).thenReturn(null);
-        final StreamsMetricsImpl streamsMetrics = new 
StreamsMetricsImpl(metrics, CLIENT_ID, VERSION, time);
+        final StreamsMetricsImpl streamsMetrics = new 
StreamsMetricsImpl(metrics, CLIENT_ID, time);
 
         streamsMetrics.addStoreLevelMutableMetric(
             TASK_ID1,
@@ -539,7 +537,7 @@ public class StreamsMetricsImplTest {
     @Test
     public void shouldRemoveStateStoreLevelSensors() {
         final Metrics metrics = mock(Metrics.class);
-        final StreamsMetricsImpl streamsMetrics = new 
StreamsMetricsImpl(metrics, CLIENT_ID, VERSION, time);
+        final StreamsMetricsImpl streamsMetrics = new 
StreamsMetricsImpl(metrics, CLIENT_ID, time);
         final MetricName metricName1 =
             new MetricName(METRIC_NAME1, STATE_STORE_LEVEL_GROUP, 
DESCRIPTION1, STORE_LEVEL_TAG_MAP);
         final MetricName metricName2 =
@@ -562,7 +560,7 @@ public class StreamsMetricsImplTest {
         final Metrics metrics = mock(Metrics.class);
         final RecordingLevel recordingLevel = RecordingLevel.INFO;
         setupGetNewSensorTest(metrics, recordingLevel);
-        final StreamsMetricsImpl streamsMetrics = new 
StreamsMetricsImpl(metrics, CLIENT_ID, VERSION, time);
+        final StreamsMetricsImpl streamsMetrics = new 
StreamsMetricsImpl(metrics, CLIENT_ID, time);
 
         final Sensor actualSensor = streamsMetrics.nodeLevelSensor(
             THREAD_ID1,
@@ -580,7 +578,7 @@ public class StreamsMetricsImplTest {
         final Metrics metrics = mock(Metrics.class);
         final RecordingLevel recordingLevel = RecordingLevel.INFO;
         setupGetExistingSensorTest(metrics);
-        final StreamsMetricsImpl streamsMetrics = new 
StreamsMetricsImpl(metrics, CLIENT_ID, VERSION, time);
+        final StreamsMetricsImpl streamsMetrics = new 
StreamsMetricsImpl(metrics, CLIENT_ID, time);
 
         final Sensor actualSensor = streamsMetrics.nodeLevelSensor(
             THREAD_ID1,
@@ -599,7 +597,7 @@ public class StreamsMetricsImplTest {
         final RecordingLevel recordingLevel = RecordingLevel.INFO;
         final String processorCacheName = "processorNodeName";
         setupGetNewSensorTest(metrics, recordingLevel);
-        final StreamsMetricsImpl streamsMetrics = new 
StreamsMetricsImpl(metrics, CLIENT_ID, VERSION, time);
+        final StreamsMetricsImpl streamsMetrics = new 
StreamsMetricsImpl(metrics, CLIENT_ID, time);
 
         final Sensor actualSensor = streamsMetrics.cacheLevelSensor(
             THREAD_ID1,
@@ -618,7 +616,7 @@ public class StreamsMetricsImplTest {
         final RecordingLevel recordingLevel = RecordingLevel.INFO;
         final String processorCacheName = "processorNodeName";
         setupGetExistingSensorTest(metrics);
-        final StreamsMetricsImpl streamsMetrics = new 
StreamsMetricsImpl(metrics, CLIENT_ID, VERSION, time);
+        final StreamsMetricsImpl streamsMetrics = new 
StreamsMetricsImpl(metrics, CLIENT_ID, time);
 
         final Sensor actualSensor = streamsMetrics.cacheLevelSensor(
             THREAD_ID1, TASK_ID1,
@@ -635,7 +633,7 @@ public class StreamsMetricsImplTest {
         final Metrics metrics = mock(Metrics.class);
         final RecordingLevel recordingLevel = RecordingLevel.INFO;
         setupGetNewSensorTest(metrics, recordingLevel);
-        final StreamsMetricsImpl streamsMetrics = new 
StreamsMetricsImpl(metrics, CLIENT_ID, VERSION, time);
+        final StreamsMetricsImpl streamsMetrics = new 
StreamsMetricsImpl(metrics, CLIENT_ID, time);
 
         final Sensor actualSensor = 
streamsMetrics.clientLevelSensor(SENSOR_NAME_1, recordingLevel);
 
@@ -647,7 +645,7 @@ public class StreamsMetricsImplTest {
         final Metrics metrics = mock(Metrics.class);
         final RecordingLevel recordingLevel = RecordingLevel.INFO;
         setupGetExistingSensorTest(metrics);
-        final StreamsMetricsImpl streamsMetrics = new 
StreamsMetricsImpl(metrics, CLIENT_ID, VERSION, time);
+        final StreamsMetricsImpl streamsMetrics = new 
StreamsMetricsImpl(metrics, CLIENT_ID, time);
 
         final Sensor actualSensor = 
streamsMetrics.clientLevelSensor(SENSOR_NAME_1, recordingLevel);
 
@@ -664,7 +662,7 @@ public class StreamsMetricsImplTest {
         when(metrics.metricName(METRIC_NAME1, CLIENT_LEVEL_GROUP, 
DESCRIPTION1, clientLevelTags))
             .thenReturn(metricName1);
         doNothing().when(metrics).addMetric(eq(metricName1), 
eqMetricConfig(metricConfig), eq(immutableValue));
-        final StreamsMetricsImpl streamsMetrics = new 
StreamsMetricsImpl(metrics, CLIENT_ID, VERSION, time);
+        final StreamsMetricsImpl streamsMetrics = new 
StreamsMetricsImpl(metrics, CLIENT_ID, time);
 
         streamsMetrics.addClientLevelImmutableMetric(METRIC_NAME1, 
DESCRIPTION1, recordingLevel, value);
     }
@@ -678,7 +676,7 @@ public class StreamsMetricsImplTest {
         when(metrics.metricName(METRIC_NAME1, CLIENT_LEVEL_GROUP, 
DESCRIPTION1, clientLevelTags))
             .thenReturn(metricName1);
         doNothing().when(metrics).addMetric(eq(metricName1), 
eqMetricConfig(metricConfig), eq(valueProvider));
-        final StreamsMetricsImpl streamsMetrics = new 
StreamsMetricsImpl(metrics, CLIENT_ID, VERSION, time);
+        final StreamsMetricsImpl streamsMetrics = new 
StreamsMetricsImpl(metrics, CLIENT_ID, time);
 
         streamsMetrics.addClientLevelMutableMetric(METRIC_NAME1, DESCRIPTION1, 
recordingLevel, valueProvider);
     }
@@ -699,7 +697,7 @@ public class StreamsMetricsImplTest {
     @Test
     public void shouldRemoveClientLevelMetricsAndSensors() {
         final Metrics metrics = mock(Metrics.class);
-        final StreamsMetricsImpl streamsMetrics = new 
StreamsMetricsImpl(metrics, CLIENT_ID, VERSION, time);
+        final StreamsMetricsImpl streamsMetrics = new 
StreamsMetricsImpl(metrics, CLIENT_ID, time);
         final ArgumentCaptor<String> sensorKeys = 
addSensorsOnAllLevels(metrics, streamsMetrics);
 
         
doNothing().when(metrics).removeSensor(sensorKeys.getAllValues().get(0));
@@ -712,7 +710,7 @@ public class StreamsMetricsImplTest {
     @Test
     public void shouldRemoveThreadLevelSensors() {
         final Metrics metrics = mock(Metrics.class);
-        final StreamsMetricsImpl streamsMetrics = new 
StreamsMetricsImpl(metrics, CLIENT_ID, VERSION, time);
+        final StreamsMetricsImpl streamsMetrics = new 
StreamsMetricsImpl(metrics, CLIENT_ID, time);
         addSensorsOnAllLevels(metrics, streamsMetrics);
         setupRemoveSensorsTest(metrics, THREAD_ID1);
 
@@ -721,7 +719,7 @@ public class StreamsMetricsImplTest {
 
     @Test
     public void testNullMetrics() {
-        assertThrows(NullPointerException.class, () -> new StreamsMetricsImpl(null, "", VERSION, time));
+        assertThrows(NullPointerException.class, () -> new StreamsMetricsImpl(null, "", time));
     }
 
     @Test
@@ -754,7 +752,7 @@ public class StreamsMetricsImplTest {
     @Test
     public void testMultiLevelSensorRemoval() {
         final Metrics registry = new Metrics();
-        final StreamsMetricsImpl metrics = new StreamsMetricsImpl(registry, THREAD_ID1, VERSION, time);
+        final StreamsMetricsImpl metrics = new StreamsMetricsImpl(registry, THREAD_ID1, time);
         for (final MetricName defaultMetric : registry.metrics().keySet()) {
             registry.removeMetric(defaultMetric);
         }
@@ -860,7 +858,7 @@ public class StreamsMetricsImplTest {
         final MockTime time = new MockTime(1);
         final MetricConfig config = new MetricConfig().timeWindow(1, 
TimeUnit.MILLISECONDS);
         final Metrics metrics = new Metrics(config, time);
-        final StreamsMetricsImpl streamsMetrics = new 
StreamsMetricsImpl(metrics, "", VERSION, time);
+        final StreamsMetricsImpl streamsMetrics = new 
StreamsMetricsImpl(metrics, "", time);
 
         final String scope = "scope";
         final String entity = "entity";
@@ -894,7 +892,7 @@ public class StreamsMetricsImplTest {
 
     @Test
     public void shouldAddLatencyRateTotalSensor() {
-        final StreamsMetricsImpl streamsMetrics = new 
StreamsMetricsImpl(metrics, CLIENT_ID, VERSION, time);
+        final StreamsMetricsImpl streamsMetrics = new 
StreamsMetricsImpl(metrics, CLIENT_ID, time);
         shouldAddCustomSensor(
             streamsMetrics.addLatencyRateTotalSensor(SCOPE_NAME, ENTITY_NAME, 
OPERATION_NAME, RecordingLevel.DEBUG),
             streamsMetrics,
@@ -909,7 +907,7 @@ public class StreamsMetricsImplTest {
 
     @Test
     public void shouldAddRateTotalSensor() {
-        final StreamsMetricsImpl streamsMetrics = new 
StreamsMetricsImpl(metrics, CLIENT_ID, VERSION, time);
+        final StreamsMetricsImpl streamsMetrics = new 
StreamsMetricsImpl(metrics, CLIENT_ID, time);
         shouldAddCustomSensor(
             streamsMetrics.addRateTotalSensor(SCOPE_NAME, ENTITY_NAME, 
OPERATION_NAME, RecordingLevel.DEBUG),
             streamsMetrics,
@@ -1044,7 +1042,7 @@ public class StreamsMetricsImplTest {
         final String taskName = "test-task";
         final String storeType = "remote-window";
         final String storeName = "window-keeper";
-        final StreamsMetricsImpl streamsMetrics = new 
StreamsMetricsImpl(metrics, THREAD_ID1, VERSION, time);
+        final StreamsMetricsImpl streamsMetrics = new 
StreamsMetricsImpl(metrics, THREAD_ID1, time);
 
         final Map<String, String> tagMap = 
streamsMetrics.storeLevelTagMap(taskName, storeType, storeName);
 
@@ -1059,7 +1057,7 @@ public class StreamsMetricsImplTest {
     @Test
     public void shouldGetCacheLevelTagMap() {
         final StreamsMetricsImpl streamsMetrics =
-            new StreamsMetricsImpl(metrics, THREAD_ID1, VERSION, time);
+            new StreamsMetricsImpl(metrics, THREAD_ID1, time);
         final String taskName = "taskName";
         final String storeName = "storeName";
 
@@ -1076,7 +1074,7 @@ public class StreamsMetricsImplTest {
 
     @Test
     public void shouldGetThreadLevelTagMap() {
-        final StreamsMetricsImpl streamsMetrics = new 
StreamsMetricsImpl(metrics, THREAD_ID1, VERSION, time);
+        final StreamsMetricsImpl streamsMetrics = new 
StreamsMetricsImpl(metrics, THREAD_ID1, time);
 
         final Map<String, String> tagMap = 
streamsMetrics.threadLevelTagMap(THREAD_ID1);
 
@@ -1209,7 +1207,7 @@ public class StreamsMetricsImplTest {
     @Test
     public void shouldReturnMetricsVersionCurrent() {
         assertThat(
-            new StreamsMetricsImpl(metrics, THREAD_ID1, StreamsConfig.METRICS_LATEST, time).version(),
+            new StreamsMetricsImpl(metrics, THREAD_ID1, time).version(),
             equalTo(Version.LATEST)
         );
     }
@@ -1268,7 +1266,7 @@ public class StreamsMetricsImplTest {
     public void shouldAddThreadLevelMutableMetric() {
         final int measuredValue = 123;
         final StreamsMetricsImpl streamsMetrics
-            = new StreamsMetricsImpl(metrics, THREAD_ID1, VERSION, time);
+            = new StreamsMetricsImpl(metrics, THREAD_ID1, time);
 
         streamsMetrics.addThreadLevelMutableMetric(
             "foobar",
@@ -1290,7 +1288,7 @@ public class StreamsMetricsImplTest {
     public void shouldCleanupThreadLevelMutableMetric() {
         final int measuredValue = 123;
         final StreamsMetricsImpl streamsMetrics
-            = new StreamsMetricsImpl(metrics, THREAD_ID1, VERSION, time);
+            = new StreamsMetricsImpl(metrics, THREAD_ID1, time);
         streamsMetrics.addThreadLevelMutableMetric(
             "foobar",
             "test metric",
@@ -1312,7 +1310,7 @@ public class StreamsMetricsImplTest {
     public void shouldAddThreadLevelImmutableMetric() {
         final int measuredValue = 123;
         final StreamsMetricsImpl streamsMetrics
-            = new StreamsMetricsImpl(metrics, THREAD_ID1, VERSION, time);
+            = new StreamsMetricsImpl(metrics, THREAD_ID1, time);
 
         streamsMetrics.addThreadLevelImmutableMetric(
             "foobar",
@@ -1334,7 +1332,7 @@ public class StreamsMetricsImplTest {
     public void shouldCleanupThreadLevelImmutableMetric() {
         final int measuredValue = 123;
         final StreamsMetricsImpl streamsMetrics
-            = new StreamsMetricsImpl(metrics, THREAD_ID1, VERSION, time);
+            = new StreamsMetricsImpl(metrics, THREAD_ID1, time);
         streamsMetrics.addThreadLevelImmutableMetric(
             "foobar",
             "test metric",
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStoreTest.java
index 76a5f1bdc61..e3fae7e7d69 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStoreTest.java
@@ -1416,7 +1416,7 @@ public abstract class AbstractDualSchemaRocksDBSegmentedBytesStoreTest {
                 dir,
                 Serdes.String(),
                 Serdes.String(),
-                new StreamsMetricsImpl(new Metrics(), "mock", StreamsConfig.METRICS_LATEST, new MockTime()),
+                new StreamsMetricsImpl(new Metrics(), "mock", new MockTime()),
                 new StreamsConfig(props),
                 MockRecordCollector::new,
                 new ThreadCache(new LogContext("testCache "), 0, new MockStreamsMetrics(new Metrics())),
@@ -1452,7 +1452,7 @@ public abstract class AbstractDualSchemaRocksDBSegmentedBytesStoreTest {
                 dir,
                 Serdes.String(),
                 Serdes.String(),
-                new StreamsMetricsImpl(new Metrics(), "mock", StreamsConfig.METRICS_LATEST, new MockTime()),
+                new StreamsMetricsImpl(new Metrics(), "mock", new MockTime()),
                 new StreamsConfig(props),
                 MockRecordCollector::new,
                 new ThreadCache(new LogContext("testCache "), 0, new MockStreamsMetrics(new Metrics())),
@@ -1491,7 +1491,7 @@ public abstract class AbstractDualSchemaRocksDBSegmentedBytesStoreTest {
                 dir,
                 Serdes.String(),
                 Serdes.String(),
-                new StreamsMetricsImpl(new Metrics(), "mock", StreamsConfig.METRICS_LATEST, new MockTime()),
+                new StreamsMetricsImpl(new Metrics(), "mock", new MockTime()),
                 new StreamsConfig(props),
                 MockRecordCollector::new,
                 new ThreadCache(new LogContext("testCache "), 0, new MockStreamsMetrics(new Metrics())),
@@ -1532,7 +1532,7 @@ public abstract class AbstractDualSchemaRocksDBSegmentedBytesStoreTest {
                 dir,
                 Serdes.String(),
                 Serdes.String(),
-                new StreamsMetricsImpl(new Metrics(), "mock", StreamsConfig.METRICS_LATEST, new MockTime()),
+                new StreamsMetricsImpl(new Metrics(), "mock", new MockTime()),
                 new StreamsConfig(props),
                 MockRecordCollector::new,
                 new ThreadCache(new LogContext("testCache "), 0, new MockStreamsMetrics(new Metrics())),
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStoreTest.java
index 0f875e91f55..77149d1d410 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStoreTest.java
@@ -573,7 +573,7 @@ public abstract class 
AbstractRocksDBSegmentedBytesStoreTest<S extends Segment>
                 dir,
                 Serdes.String(),
                 Serdes.String(),
-                new StreamsMetricsImpl(new Metrics(), "mock", 
StreamsConfig.METRICS_LATEST, new MockTime()),
+                new StreamsMetricsImpl(new Metrics(), "mock", new MockTime()),
                 new StreamsConfig(props),
                 MockRecordCollector::new,
                 new ThreadCache(new LogContext("testCache "), 0, new 
MockStreamsMetrics(new Metrics())),
@@ -613,7 +613,7 @@ public abstract class 
AbstractRocksDBSegmentedBytesStoreTest<S extends Segment>
                 dir,
                 Serdes.String(),
                 Serdes.String(),
-                new StreamsMetricsImpl(new Metrics(), "mock", 
StreamsConfig.METRICS_LATEST, new MockTime()),
+                new StreamsMetricsImpl(new Metrics(), "mock", new MockTime()),
                 new StreamsConfig(props),
                 MockRecordCollector::new,
                 new ThreadCache(new LogContext("testCache "), 0, new 
MockStreamsMetrics(new Metrics())),
@@ -655,7 +655,7 @@ public abstract class 
AbstractRocksDBSegmentedBytesStoreTest<S extends Segment>
                 dir,
                 Serdes.String(),
                 Serdes.String(),
-                new StreamsMetricsImpl(new Metrics(), "mock", 
StreamsConfig.METRICS_LATEST, new MockTime()),
+                new StreamsMetricsImpl(new Metrics(), "mock", new MockTime()),
                 new StreamsConfig(props),
                 MockRecordCollector::new,
                 new ThreadCache(new LogContext("testCache "), 0, new 
MockStreamsMetrics(new Metrics())),
@@ -699,7 +699,7 @@ public abstract class 
AbstractRocksDBSegmentedBytesStoreTest<S extends Segment>
                 dir,
                 Serdes.String(),
                 Serdes.String(),
-                new StreamsMetricsImpl(new Metrics(), "mock", 
StreamsConfig.METRICS_LATEST, new MockTime()),
+                new StreamsMetricsImpl(new Metrics(), "mock", new MockTime()),
                 new StreamsConfig(props),
                 MockRecordCollector::new,
                 new ThreadCache(new LogContext("testCache "), 0, new 
MockStreamsMetrics(new Metrics())),
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java
index e08d6e23eb9..35efe5891a9 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java
@@ -98,7 +98,7 @@ public class ChangeLoggingKeyValueBytesStoreTest {
             TestUtils.tempDirectory(),
             Serdes.String(),
             Serdes.Long(),
-            new StreamsMetricsImpl(new Metrics(), "mock", 
StreamsConfig.METRICS_LATEST, new MockTime()),
+            new StreamsMetricsImpl(new Metrics(), "mock", new MockTime()),
             streamsConfig,
             () -> collector,
             new ThreadCache(new LogContext("testCache "), 0, new 
MockStreamsMetrics(new Metrics())),
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/GlobalStateStoreProviderTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/GlobalStateStoreProviderTest.java
index 7c282379a91..e2f953de2f8 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/GlobalStateStoreProviderTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/GlobalStateStoreProviderTest.java
@@ -113,7 +113,7 @@ public class GlobalStateStoreProviderTest {
         when(mockContext.applicationId()).thenReturn("appId");
         when(mockContext.metrics())
             .thenReturn(
-                new StreamsMetricsImpl(new Metrics(), "threadName", 
StreamsConfig.METRICS_LATEST, new MockTime())
+                new StreamsMetricsImpl(new Metrics(), "threadName", new 
MockTime())
             );
         when(mockContext.taskId()).thenReturn(new TaskId(0, 0));
         when(mockContext.appConfigs()).thenReturn(CONFIGS);
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueSegmentTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueSegmentTest.java
index 6756347586f..108e6e631b0 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueSegmentTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueSegmentTest.java
@@ -18,7 +18,6 @@ package org.apache.kafka.streams.state.internals;
 
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.utils.MockTime;
-import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
@@ -58,7 +57,7 @@ public class KeyValueSegmentTest {
     @BeforeEach
     public void setUp() {
         metricsRecorder.init(
-            new StreamsMetricsImpl(new Metrics(), "test-client", 
StreamsConfig.METRICS_LATEST, new MockTime()),
+            new StreamsMetricsImpl(new Metrics(), "test-client", new 
MockTime()),
             new TaskId(0, 0)
         );
     }
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java
index 1c4f614e3f4..7e7cf77075a 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java
@@ -31,7 +31,6 @@ import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.processor.StateStoreContext;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
@@ -125,7 +124,7 @@ public class MeteredKeyValueStoreTest {
         metrics.config().recordLevel(Sensor.RecordingLevel.DEBUG);
         when(context.applicationId()).thenReturn(APPLICATION_ID);
         when(context.metrics()).thenReturn(
-            new StreamsMetricsImpl(metrics, "test", 
StreamsConfig.METRICS_LATEST, mockTime)
+            new StreamsMetricsImpl(metrics, "test", mockTime)
         );
         when(context.taskId()).thenReturn(taskId);
         when(context.changelogFor(STORE_NAME)).thenReturn(CHANGELOG_TOPIC);
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java
index 08655a380df..92930eca91d 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java
@@ -32,7 +32,6 @@ import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.kstream.internals.SessionWindow;
 import org.apache.kafka.streams.processor.StateStoreContext;
@@ -128,7 +127,7 @@ public class MeteredSessionStoreTest {
         metrics.config().recordLevel(Sensor.RecordingLevel.DEBUG);
         when(context.applicationId()).thenReturn(APPLICATION_ID);
         when(context.metrics())
-                .thenReturn(new StreamsMetricsImpl(metrics, "test", 
StreamsConfig.METRICS_LATEST, mockTime));
+                .thenReturn(new StreamsMetricsImpl(metrics, "test", mockTime));
         when(context.taskId()).thenReturn(taskId);
         when(context.changelogFor(STORE_NAME)).thenReturn(CHANGELOG_TOPIC);
         when(innerStore.name()).thenReturn(STORE_NAME);
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreTest.java
index 008b052d95e..35c1a5e2cc8 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreTest.java
@@ -129,7 +129,7 @@ public class MeteredTimestampedKeyValueStoreTest {
         setUpWithoutContext();
         when(context.applicationId()).thenReturn(APPLICATION_ID);
         when(context.metrics())
-            .thenReturn(new StreamsMetricsImpl(metrics, "test", 
StreamsConfig.METRICS_LATEST, mockTime));
+            .thenReturn(new StreamsMetricsImpl(metrics, "test", mockTime));
         when(context.taskId()).thenReturn(taskId);
         when(context.changelogFor(STORE_NAME)).thenReturn(CHANGELOG_TOPIC);
         when(inner.name()).thenReturn(STORE_NAME);
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStoreTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStoreTest.java
index 448753c87fc..0682a75c35b 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStoreTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStoreTest.java
@@ -78,7 +78,7 @@ public class MeteredTimestampedWindowStoreTest {
 
     public void setUp() {
         final StreamsMetricsImpl streamsMetrics =
-            new StreamsMetricsImpl(metrics, "test", 
StreamsConfig.METRICS_LATEST, new MockTime());
+            new StreamsMetricsImpl(metrics, "test", new MockTime());
 
         context = new InternalMockProcessorContext<>(
             TestUtils.tempDirectory(),
@@ -106,7 +106,7 @@ public class MeteredTimestampedWindowStoreTest {
 
     public void setUpWithoutContextName() {
         final StreamsMetricsImpl streamsMetrics =
-                new StreamsMetricsImpl(metrics, "test", 
StreamsConfig.METRICS_LATEST, new MockTime());
+                new StreamsMetricsImpl(metrics, "test", new MockTime());
 
         context = new InternalMockProcessorContext<>(
                 TestUtils.tempDirectory(),
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStoreTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStoreTest.java
index afcbf740c2b..17c515f3825 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStoreTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStoreTest.java
@@ -27,7 +27,6 @@ import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.processor.StateStoreContext;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
@@ -112,7 +111,7 @@ public class MeteredVersionedKeyValueStoreTest {
     @BeforeEach
     public void setUp() {
         when(inner.name()).thenReturn(STORE_NAME);
-        when(context.metrics()).thenReturn(new StreamsMetricsImpl(metrics, 
"test", StreamsConfig.METRICS_LATEST, mockTime));
+        when(context.metrics()).thenReturn(new StreamsMetricsImpl(metrics, 
"test", mockTime));
         when(context.applicationId()).thenReturn(APPLICATION_ID);
         when(context.taskId()).thenReturn(TASK_ID);
 
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java
index 4f25c029172..dd1297c52c9 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java
@@ -117,7 +117,7 @@ public class MeteredWindowStoreTest {
     @BeforeEach
     public void setUp() {
         final StreamsMetricsImpl streamsMetrics =
-            new StreamsMetricsImpl(metrics, "test", 
StreamsConfig.METRICS_LATEST, new MockTime());
+            new StreamsMetricsImpl(metrics, "test", new MockTime());
         context = new InternalMockProcessorContext<>(
             TestUtils.tempDirectory(),
             Serdes.String(),
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
index 57e8db6a6e5..d8ee0a6316b 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
@@ -919,7 +919,7 @@ public class RocksDBStoreTest extends 
AbstractKeyValueStoreTest {
 
         final Metrics metrics = new Metrics(new 
MetricConfig().recordLevel(RecordingLevel.DEBUG));
         final StreamsMetricsImpl streamsMetrics =
-            new StreamsMetricsImpl(metrics, "test-application", 
StreamsConfig.METRICS_LATEST, time);
+            new StreamsMetricsImpl(metrics, "test-application", time);
 
         context = mock(InternalMockProcessorContext.class);
         when(context.metrics()).thenReturn(streamsMetrics);
@@ -952,7 +952,7 @@ public class RocksDBStoreTest extends 
AbstractKeyValueStoreTest {
 
         final Metrics metrics = new Metrics(new 
MetricConfig().recordLevel(RecordingLevel.INFO));
         final StreamsMetricsImpl streamsMetrics =
-            new StreamsMetricsImpl(metrics, "test-application", 
StreamsConfig.METRICS_LATEST, time);
+            new StreamsMetricsImpl(metrics, "test-application", time);
 
         context = mock(InternalMockProcessorContext.class);
         when(context.metrics()).thenReturn(streamsMetrics);
@@ -984,7 +984,7 @@ public class RocksDBStoreTest extends 
AbstractKeyValueStoreTest {
 
         final Metrics metrics = new Metrics(new 
MetricConfig().recordLevel(RecordingLevel.INFO));
         final StreamsMetricsImpl streamsMetrics =
-            new StreamsMetricsImpl(metrics, "test-application", 
StreamsConfig.METRICS_LATEST, time);
+            new StreamsMetricsImpl(metrics, "test-application", time);
 
         final Properties props = StreamsTestUtils.getStreamsConfig();
         context = mock(InternalMockProcessorContext.class);
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBufferTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBufferTest.java
index 33b97af3386..47456fc03ee 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBufferTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBufferTest.java
@@ -21,7 +21,6 @@ import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.MockTime;
-import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.processor.StateStoreContext;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.api.Record;
@@ -66,7 +65,7 @@ public class RocksDBTimeOrderedKeyValueBufferTest {
         when(serdeGetter.valueSerde()).thenReturn(new Serdes.StringSerde());
         final Metrics metrics = new Metrics();
         offset = 0;
-        streamsMetrics = new StreamsMetricsImpl(metrics, "test-client", 
StreamsConfig.METRICS_LATEST, new MockTime());
+        streamsMetrics = new StreamsMetricsImpl(metrics, "test-client", new 
MockTime());
         context = new 
MockInternalNewProcessorContext<>(StreamsTestUtils.getStreamsConfig(), new 
TaskId(0, 0), TestUtils.tempDirectory());
     }
 
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedSegmentTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedSegmentTest.java
index 489bb12db2a..9ebac835b43 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedSegmentTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedSegmentTest.java
@@ -18,7 +18,6 @@ package org.apache.kafka.streams.state.internals;
 
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.utils.MockTime;
-import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
@@ -58,7 +57,7 @@ public class TimestampedSegmentTest {
     @BeforeEach
     public void setUp() {
         metricsRecorder.init(
-            new StreamsMetricsImpl(new Metrics(), "test-client", 
StreamsConfig.METRICS_LATEST, new MockTime()),
+            new StreamsMetricsImpl(new Metrics(), "test-client", new 
MockTime()),
             new TaskId(0, 0)
         );
     }
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorderGaugesTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorderGaugesTest.java
index 00557ab0c00..113de5959a4 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorderGaugesTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorderGaugesTest.java
@@ -21,7 +21,6 @@ import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.metrics.KafkaMetric;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.utils.MockTime;
-import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
 
@@ -203,7 +202,7 @@ public class RocksDBMetricsRecorderGaugesTest {
 
     private void runAndVerifySumOfProperties(final String propertyName) throws 
Exception {
         final StreamsMetricsImpl streamsMetrics =
-            new StreamsMetricsImpl(new Metrics(), "test-client", 
StreamsConfig.METRICS_LATEST, new MockTime());
+            new StreamsMetricsImpl(new Metrics(), "test-client", new 
MockTime());
         final RocksDBMetricsRecorder recorder = new 
RocksDBMetricsRecorder(METRICS_SCOPE, STORE_NAME);
 
         recorder.init(streamsMetrics, TASK_ID);
@@ -220,7 +219,7 @@ public class RocksDBMetricsRecorderGaugesTest {
 
     private void runAndVerifyBlockCacheMetricsWithMultipleCaches(final String 
propertyName) throws Exception {
         final StreamsMetricsImpl streamsMetrics =
-                new StreamsMetricsImpl(new Metrics(), "test-client", 
StreamsConfig.METRICS_LATEST, new MockTime());
+                new StreamsMetricsImpl(new Metrics(), "test-client", new 
MockTime());
         final RocksDBMetricsRecorder recorder = new 
RocksDBMetricsRecorder(METRICS_SCOPE, STORE_NAME);
 
         recorder.init(streamsMetrics, TASK_ID);
@@ -237,7 +236,7 @@ public class RocksDBMetricsRecorderGaugesTest {
 
     private void runAndVerifyBlockCacheMetricsWithSingleCache(final String 
propertyName) throws Exception {
         final StreamsMetricsImpl streamsMetrics =
-            new StreamsMetricsImpl(new Metrics(), "test-client", 
StreamsConfig.METRICS_LATEST, new MockTime());
+            new StreamsMetricsImpl(new Metrics(), "test-client", new 
MockTime());
         final RocksDBMetricsRecorder recorder = new 
RocksDBMetricsRecorder(METRICS_SCOPE, STORE_NAME);
 
         recorder.init(streamsMetrics, TASK_ID);
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorderTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorderTest.java
index e9f67a2833a..44c7373c45b 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorderTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorderTest.java
@@ -19,7 +19,6 @@ package org.apache.kafka.streams.state.internals.metrics;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.utils.MockTime;
-import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
 import 
org.apache.kafka.streams.state.internals.metrics.RocksDBMetrics.RocksDBMetricContext;
@@ -179,7 +178,7 @@ public class RocksDBMetricsRecorderTest {
         assertThrows(
             IllegalStateException.class,
             () -> recorder.init(
-                new StreamsMetricsImpl(new Metrics(), "test-client", 
StreamsConfig.METRICS_LATEST, new MockTime()),
+                new StreamsMetricsImpl(new Metrics(), "test-client", new 
MockTime()),
                 TASK_ID1
             )
         );
diff --git 
a/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java 
b/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java
index ff9080a2de1..4ba7e565c6d 100644
--- 
a/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java
+++ 
b/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java
@@ -89,7 +89,7 @@ public class InternalMockProcessorContext<KOut, VOut>
         this(null,
             null,
             null,
-            new StreamsMetricsImpl(new Metrics(), "mock", 
StreamsConfig.METRICS_LATEST, new MockTime()),
+            new StreamsMetricsImpl(new Metrics(), "mock", new MockTime()),
             new StreamsConfig(StreamsTestUtils.getStreamsConfig()),
             null,
             null,
@@ -106,7 +106,6 @@ public class InternalMockProcessorContext<KOut, VOut>
             new StreamsMetricsImpl(
                 new Metrics(),
                 "mock",
-                
config.getString(StreamsConfig.BUILT_IN_METRICS_VERSION_CONFIG),
                 new MockTime()
             ),
             config,
@@ -139,7 +138,6 @@ public class InternalMockProcessorContext<KOut, VOut>
             new StreamsMetricsImpl(
                 new Metrics(),
                 "mock",
-                
config.getString(StreamsConfig.BUILT_IN_METRICS_VERSION_CONFIG),
                 new MockTime()
             ),
             config,
@@ -157,7 +155,7 @@ public class InternalMockProcessorContext<KOut, VOut>
             stateDir,
             keySerde,
             valueSerde,
-            new StreamsMetricsImpl(new Metrics(), "mock", 
StreamsConfig.METRICS_LATEST, new MockTime()),
+            new StreamsMetricsImpl(new Metrics(), "mock", new MockTime()),
             config,
             null,
             null,
@@ -177,7 +175,7 @@ public class InternalMockProcessorContext<KOut, VOut>
             null,
             serdes.keySerde(),
             serdes.valueSerde(),
-            new StreamsMetricsImpl(metrics, "mock", 
StreamsConfig.METRICS_LATEST, new MockTime()),
+            new StreamsMetricsImpl(metrics, "mock", new MockTime()),
             new StreamsConfig(StreamsTestUtils.getStreamsConfig()),
             () -> collector,
             null,
@@ -194,7 +192,7 @@ public class InternalMockProcessorContext<KOut, VOut>
             stateDir,
             keySerde,
             valueSerde,
-            new StreamsMetricsImpl(new Metrics(), "mock", 
StreamsConfig.METRICS_LATEST, new MockTime()),
+            new StreamsMetricsImpl(new Metrics(), "mock", new MockTime()),
             new StreamsConfig(StreamsTestUtils.getStreamsConfig()),
             () -> collector,
             cache,
diff --git 
a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
 
b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
index 9e76fad4264..884fa2c7dda 100644
--- 
a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
+++ 
b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
@@ -403,7 +403,6 @@ public class TopologyTestDriver implements Closeable {
         final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(
             metrics,
             "test-client",
-            
streamsConfig.getString(StreamsConfig.BUILT_IN_METRICS_VERSION_CONFIG),
             mockWallClockTime
         );
         TaskMetrics.droppedRecordsSensor(threadId, TASK_ID.toString(), 
streamsMetrics);
diff --git 
a/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java
 
b/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java
index a9b47b59da9..8399233a4e4 100644
--- 
a/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java
+++ 
b/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java
@@ -242,7 +242,6 @@ public class MockProcessorContext implements 
ProcessorContext, RecordCollector.S
         this.metrics = new StreamsMetricsImpl(
             new Metrics(metricConfig),
             threadId,
-            
streamsConfig.getString(StreamsConfig.BUILT_IN_METRICS_VERSION_CONFIG),
             Time.SYSTEM
         );
         TaskMetrics.droppedRecordsSensor(threadId, taskId.toString(), metrics);
diff --git 
a/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/api/MockProcessorContext.java
 
b/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/api/MockProcessorContext.java
index 7fe262092dd..146359bf25e 100644
--- 
a/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/api/MockProcessorContext.java
+++ 
b/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/api/MockProcessorContext.java
@@ -255,7 +255,6 @@ public class MockProcessorContext<KForward, VForward> 
implements ProcessorContex
         metrics = new StreamsMetricsImpl(
             new Metrics(metricConfig),
             threadId,
-            
streamsConfig.getString(StreamsConfig.BUILT_IN_METRICS_VERSION_CONFIG),
             Time.SYSTEM
         );
         TaskMetrics.droppedRecordsSensor(threadId, taskId.toString(), metrics);
diff --git 
a/streams/test-utils/src/test/java/org/apache/kafka/streams/MockProcessorContextTest.java
 
b/streams/test-utils/src/test/java/org/apache/kafka/streams/MockProcessorContextTest.java
index 5b0a858a86e..236ca53f791 100644
--- 
a/streams/test-utils/src/test/java/org/apache/kafka/streams/MockProcessorContextTest.java
+++ 
b/streams/test-utils/src/test/java/org/apache/kafka/streams/MockProcessorContextTest.java
@@ -232,7 +232,6 @@ public class MockProcessorContextTest {
         when(mockInternalProcessorContext.metrics()).thenReturn(new 
StreamsMetricsImpl(
             new Metrics(new MetricConfig()),
             Thread.currentThread().getName(),
-            "",
             Time.SYSTEM
         ));
         when(mockInternalProcessorContext.taskId()).thenReturn(new TaskId(1, 
1));


Reply via email to