This is an automated email from the ASF dual-hosted git repository. mjsax pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new 27a3c752161 KAFKA-17488: Cleanup (test) code for Kafka Streams "metric version" (#17182) 27a3c752161 is described below commit 27a3c752161d97db096dc0e8d905c7224b4aee2b Author: Joao Pedro Fonseca Dantas <67479090+fonsd...@users.noreply.github.com> AuthorDate: Thu Sep 26 21:04:01 2024 -0300 KAFKA-17488: Cleanup (test) code for Kafka Streams "metric version" (#17182) This PR simply StreamsMetricsImpl to avoid passing in the unused "metric version" parameter. Reviewers: Matthias J. Sax <matth...@confluent.io> --- .../org/apache/kafka/streams/KafkaStreams.java | 1 - .../internals/metrics/StreamsMetricsImpl.java | 2 - ...KStreamSessionWindowAggregateProcessorTest.java | 2 +- .../processor/internals/ActiveTaskCreatorTest.java | 2 +- .../internals/GlobalStreamThreadTest.java | 4 +- .../processor/internals/MockStreamsMetrics.java | 3 +- .../processor/internals/ProcessorNodeTest.java | 6 +- .../processor/internals/RecordQueueTest.java | 3 +- .../processor/internals/SourceNodeTest.java | 3 +- .../processor/internals/StandbyTaskTest.java | 2 +- .../processor/internals/StreamTaskTest.java | 44 ++++++------ .../processor/internals/StreamThreadTest.java | 40 +++++------ .../internals/metrics/StreamsMetricsImplTest.java | 82 +++++++++++----------- ...ctDualSchemaRocksDBSegmentedBytesStoreTest.java | 8 +-- .../AbstractRocksDBSegmentedBytesStoreTest.java | 8 +-- .../ChangeLoggingKeyValueBytesStoreTest.java | 2 +- .../internals/GlobalStateStoreProviderTest.java | 2 +- .../state/internals/KeyValueSegmentTest.java | 3 +- .../state/internals/MeteredKeyValueStoreTest.java | 3 +- .../state/internals/MeteredSessionStoreTest.java | 3 +- .../MeteredTimestampedKeyValueStoreTest.java | 2 +- .../MeteredTimestampedWindowStoreTest.java | 4 +- .../MeteredVersionedKeyValueStoreTest.java | 3 +- .../state/internals/MeteredWindowStoreTest.java | 2 +- .../streams/state/internals/RocksDBStoreTest.java | 6 +- .../RocksDBTimeOrderedKeyValueBufferTest.java | 3 +- .../state/internals/TimestampedSegmentTest.java | 3 +- .../metrics/RocksDBMetricsRecorderGaugesTest.java | 7 +- .../metrics/RocksDBMetricsRecorderTest.java | 3 +- .../kafka/test/InternalMockProcessorContext.java | 10 ++- .../apache/kafka/streams/TopologyTestDriver.java | 1 - .../streams/processor/MockProcessorContext.java | 1 - .../processor/api/MockProcessorContext.java | 1 - .../kafka/streams/MockProcessorContextTest.java | 1 - 34 files changed, 120 insertions(+), 150 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java index ff5d80e5556..089d0b28206 100644 --- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java +++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java @@ -1002,7 +1002,6 @@ public class KafkaStreams implements AutoCloseable { streamsMetrics = new StreamsMetricsImpl( metrics, clientId, - applicationConfigs.getString(StreamsConfig.BUILT_IN_METRICS_VERSION_CONFIG), time ); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java index e7a5c3202a0..f5660419b15 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java @@ -162,10 +162,8 @@ public class StreamsMetricsImpl implements StreamsMetrics { public StreamsMetricsImpl(final Metrics metrics, final String clientId, - final String builtInMetricsVersion, final Time time) { Objects.requireNonNull(metrics, "Metrics cannot be null"); - Objects.requireNonNull(builtInMetricsVersion, "Built-in metrics version cannot be null"); this.metrics = metrics; this.clientId = clientId; version = Version.LATEST; diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java index 9d060a5882c..5d931e9fdf2 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java @@ -80,7 +80,7 @@ public class KStreamSessionWindowAggregateProcessorTest { private final MockTime time = new MockTime(); private final Metrics metrics = new Metrics(); - private final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, "test", StreamsConfig.METRICS_LATEST, time); + private final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, "test", time); private final String threadId = Thread.currentThread().getName(); private final Initializer<Long> initializer = () -> 0L; private final Aggregator<String, String, Long> aggregator = (aggKey, value, aggregate) -> aggregate + 1; diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreatorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreatorTest.java index 7bb7a9eb751..c1f58b3972d 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreatorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreatorTest.java @@ -72,7 +72,7 @@ public class ActiveTaskCreatorTest { private ChangelogReader changeLogReader; private final MockClientSupplier mockClientSupplier = new MockClientSupplier(); - private final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(new Metrics(), "clientId", StreamsConfig.METRICS_LATEST, new MockTime()); + private final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(new Metrics(), "clientId", new MockTime()); private final Map<String, Object> properties = mkMap( mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, "appId"), mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234") diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java index 73b610227a5..d7d2971db32 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java @@ -134,7 +134,7 @@ public class GlobalStreamThreadTest { mockConsumer, new StateDirectory(config, time, true, false), 0, - new StreamsMetricsImpl(new Metrics(), "test-client", StreamsConfig.METRICS_LATEST, time), + new StreamsMetricsImpl(new Metrics(), "test-client", time), time, "clientId", stateRestoreListener, @@ -173,7 +173,7 @@ public class GlobalStreamThreadTest { mockConsumer, new StateDirectory(config, time, true, false), 0, - new StreamsMetricsImpl(new Metrics(), "test-client", StreamsConfig.METRICS_LATEST, time), + new StreamsMetricsImpl(new Metrics(), "test-client", time), time, "clientId", stateRestoreListener, diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/MockStreamsMetrics.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/MockStreamsMetrics.java index bb0303c9493..4ed68ef8150 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/MockStreamsMetrics.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/MockStreamsMetrics.java @@ -18,12 +18,11 @@ package org.apache.kafka.streams.processor.internals; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.utils.MockTime; -import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; public class MockStreamsMetrics extends StreamsMetricsImpl { public MockStreamsMetrics(final Metrics metrics) { - super(metrics, "test", StreamsConfig.METRICS_LATEST, new MockTime()); + super(metrics, "test", new MockTime()); } } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java index 7f4e2d08491..5eb5a39ddba 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java @@ -215,7 +215,7 @@ public class ProcessorNodeTest { public void testMetricsWithBuiltInMetricsVersionLatest() { final Metrics metrics = new Metrics(); final StreamsMetricsImpl streamsMetrics = - new StreamsMetricsImpl(metrics, "test-client", StreamsConfig.METRICS_LATEST, new MockTime()); + new StreamsMetricsImpl(metrics, "test-client", new MockTime()); final InternalMockProcessorContext<Object, Object> context = new InternalMockProcessorContext<>(streamsMetrics); final ProcessorNode<Object, Object, Object, Object> node = new ProcessorNode<>(NAME, new NoOpProcessor(), Collections.emptySet()); @@ -299,7 +299,7 @@ public class ProcessorNodeTest { public void testTopologyLevelClassCastExceptionDirect() { final Metrics metrics = new Metrics(); final StreamsMetricsImpl streamsMetrics = - new StreamsMetricsImpl(metrics, "test-client", StreamsConfig.METRICS_LATEST, new MockTime()); + new StreamsMetricsImpl(metrics, "test-client", new MockTime()); final InternalMockProcessorContext<Object, Object> context = new InternalMockProcessorContext<>(streamsMetrics); final ProcessorNode<Object, Object, Object, Object> node = new ProcessorNode<>("pname", new ClassCastProcessor(), Collections.emptySet()); @@ -319,7 +319,7 @@ public class ProcessorNodeTest { final InternalProcessorContext<Object, Object> internalProcessorContext = mock(InternalProcessorContext.class, withSettings().strictness(Strictness.LENIENT)); when(internalProcessorContext.taskId()).thenReturn(TASK_ID); - when(internalProcessorContext.metrics()).thenReturn(new StreamsMetricsImpl(new Metrics(), "test-client", StreamsConfig.METRICS_LATEST, new MockTime())); + when(internalProcessorContext.metrics()).thenReturn(new StreamsMetricsImpl(new Metrics(), "test-client", new MockTime())); when(internalProcessorContext.topic()).thenReturn(TOPIC); when(internalProcessorContext.partition()).thenReturn(PARTITION); when(internalProcessorContext.offset()).thenReturn(OFFSET); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java index dd9efe581ff..076aeef7939 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java @@ -32,7 +32,6 @@ import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.MockTime; -import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler; import org.apache.kafka.streams.errors.LogAndFailExceptionHandler; import org.apache.kafka.streams.errors.StreamsException; @@ -73,7 +72,7 @@ public class RecordQueueTest { private final Metrics metrics = new Metrics(); private final StreamsMetricsImpl streamsMetrics = - new StreamsMetricsImpl(metrics, "mock", StreamsConfig.METRICS_LATEST, new MockTime()); + new StreamsMetricsImpl(metrics, "mock", new MockTime()); @SuppressWarnings("rawtypes") final InternalMockProcessorContext context = new InternalMockProcessorContext<>( diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/SourceNodeTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/SourceNodeTest.java index a242aa8ccfc..90d88359042 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/SourceNodeTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/SourceNodeTest.java @@ -24,7 +24,6 @@ import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.common.metrics.SensorAccessor; import org.apache.kafka.common.serialization.Deserializer; import org.apache.kafka.common.utils.MockTime; -import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.kstream.internals.WrappingNullableUtils; import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; @@ -98,7 +97,7 @@ public class SourceNodeTest { public void shouldExposeProcessMetrics() { final Metrics metrics = new Metrics(); final StreamsMetricsImpl streamsMetrics = - new StreamsMetricsImpl(metrics, "test-client", StreamsConfig.METRICS_LATEST, new MockTime()); + new StreamsMetricsImpl(metrics, "test-client", new MockTime()); final InternalMockProcessorContext<String, String> context = new InternalMockProcessorContext<>(streamsMetrics); final SourceNode<String, String> node = new SourceNode<>(context.currentNode().name(), new TheDeserializer(), new TheDeserializer()); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java index d9122761daa..0ccdc8b39c9 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java @@ -114,7 +114,7 @@ public class StandbyTaskTest { private final MockTime time = new MockTime(); private final Metrics metrics = new Metrics(new MetricConfig().recordLevel(Sensor.RecordingLevel.DEBUG), time); - private final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, threadName, StreamsConfig.METRICS_LATEST, time); + private final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, threadName, time); private File baseDir; private StreamsConfig config; diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java index 029b6a73f4e..acda88715d5 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java @@ -53,7 +53,6 @@ import org.apache.kafka.streams.errors.LogAndContinueProcessingExceptionHandler; import org.apache.kafka.streams.errors.LogAndFailExceptionHandler; import org.apache.kafka.streams.errors.LogAndFailProcessingExceptionHandler; import org.apache.kafka.streams.errors.ProcessingExceptionHandler; -import org.apache.kafka.streams.errors.ProcessingExceptionHandler.ProcessingHandlerResponse; import org.apache.kafka.streams.errors.ProcessorStateException; import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.errors.TaskCorruptedException; @@ -609,7 +608,7 @@ public class StreamTaskTest { public void shouldProcessRecordsAfterPrepareCommitWhenEosDisabled() { when(stateManager.taskId()).thenReturn(taskId); when(stateManager.taskType()).thenReturn(TaskType.ACTIVE); - task = createSingleSourceStateless(createConfig(), StreamsConfig.METRICS_LATEST); + task = createSingleSourceStateless(createConfig()); assertFalse(task.process(time.milliseconds())); @@ -632,7 +631,7 @@ public class StreamTaskTest { public void shouldNotProcessRecordsAfterPrepareCommitWhenEosV2Enabled() { when(stateManager.taskId()).thenReturn(taskId); when(stateManager.taskType()).thenReturn(TaskType.ACTIVE); - task = createSingleSourceStateless(createConfig(StreamsConfig.EXACTLY_ONCE_V2, "0"), StreamsConfig.METRICS_LATEST); + task = createSingleSourceStateless(createConfig(StreamsConfig.EXACTLY_ONCE_V2, "0")); assertFalse(task.process(time.milliseconds())); @@ -656,7 +655,7 @@ public class StreamTaskTest { public void shouldRecordBufferedRecords() { when(stateManager.taskId()).thenReturn(taskId); when(stateManager.taskType()).thenReturn(TaskType.ACTIVE); - task = createSingleSourceStateless(createConfig(AT_LEAST_ONCE, "0"), StreamsConfig.METRICS_LATEST); + task = createSingleSourceStateless(createConfig(AT_LEAST_ONCE, "0")); final KafkaMetric metric = getMetric("active-buffer", "%s-count", task.id().toString()); @@ -734,13 +733,13 @@ public class StreamTaskTest { final String sourceNodeName = evenKeyForwardingSourceNode.name(); final String terminalNodeName = processorStreamTime.name(); - final Metric sourceAvg = getProcessorMetric("record-e2e-latency", "%s-avg", task.id().toString(), sourceNodeName, StreamsConfig.METRICS_LATEST); - final Metric sourceMin = getProcessorMetric("record-e2e-latency", "%s-min", task.id().toString(), sourceNodeName, StreamsConfig.METRICS_LATEST); - final Metric sourceMax = getProcessorMetric("record-e2e-latency", "%s-max", task.id().toString(), sourceNodeName, StreamsConfig.METRICS_LATEST); + final Metric sourceAvg = getProcessorMetric("record-e2e-latency", "%s-avg", task.id().toString(), sourceNodeName); + final Metric sourceMin = getProcessorMetric("record-e2e-latency", "%s-min", task.id().toString(), sourceNodeName); + final Metric sourceMax = getProcessorMetric("record-e2e-latency", "%s-max", task.id().toString(), sourceNodeName); - final Metric terminalAvg = getProcessorMetric("record-e2e-latency", "%s-avg", task.id().toString(), terminalNodeName, StreamsConfig.METRICS_LATEST); - final Metric terminalMin = getProcessorMetric("record-e2e-latency", "%s-min", task.id().toString(), terminalNodeName, StreamsConfig.METRICS_LATEST); - final Metric terminalMax = getProcessorMetric("record-e2e-latency", "%s-max", task.id().toString(), terminalNodeName, StreamsConfig.METRICS_LATEST); + final Metric terminalAvg = getProcessorMetric("record-e2e-latency", "%s-avg", task.id().toString(), terminalNodeName); + final Metric terminalMin = getProcessorMetric("record-e2e-latency", "%s-min", task.id().toString(), terminalNodeName); + final Metric terminalMax = getProcessorMetric("record-e2e-latency", "%s-max", task.id().toString(), terminalNodeName); // e2e latency = 10 task.addRecords(partition1, singletonList(getConsumerRecordWithOffsetAsTimestamp(0, 0L))); @@ -802,7 +801,7 @@ public class StreamTaskTest { public void shouldRecordRestoredRecords() { when(stateManager.taskId()).thenReturn(taskId); when(stateManager.taskType()).thenReturn(TaskType.ACTIVE); - task = createSingleSourceStateless(createConfig(AT_LEAST_ONCE, "0"), StreamsConfig.METRICS_LATEST); + task = createSingleSourceStateless(createConfig(AT_LEAST_ONCE, "0")); final KafkaMetric totalMetric = getMetric("restore", "%s-total", task.id().toString()); final KafkaMetric rateMetric = getMetric("restore", "%s-rate", task.id().toString()); @@ -927,7 +926,6 @@ public class StreamTaskTest { } private void testMetricsForBuiltInMetricsVersionLatest() { - final String builtInMetricsVersion = StreamsConfig.METRICS_LATEST; assertNull(getMetric("commit", "%s-latency-avg", "all")); assertNull(getMetric("commit", "%s-latency-max", "all")); assertNull(getMetric("commit", "%s-rate", "all")); @@ -960,8 +958,7 @@ public class StreamTaskTest { private Metric getProcessorMetric(final String operation, final String nameFormat, final String taskId, - final String processorNodeId, - final String builtInMetricsVersion) { + final String processorNodeId) { return getMetricByNameFilterByTags( metrics.metrics(), @@ -1213,7 +1210,7 @@ public class StreamTaskTest { public void shouldRespectCommitNeeded() { when(stateManager.taskId()).thenReturn(taskId); when(stateManager.taskType()).thenReturn(TaskType.ACTIVE); - task = createSingleSourceStateless(createConfig(AT_LEAST_ONCE, "0"), StreamsConfig.METRICS_LATEST); + task = createSingleSourceStateless(createConfig(AT_LEAST_ONCE, "0")); task.initializeIfNeeded(); task.completeRestoration(noOpResetter -> { }); @@ -1255,7 +1252,7 @@ public class StreamTaskTest { public void shouldCommitNextOffsetAndProcessorMetadataFromQueueIfAvailable() { when(stateManager.taskId()).thenReturn(taskId); when(stateManager.taskType()).thenReturn(TaskType.ACTIVE); - task = createSingleSourceStateless(createConfig(AT_LEAST_ONCE, "0"), StreamsConfig.METRICS_LATEST); + task = createSingleSourceStateless(createConfig(AT_LEAST_ONCE, "0")); task.initializeIfNeeded(); task.completeRestoration(noOpResetter -> { }); @@ -2311,7 +2308,7 @@ public class StreamTaskTest { public void shouldClearCommitStatusesInCloseDirty() { when(stateManager.taskId()).thenReturn(taskId); when(stateManager.taskType()).thenReturn(TaskType.ACTIVE); - task = createSingleSourceStateless(createConfig(AT_LEAST_ONCE, "0"), StreamsConfig.METRICS_LATEST); + task = createSingleSourceStateless(createConfig(AT_LEAST_ONCE, "0")); task.initializeIfNeeded(); task.completeRestoration(noOpResetter -> { }); @@ -2361,7 +2358,7 @@ public class StreamTaskTest { public void shouldThrowIfCleanClosingDirtyTask() { when(stateManager.taskId()).thenReturn(taskId); when(stateManager.taskType()).thenReturn(TaskType.ACTIVE); - task = createSingleSourceStateless(createConfig(AT_LEAST_ONCE, "0"), StreamsConfig.METRICS_LATEST); + task = createSingleSourceStateless(createConfig(AT_LEAST_ONCE, "0")); task.initializeIfNeeded(); task.completeRestoration(noOpResetter -> { }); @@ -2452,7 +2449,7 @@ public class StreamTaskTest { streamsMetrics, null ); - final StreamsMetricsImpl metrics = new StreamsMetricsImpl(this.metrics, "test", StreamsConfig.METRICS_LATEST, time); + final StreamsMetricsImpl metrics = new StreamsMetricsImpl(this.metrics, "test", time); // The processor topology is missing the topics final ProcessorTopology topology = withSources(emptyList(), mkMap()); @@ -2981,8 +2978,7 @@ public class StreamTaskTest { ); } - private StreamTask createSingleSourceStateless(final StreamsConfig config, - final String builtInMetricsVersion) { + private StreamTask createSingleSourceStateless(final StreamsConfig config) { final ProcessorTopology topology = withSources( asList(source1, processorStreamTime, processorSystemTime), mkMap(mkEntry(topic1, source1)) @@ -3005,7 +3001,7 @@ public class StreamTaskTest { topology, consumer, new TopologyConfig(null, config, new Properties()).getTaskConfig(), - new StreamsMetricsImpl(metrics, "test", builtInMetricsVersion, time), + new StreamsMetricsImpl(metrics, "test", time), stateDirectory, cache, time, @@ -3042,7 +3038,7 @@ public class StreamTaskTest { topology, consumer, new TopologyConfig(null, config, new Properties()).getTaskConfig(), - new StreamsMetricsImpl(metrics, "test", StreamsConfig.METRICS_LATEST, time), + new StreamsMetricsImpl(metrics, "test", time), stateDirectory, cache, time, @@ -3078,7 +3074,7 @@ public class StreamTaskTest { topology, consumer, new TopologyConfig(null, config, new Properties()).getTaskConfig(), - new StreamsMetricsImpl(metrics, "test", StreamsConfig.METRICS_LATEST, time), + new StreamsMetricsImpl(metrics, "test", time), stateDirectory, cache, time, diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java index 53a61f09b83..ae1baad47f9 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java @@ -307,7 +307,6 @@ public class StreamThreadTest { final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl( metrics, APPLICATION_ID, - config.getString(StreamsConfig.BUILT_IN_METRICS_VERSION_CONFIG), time ); @@ -715,7 +714,6 @@ public class StreamThreadTest { final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl( metrics, APPLICATION_ID, - config.getString(StreamsConfig.BUILT_IN_METRICS_VERSION_CONFIG), mockTime ); @@ -780,7 +778,6 @@ public class StreamThreadTest { final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl( metrics, APPLICATION_ID, - config.getString(StreamsConfig.BUILT_IN_METRICS_VERSION_CONFIG), mockTime ); @@ -1145,7 +1142,7 @@ public class StreamThreadTest { final StreamsConfig config = new StreamsConfig(configProps(false, stateUpdaterEnabled, processingThreadsEnabled)); final StreamsMetricsImpl streamsMetrics = - new StreamsMetricsImpl(metrics, CLIENT_ID, StreamsConfig.METRICS_LATEST, mockTime); + new StreamsMetricsImpl(metrics, CLIENT_ID, mockTime); final TopologyMetadata topologyMetadata = new TopologyMetadata(internalTopologyBuilder, config); topologyMetadata.buildAndRewriteTopology(); @@ -1368,7 +1365,7 @@ public class StreamThreadTest { final StreamsConfig config = new StreamsConfig(configProps(false, stateUpdaterEnabled, processingThreadsEnabled)); final StreamsMetricsImpl streamsMetrics = - new StreamsMetricsImpl(metrics, CLIENT_ID, StreamsConfig.METRICS_LATEST, mockTime); + new StreamsMetricsImpl(metrics, CLIENT_ID, mockTime); final TopologyMetadata topologyMetadata = new TopologyMetadata(internalTopologyBuilder, config); topologyMetadata.buildAndRewriteTopology(); thread = buildStreamThread(consumer, taskManager, config, topologyMetadata) @@ -1422,7 +1419,7 @@ public class StreamThreadTest { } final StreamsMetricsImpl streamsMetrics = - new StreamsMetricsImpl(metrics, CLIENT_ID, StreamsConfig.METRICS_LATEST, mockTime); + new StreamsMetricsImpl(metrics, CLIENT_ID, mockTime); final Properties props = configProps(false, stateUpdaterEnabled, processingThreadsEnabled); final StreamsConfig config = new StreamsConfig(props); @@ -1468,7 +1465,7 @@ public class StreamThreadTest { final StreamsConfig config = new StreamsConfig(configProps(false, stateUpdaterEnabled, processingThreadsEnabled)); final StreamsMetricsImpl streamsMetrics = - new StreamsMetricsImpl(metrics, CLIENT_ID, StreamsConfig.METRICS_LATEST, mockTime); + new StreamsMetricsImpl(metrics, CLIENT_ID, mockTime); final TopologyMetadata topologyMetadata = new TopologyMetadata(internalTopologyBuilder, config); topologyMetadata.buildAndRewriteTopology(); thread = buildStreamThread(consumer, taskManager, config, topologyMetadata) @@ -1488,7 +1485,7 @@ public class StreamThreadTest { final StreamsConfig config = new StreamsConfig(configProps(false, stateUpdaterEnabled, processingThreadsEnabled)); final StreamsMetricsImpl streamsMetrics = - new StreamsMetricsImpl(metrics, CLIENT_ID, StreamsConfig.METRICS_LATEST, mockTime); + new StreamsMetricsImpl(metrics, CLIENT_ID, mockTime); final TopologyMetadata topologyMetadata = new TopologyMetadata(internalTopologyBuilder, config); topologyMetadata.buildAndRewriteTopology(); thread = buildStreamThread(consumer, taskManager, config, topologyMetadata) @@ -1889,7 +1886,6 @@ public class StreamThreadTest { final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl( metrics, APPLICATION_ID, - config.getString(StreamsConfig.BUILT_IN_METRICS_VERSION_CONFIG), mockTime ); @@ -2592,7 +2588,7 @@ public class StreamThreadTest { doThrow(new TaskMigratedException("Task lost exception", new RuntimeException())).when(taskManager).handleLostAll(); final StreamsMetricsImpl streamsMetrics = - new StreamsMetricsImpl(metrics, CLIENT_ID, StreamsConfig.METRICS_LATEST, mockTime); + new StreamsMetricsImpl(metrics, CLIENT_ID, mockTime); final TopologyMetadata topologyMetadata = new TopologyMetadata(internalTopologyBuilder, config); topologyMetadata.buildAndRewriteTopology(); thread = buildStreamThread(consumer, taskManager, config, topologyMetadata) @@ -2622,7 +2618,7 @@ public class StreamThreadTest { doThrow(new TaskMigratedException("Revocation non fatal exception", new RuntimeException())).when(taskManager).handleRevocation(assignedPartitions); final StreamsMetricsImpl streamsMetrics = - new StreamsMetricsImpl(metrics, CLIENT_ID, StreamsConfig.METRICS_LATEST, mockTime); + new StreamsMetricsImpl(metrics, CLIENT_ID, mockTime); final TopologyMetadata topologyMetadata = new TopologyMetadata(internalTopologyBuilder, config); topologyMetadata.buildAndRewriteTopology(); thread = buildStreamThread(consumer, taskManager, config, topologyMetadata) @@ -2654,7 +2650,7 @@ public class StreamThreadTest { when(taskManager.handleCorruption(corruptedTasks)).thenReturn(true); final StreamsMetricsImpl streamsMetrics = - new StreamsMetricsImpl(metrics, CLIENT_ID, StreamsConfig.METRICS_LATEST, mockTime); + new StreamsMetricsImpl(metrics, CLIENT_ID, mockTime); final TopologyMetadata topologyMetadata = new TopologyMetadata(internalTopologyBuilder, config); topologyMetadata.buildAndRewriteTopology(); thread = new StreamThread( @@ -2712,7 +2708,7 @@ public class StreamThreadTest { doThrow(new TimeoutException()).when(taskManager).handleCorruption(corruptedTasks); final StreamsMetricsImpl streamsMetrics = - new StreamsMetricsImpl(metrics, CLIENT_ID, StreamsConfig.METRICS_LATEST, mockTime); + new StreamsMetricsImpl(metrics, CLIENT_ID, mockTime); final TopologyMetadata topologyMetadata = new TopologyMetadata(internalTopologyBuilder, config); topologyMetadata.buildAndRewriteTopology(); thread = new StreamThread( @@ -2779,7 +2775,7 @@ public class StreamThreadTest { doNothing().when(taskManager).handleLostAll(); final StreamsMetricsImpl streamsMetrics = - new StreamsMetricsImpl(metrics, CLIENT_ID, StreamsConfig.METRICS_LATEST, mockTime); + new StreamsMetricsImpl(metrics, CLIENT_ID, mockTime); final TopologyMetadata topologyMetadata = new TopologyMetadata(internalTopologyBuilder, config); topologyMetadata.buildAndRewriteTopology(); thread = new StreamThread( @@ -2842,7 +2838,7 @@ public class StreamThreadTest { doNothing().when(consumer).enforceRebalance("Active tasks corrupted"); final StreamsMetricsImpl streamsMetrics = - new StreamsMetricsImpl(metrics, CLIENT_ID, StreamsConfig.METRICS_LATEST, mockTime); + new StreamsMetricsImpl(metrics, CLIENT_ID, mockTime); final TopologyMetadata topologyMetadata = new TopologyMetadata(internalTopologyBuilder, config); topologyMetadata.buildAndRewriteTopology(); thread = new StreamThread( @@ -2902,7 +2898,7 @@ public class StreamThreadTest { when(taskManager.handleCorruption(corruptedTasks)).thenReturn(false); final StreamsMetricsImpl streamsMetrics = - new StreamsMetricsImpl(metrics, CLIENT_ID, StreamsConfig.METRICS_LATEST, mockTime); + new StreamsMetricsImpl(metrics, CLIENT_ID, mockTime); final TopologyMetadata topologyMetadata = new TopologyMetadata(internalTopologyBuilder, config); topologyMetadata.buildAndRewriteTopology(); thread = new StreamThread( @@ -3086,7 +3082,7 @@ public class StreamThreadTest { when(taskManager.producerMetrics()).thenReturn(dummyProducerMetrics); final StreamsMetricsImpl streamsMetrics = - new StreamsMetricsImpl(metrics, CLIENT_ID, StreamsConfig.METRICS_LATEST, mockTime); + new StreamsMetricsImpl(metrics, CLIENT_ID, mockTime); final TopologyMetadata topologyMetadata = new TopologyMetadata(internalTopologyBuilder, config); topologyMetadata.buildAndRewriteTopology(); thread = buildStreamThread(consumer, taskManager, config, topologyMetadata); @@ -3111,7 +3107,7 @@ public class StreamThreadTest { final TaskManager taskManager = mock(TaskManager.class); final StreamsMetricsImpl streamsMetrics = - new StreamsMetricsImpl(metrics, CLIENT_ID, StreamsConfig.METRICS_LATEST, mockTime); + new StreamsMetricsImpl(metrics, CLIENT_ID, mockTime); final TopologyMetadata topologyMetadata = new TopologyMetadata(internalTopologyBuilder, config); topologyMetadata.buildAndRewriteTopology(); thread = new StreamThread( @@ -3167,7 +3163,7 @@ public class StreamThreadTest { when(consumerGroupMetadata.groupInstanceId()).thenReturn(Optional.empty()); final TaskManager taskManager = mock(TaskManager.class); final StreamsMetricsImpl streamsMetrics = - new StreamsMetricsImpl(metrics, CLIENT_ID, StreamsConfig.METRICS_LATEST, mockTime); + new StreamsMetricsImpl(metrics, CLIENT_ID, mockTime); final TopologyMetadata topologyMetadata = new TopologyMetadata(internalTopologyBuilder, config); topologyMetadata.buildAndRewriteTopology(); thread = new StreamThread( @@ -3583,7 +3579,7 @@ public class StreamThreadTest { "", taskManager, null, - new StreamsMetricsImpl(metrics, CLIENT_ID, StreamsConfig.METRICS_LATEST, mockTime), + new StreamsMetricsImpl(metrics, CLIENT_ID, mockTime), topologyMetadata, "thread-id", new LogContext(), @@ -3633,7 +3629,7 @@ public class StreamThreadTest { final LogContext logContext = new LogContext("test"); final Logger log = logContext.logger(StreamThreadTest.class); final StreamsMetricsImpl streamsMetrics = - new StreamsMetricsImpl(metrics, CLIENT_ID, StreamsConfig.METRICS_LATEST, mockTime); + new StreamsMetricsImpl(metrics, CLIENT_ID, mockTime); final StandbyTaskCreator standbyTaskCreator = new StandbyTaskCreator( new TopologyMetadata(internalTopologyBuilder, config), config, @@ -3692,7 +3688,7 @@ public class StreamThreadTest { final StreamsConfig config, final TopologyMetadata topologyMetadata) { final StreamsMetricsImpl streamsMetrics = - new StreamsMetricsImpl(metrics, CLIENT_ID, StreamsConfig.METRICS_LATEST, mockTime); + new StreamsMetricsImpl(metrics, CLIENT_ID, mockTime); return new StreamThread( mockTime, diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImplTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImplTest.java index 7a61e6b8e0b..218948cafc0 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImplTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImplTest.java @@ -26,7 +26,6 @@ import org.apache.kafka.common.metrics.Sensor.RecordingLevel; import org.apache.kafka.common.metrics.stats.Rate; import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.common.utils.Time; -import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.ImmutableMetricValue; import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.Version; import org.apache.kafka.test.StreamsTestUtils; @@ -93,7 +92,6 @@ public class StreamsMetricsImplTest { private static final String SENSOR_NAME_1 = "sensor1"; private static final String SENSOR_NAME_2 = "sensor2"; private static final String INTERNAL_PREFIX = "internal"; - private static final String VERSION = StreamsConfig.METRICS_LATEST; private static final String CLIENT_ID = "test-client"; private static final String THREAD_ID1 = "test-thread-1"; private static final String TASK_ID1 = "test-task-1"; @@ -139,7 +137,7 @@ public class StreamsMetricsImplTest { private final MetricName metricName2 = new MetricName(METRIC_NAME1, CLIENT_LEVEL_GROUP, DESCRIPTION2, clientLevelTags); private final MockTime time = new MockTime(0); - private final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, VERSION, time); + private final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, time); private static MetricConfig eqMetricConfig(final MetricConfig metricConfig) { final StringBuffer message = new StringBuffer(); @@ -254,7 +252,7 @@ public class StreamsMetricsImplTest { final Metrics metrics = mock(Metrics.class); final RecordingLevel recordingLevel = RecordingLevel.INFO; setupGetNewSensorTest(metrics, recordingLevel); - final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, VERSION, time); + final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, time); final Sensor actualSensor = streamsMetrics.threadLevelSensor(THREAD_ID1, SENSOR_NAME_1, recordingLevel); @@ -266,7 +264,7 @@ public class StreamsMetricsImplTest { final Metrics metrics = mock(Metrics.class); final RecordingLevel recordingLevel = RecordingLevel.INFO; setupGetExistingSensorTest(metrics); - final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, VERSION, time); + final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, time); final Sensor actualSensor = streamsMetrics.threadLevelSensor(THREAD_ID1, SENSOR_NAME_1, recordingLevel); @@ -278,7 +276,7 @@ public class StreamsMetricsImplTest { final Metrics metrics = mock(Metrics.class); final RecordingLevel recordingLevel = RecordingLevel.INFO; setupGetNewSensorTest(metrics, recordingLevel); - final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, VERSION, time); + final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, time); final Sensor actualSensor = streamsMetrics.taskLevelSensor( THREAD_ID1, @@ -295,7 +293,7 @@ public class StreamsMetricsImplTest { final Metrics metrics = mock(Metrics.class); final RecordingLevel recordingLevel = RecordingLevel.INFO; setupGetExistingSensorTest(metrics); - final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, VERSION, time); + final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, time); final Sensor actualSensor = streamsMetrics.taskLevelSensor( THREAD_ID1, @@ -312,7 +310,7 @@ public class StreamsMetricsImplTest { final Metrics metrics = mock(Metrics.class); final RecordingLevel recordingLevel = RecordingLevel.INFO; setupGetNewSensorTest(metrics, recordingLevel); - final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, VERSION, time); + final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, time); final Sensor actualSensor = streamsMetrics.topicLevelSensor( THREAD_ID1, @@ -331,7 +329,7 @@ public class StreamsMetricsImplTest { final Metrics metrics = mock(Metrics.class); final RecordingLevel recordingLevel = RecordingLevel.INFO; setupGetExistingSensorTest(metrics); - final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, VERSION, time); + final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, time); final Sensor actualSensor = streamsMetrics.topicLevelSensor( THREAD_ID1, @@ -350,7 +348,7 @@ public class StreamsMetricsImplTest { final Metrics metrics = mock(Metrics.class); final RecordingLevel recordingLevel = RecordingLevel.INFO; final ArgumentCaptor<String> sensorKeys = setupGetNewSensorTest(metrics, recordingLevel); - final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, VERSION, time); + final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, time); final Sensor actualSensor = streamsMetrics.storeLevelSensor( TASK_ID1, @@ -368,7 +366,7 @@ public class StreamsMetricsImplTest { final Metrics metrics = mock(Metrics.class); final RecordingLevel recordingLevel = RecordingLevel.INFO; setupGetExistingSensorTest(metrics); - final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, VERSION, time); + final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, time); final Sensor actualSensor = streamsMetrics.storeLevelSensor( TASK_ID1, @@ -384,7 +382,7 @@ public class StreamsMetricsImplTest { public void shouldUseSameStoreLevelSensorKeyWithTwoDifferentSensorNames() { final Metrics metrics = mock(Metrics.class); final ArgumentCaptor<String> sensorKeys = setUpSensorKeyTests(metrics); - final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, VERSION, time); + final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, time); streamsMetrics.storeLevelSensor(TASK_ID1, STORE_NAME1, SENSOR_NAME_1, INFO_RECORDING_LEVEL); streamsMetrics.storeLevelSensor(TASK_ID1, STORE_NAME1, SENSOR_NAME_2, INFO_RECORDING_LEVEL); @@ -396,7 +394,7 @@ public class StreamsMetricsImplTest { public void shouldNotUseSameStoreLevelSensorKeyWithDifferentTaskIds() { final Metrics metrics = mock(Metrics.class); final ArgumentCaptor<String> sensorKeys = setUpSensorKeyTests(metrics); - final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, VERSION, time); + final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, time); streamsMetrics.storeLevelSensor(TASK_ID1, STORE_NAME1, SENSOR_NAME_1, INFO_RECORDING_LEVEL); streamsMetrics.storeLevelSensor(TASK_ID2, STORE_NAME1, SENSOR_NAME_1, INFO_RECORDING_LEVEL); @@ -408,7 +406,7 @@ public class StreamsMetricsImplTest { public void shouldNotUseSameStoreLevelSensorKeyWithDifferentStoreNames() { final Metrics metrics = mock(Metrics.class); final ArgumentCaptor<String> sensorKeys = setUpSensorKeyTests(metrics); - final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, VERSION, time); + final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, time); streamsMetrics.storeLevelSensor(TASK_ID1, STORE_NAME1, SENSOR_NAME_1, INFO_RECORDING_LEVEL); streamsMetrics.storeLevelSensor(TASK_ID1, STORE_NAME2, SENSOR_NAME_1, INFO_RECORDING_LEVEL); @@ -420,7 +418,7 @@ public class StreamsMetricsImplTest { public void shouldNotUseSameStoreLevelSensorKeyWithDifferentThreadIds() throws InterruptedException { final Metrics metrics = mock(Metrics.class); final ArgumentCaptor<String> sensorKeys = setUpSensorKeyTests(metrics); - final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, VERSION, time); + final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, time); streamsMetrics.storeLevelSensor(TASK_ID1, STORE_NAME1, SENSOR_NAME_1, INFO_RECORDING_LEVEL); final Thread otherThread = @@ -435,7 +433,7 @@ public class StreamsMetricsImplTest { public void shouldUseSameStoreLevelSensorKeyWithSameSensorNames() { final Metrics metrics = mock(Metrics.class); final ArgumentCaptor<String> sensorKeys = setUpSensorKeyTests(metrics); - final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, VERSION, time); + final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, time); streamsMetrics.storeLevelSensor(TASK_ID1, STORE_NAME1, SENSOR_NAME_1, INFO_RECORDING_LEVEL); streamsMetrics.storeLevelSensor(TASK_ID1, STORE_NAME1, SENSOR_NAME_1, INFO_RECORDING_LEVEL); @@ -459,7 +457,7 @@ public class StreamsMetricsImplTest { .thenReturn(metricName); when(metrics.metric(metricName)).thenReturn(null); when(metrics.addMetricIfAbsent(eq(metricName), eqMetricConfig(metricConfig), eq(VALUE_PROVIDER))).thenReturn(null); - final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, VERSION, time); + final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, time); streamsMetrics.addStoreLevelMutableMetric( TASK_ID1, @@ -491,7 +489,7 @@ public class StreamsMetricsImplTest { when(metrics.metricName(METRIC_NAME1, STATE_STORE_LEVEL_GROUP, DESCRIPTION1, STORE_LEVEL_TAG_MAP)) .thenReturn(metricName); when(metrics.metric(metricName)).thenReturn(null); - final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, VERSION, time); + final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, time); streamsMetrics.addStoreLevelMutableMetric( TASK_ID1, @@ -539,7 +537,7 @@ public class StreamsMetricsImplTest { @Test public void shouldRemoveStateStoreLevelSensors() { final Metrics metrics = mock(Metrics.class); - final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, VERSION, time); + final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, time); final MetricName metricName1 = new MetricName(METRIC_NAME1, STATE_STORE_LEVEL_GROUP, DESCRIPTION1, STORE_LEVEL_TAG_MAP); final MetricName metricName2 = @@ -562,7 +560,7 @@ public class StreamsMetricsImplTest { final Metrics metrics = mock(Metrics.class); final RecordingLevel recordingLevel = RecordingLevel.INFO; setupGetNewSensorTest(metrics, recordingLevel); - final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, VERSION, time); + final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, time); final Sensor actualSensor = streamsMetrics.nodeLevelSensor( THREAD_ID1, @@ -580,7 +578,7 @@ public class StreamsMetricsImplTest { final Metrics metrics = mock(Metrics.class); final RecordingLevel recordingLevel = RecordingLevel.INFO; setupGetExistingSensorTest(metrics); - final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, VERSION, time); + final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, time); final Sensor actualSensor = streamsMetrics.nodeLevelSensor( THREAD_ID1, @@ -599,7 +597,7 @@ public class StreamsMetricsImplTest { final RecordingLevel recordingLevel = RecordingLevel.INFO; final String processorCacheName = "processorNodeName"; setupGetNewSensorTest(metrics, recordingLevel); - final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, VERSION, time); + final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, time); final Sensor actualSensor = streamsMetrics.cacheLevelSensor( THREAD_ID1, @@ -618,7 +616,7 @@ public class StreamsMetricsImplTest { final RecordingLevel recordingLevel = RecordingLevel.INFO; final String processorCacheName = "processorNodeName"; setupGetExistingSensorTest(metrics); - final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, VERSION, time); + final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, time); final Sensor actualSensor = streamsMetrics.cacheLevelSensor( THREAD_ID1, TASK_ID1, @@ -635,7 +633,7 @@ public class StreamsMetricsImplTest { final Metrics metrics = mock(Metrics.class); final RecordingLevel recordingLevel = RecordingLevel.INFO; setupGetNewSensorTest(metrics, recordingLevel); - final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, VERSION, time); + final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, time); final Sensor actualSensor = streamsMetrics.clientLevelSensor(SENSOR_NAME_1, recordingLevel); @@ -647,7 +645,7 @@ public class StreamsMetricsImplTest { final Metrics metrics = mock(Metrics.class); final RecordingLevel recordingLevel = RecordingLevel.INFO; setupGetExistingSensorTest(metrics); - final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, VERSION, time); + final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, time); final Sensor actualSensor = streamsMetrics.clientLevelSensor(SENSOR_NAME_1, recordingLevel); @@ -664,7 +662,7 @@ public class StreamsMetricsImplTest { when(metrics.metricName(METRIC_NAME1, CLIENT_LEVEL_GROUP, DESCRIPTION1, clientLevelTags)) .thenReturn(metricName1); doNothing().when(metrics).addMetric(eq(metricName1), eqMetricConfig(metricConfig), eq(immutableValue)); - final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, VERSION, time); + final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, time); streamsMetrics.addClientLevelImmutableMetric(METRIC_NAME1, DESCRIPTION1, recordingLevel, value); } @@ -678,7 +676,7 @@ public class StreamsMetricsImplTest { when(metrics.metricName(METRIC_NAME1, CLIENT_LEVEL_GROUP, DESCRIPTION1, clientLevelTags)) .thenReturn(metricName1); doNothing().when(metrics).addMetric(eq(metricName1), eqMetricConfig(metricConfig), eq(valueProvider)); - final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, VERSION, time); + final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, time); streamsMetrics.addClientLevelMutableMetric(METRIC_NAME1, DESCRIPTION1, recordingLevel, valueProvider); } @@ -699,7 +697,7 @@ public class StreamsMetricsImplTest { @Test public void shouldRemoveClientLevelMetricsAndSensors() { final Metrics metrics = mock(Metrics.class); - final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, VERSION, time); + final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, time); final ArgumentCaptor<String> sensorKeys = addSensorsOnAllLevels(metrics, streamsMetrics); doNothing().when(metrics).removeSensor(sensorKeys.getAllValues().get(0)); @@ -712,7 +710,7 @@ public class StreamsMetricsImplTest { @Test public void shouldRemoveThreadLevelSensors() { final Metrics metrics = mock(Metrics.class); - final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, VERSION, time); + final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, time); addSensorsOnAllLevels(metrics, streamsMetrics); setupRemoveSensorsTest(metrics, THREAD_ID1); @@ -721,7 +719,7 @@ public class StreamsMetricsImplTest { @Test public void testNullMetrics() { - assertThrows(NullPointerException.class, () -> new StreamsMetricsImpl(null, "", VERSION, time)); + assertThrows(NullPointerException.class, () -> new StreamsMetricsImpl(null, "", time)); } @Test @@ -754,7 +752,7 @@ public class StreamsMetricsImplTest { @Test public void testMultiLevelSensorRemoval() { final Metrics registry = new Metrics(); - final StreamsMetricsImpl metrics = new StreamsMetricsImpl(registry, THREAD_ID1, VERSION, time); + final StreamsMetricsImpl metrics = new StreamsMetricsImpl(registry, THREAD_ID1, time); for (final MetricName defaultMetric : registry.metrics().keySet()) { registry.removeMetric(defaultMetric); } @@ -860,7 +858,7 @@ public class StreamsMetricsImplTest { final MockTime time = new MockTime(1); final MetricConfig config = new MetricConfig().timeWindow(1, TimeUnit.MILLISECONDS); final Metrics metrics = new Metrics(config, time); - final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, "", VERSION, time); + final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, "", time); final String scope = "scope"; final String entity = "entity"; @@ -894,7 +892,7 @@ public class StreamsMetricsImplTest { @Test public void shouldAddLatencyRateTotalSensor() { - final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, VERSION, time); + final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, time); shouldAddCustomSensor( streamsMetrics.addLatencyRateTotalSensor(SCOPE_NAME, ENTITY_NAME, OPERATION_NAME, RecordingLevel.DEBUG), streamsMetrics, @@ -909,7 +907,7 @@ public class StreamsMetricsImplTest { @Test public void shouldAddRateTotalSensor() { - final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, VERSION, time); + final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, time); shouldAddCustomSensor( streamsMetrics.addRateTotalSensor(SCOPE_NAME, ENTITY_NAME, OPERATION_NAME, RecordingLevel.DEBUG), streamsMetrics, @@ -1044,7 +1042,7 @@ public class StreamsMetricsImplTest { final String taskName = "test-task"; final String storeType = "remote-window"; final String storeName = "window-keeper"; - final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, THREAD_ID1, VERSION, time); + final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, THREAD_ID1, time); final Map<String, String> tagMap = streamsMetrics.storeLevelTagMap(taskName, storeType, storeName); @@ -1059,7 +1057,7 @@ public class StreamsMetricsImplTest { @Test public void shouldGetCacheLevelTagMap() { final StreamsMetricsImpl streamsMetrics = - new StreamsMetricsImpl(metrics, THREAD_ID1, VERSION, time); + new StreamsMetricsImpl(metrics, THREAD_ID1, time); final String taskName = "taskName"; final String storeName = "storeName"; @@ -1076,7 +1074,7 @@ public class StreamsMetricsImplTest { @Test public void shouldGetThreadLevelTagMap() { - final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, THREAD_ID1, VERSION, time); + final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, THREAD_ID1, time); final Map<String, String> tagMap = streamsMetrics.threadLevelTagMap(THREAD_ID1); @@ -1209,7 +1207,7 @@ public class StreamsMetricsImplTest { @Test public void shouldReturnMetricsVersionCurrent() { assertThat( - new StreamsMetricsImpl(metrics, THREAD_ID1, StreamsConfig.METRICS_LATEST, time).version(), + new StreamsMetricsImpl(metrics, THREAD_ID1, time).version(), equalTo(Version.LATEST) ); } @@ -1268,7 +1266,7 @@ public class StreamsMetricsImplTest { public void shouldAddThreadLevelMutableMetric() { final int measuredValue = 123; final StreamsMetricsImpl streamsMetrics - = new StreamsMetricsImpl(metrics, THREAD_ID1, VERSION, time); + = new StreamsMetricsImpl(metrics, THREAD_ID1, time); streamsMetrics.addThreadLevelMutableMetric( "foobar", @@ -1290,7 +1288,7 @@ public class StreamsMetricsImplTest { public void shouldCleanupThreadLevelMutableMetric() { final int measuredValue = 123; final StreamsMetricsImpl streamsMetrics - = new StreamsMetricsImpl(metrics, THREAD_ID1, VERSION, time); + = new StreamsMetricsImpl(metrics, THREAD_ID1, time); streamsMetrics.addThreadLevelMutableMetric( "foobar", "test metric", @@ -1312,7 +1310,7 @@ public class StreamsMetricsImplTest { public void shouldAddThreadLevelImmutableMetric() { final int measuredValue = 123; final StreamsMetricsImpl streamsMetrics - = new StreamsMetricsImpl(metrics, THREAD_ID1, VERSION, time); + = new StreamsMetricsImpl(metrics, THREAD_ID1, time); streamsMetrics.addThreadLevelImmutableMetric( "foobar", @@ -1334,7 +1332,7 @@ public class StreamsMetricsImplTest { public void shouldCleanupThreadLevelImmutableMetric() { final int measuredValue = 123; final StreamsMetricsImpl streamsMetrics - = new StreamsMetricsImpl(metrics, THREAD_ID1, VERSION, time); + = new StreamsMetricsImpl(metrics, THREAD_ID1, time); streamsMetrics.addThreadLevelImmutableMetric( "foobar", "test metric", diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStoreTest.java index 76a5f1bdc61..e3fae7e7d69 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStoreTest.java @@ -1416,7 +1416,7 @@ public abstract class AbstractDualSchemaRocksDBSegmentedBytesStoreTest { dir, Serdes.String(), Serdes.String(), - new StreamsMetricsImpl(new Metrics(), "mock", StreamsConfig.METRICS_LATEST, new MockTime()), + new StreamsMetricsImpl(new Metrics(), "mock", new MockTime()), new StreamsConfig(props), MockRecordCollector::new, new ThreadCache(new LogContext("testCache "), 0, new MockStreamsMetrics(new Metrics())), @@ -1452,7 +1452,7 @@ public abstract class AbstractDualSchemaRocksDBSegmentedBytesStoreTest { dir, Serdes.String(), Serdes.String(), - new StreamsMetricsImpl(new Metrics(), "mock", StreamsConfig.METRICS_LATEST, new MockTime()), + new StreamsMetricsImpl(new Metrics(), "mock", new MockTime()), new StreamsConfig(props), MockRecordCollector::new, new ThreadCache(new LogContext("testCache "), 0, new MockStreamsMetrics(new Metrics())), @@ -1491,7 +1491,7 @@ public abstract class AbstractDualSchemaRocksDBSegmentedBytesStoreTest { dir, Serdes.String(), Serdes.String(), - new StreamsMetricsImpl(new Metrics(), "mock", StreamsConfig.METRICS_LATEST, new MockTime()), + new StreamsMetricsImpl(new Metrics(), "mock", new MockTime()), new StreamsConfig(props), MockRecordCollector::new, new ThreadCache(new LogContext("testCache "), 0, new MockStreamsMetrics(new Metrics())), @@ -1532,7 +1532,7 @@ public abstract class AbstractDualSchemaRocksDBSegmentedBytesStoreTest { dir, Serdes.String(), Serdes.String(), - new StreamsMetricsImpl(new Metrics(), "mock", StreamsConfig.METRICS_LATEST, new MockTime()), + new StreamsMetricsImpl(new Metrics(), "mock", new MockTime()), new StreamsConfig(props), MockRecordCollector::new, new ThreadCache(new LogContext("testCache "), 0, new MockStreamsMetrics(new Metrics())), diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStoreTest.java index 0f875e91f55..77149d1d410 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStoreTest.java @@ -573,7 +573,7 @@ public abstract class AbstractRocksDBSegmentedBytesStoreTest<S extends Segment> dir, Serdes.String(), Serdes.String(), - new StreamsMetricsImpl(new Metrics(), "mock", StreamsConfig.METRICS_LATEST, new MockTime()), + new StreamsMetricsImpl(new Metrics(), "mock", new MockTime()), new StreamsConfig(props), MockRecordCollector::new, new ThreadCache(new LogContext("testCache "), 0, new MockStreamsMetrics(new Metrics())), @@ -613,7 +613,7 @@ public abstract class AbstractRocksDBSegmentedBytesStoreTest<S extends Segment> dir, Serdes.String(), Serdes.String(), - new StreamsMetricsImpl(new Metrics(), "mock", StreamsConfig.METRICS_LATEST, new MockTime()), + new StreamsMetricsImpl(new Metrics(), "mock", new MockTime()), new StreamsConfig(props), MockRecordCollector::new, new ThreadCache(new LogContext("testCache "), 0, new MockStreamsMetrics(new Metrics())), @@ -655,7 +655,7 @@ public abstract class AbstractRocksDBSegmentedBytesStoreTest<S extends Segment> dir, Serdes.String(), Serdes.String(), - new StreamsMetricsImpl(new Metrics(), "mock", StreamsConfig.METRICS_LATEST, new MockTime()), + new StreamsMetricsImpl(new Metrics(), "mock", new MockTime()), new StreamsConfig(props), MockRecordCollector::new, new ThreadCache(new LogContext("testCache "), 0, new MockStreamsMetrics(new Metrics())), @@ -699,7 +699,7 @@ public abstract class AbstractRocksDBSegmentedBytesStoreTest<S extends Segment> dir, Serdes.String(), Serdes.String(), - new StreamsMetricsImpl(new Metrics(), "mock", StreamsConfig.METRICS_LATEST, new MockTime()), + new StreamsMetricsImpl(new Metrics(), "mock", new MockTime()), new StreamsConfig(props), MockRecordCollector::new, new ThreadCache(new LogContext("testCache "), 0, new MockStreamsMetrics(new Metrics())), diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java index e08d6e23eb9..35efe5891a9 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java @@ -98,7 +98,7 @@ public class ChangeLoggingKeyValueBytesStoreTest { TestUtils.tempDirectory(), Serdes.String(), Serdes.Long(), - new StreamsMetricsImpl(new Metrics(), "mock", StreamsConfig.METRICS_LATEST, new MockTime()), + new StreamsMetricsImpl(new Metrics(), "mock", new MockTime()), streamsConfig, () -> collector, new ThreadCache(new LogContext("testCache "), 0, new MockStreamsMetrics(new Metrics())), diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/GlobalStateStoreProviderTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/GlobalStateStoreProviderTest.java index 7c282379a91..e2f953de2f8 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/GlobalStateStoreProviderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/GlobalStateStoreProviderTest.java @@ -113,7 +113,7 @@ public class GlobalStateStoreProviderTest { when(mockContext.applicationId()).thenReturn("appId"); when(mockContext.metrics()) .thenReturn( - new StreamsMetricsImpl(new Metrics(), "threadName", StreamsConfig.METRICS_LATEST, new MockTime()) + new StreamsMetricsImpl(new Metrics(), "threadName", new MockTime()) ); when(mockContext.taskId()).thenReturn(new TaskId(0, 0)); when(mockContext.appConfigs()).thenReturn(CONFIGS); diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueSegmentTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueSegmentTest.java index 6756347586f..108e6e631b0 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueSegmentTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueSegmentTest.java @@ -18,7 +18,6 @@ package org.apache.kafka.streams.state.internals; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.utils.MockTime; -import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.TaskId; import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; @@ -58,7 +57,7 @@ public class KeyValueSegmentTest { @BeforeEach public void setUp() { metricsRecorder.init( - new StreamsMetricsImpl(new Metrics(), "test-client", StreamsConfig.METRICS_LATEST, new MockTime()), + new StreamsMetricsImpl(new Metrics(), "test-client", new MockTime()), new TaskId(0, 0) ); } diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java index 1c4f614e3f4..7e7cf77075a 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java @@ -31,7 +31,6 @@ import org.apache.kafka.common.serialization.StringSerializer; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.streams.KeyValue; -import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.processor.StateStoreContext; import org.apache.kafka.streams.processor.TaskId; import org.apache.kafka.streams.processor.internals.InternalProcessorContext; @@ -125,7 +124,7 @@ public class MeteredKeyValueStoreTest { metrics.config().recordLevel(Sensor.RecordingLevel.DEBUG); when(context.applicationId()).thenReturn(APPLICATION_ID); when(context.metrics()).thenReturn( - new StreamsMetricsImpl(metrics, "test", StreamsConfig.METRICS_LATEST, mockTime) + new StreamsMetricsImpl(metrics, "test", mockTime) ); when(context.taskId()).thenReturn(taskId); when(context.changelogFor(STORE_NAME)).thenReturn(CHANGELOG_TOPIC); diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java index 08655a380df..92930eca91d 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java @@ -32,7 +32,6 @@ import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.common.utils.Time; import org.apache.kafka.streams.KeyValue; -import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.kstream.Windowed; import org.apache.kafka.streams.kstream.internals.SessionWindow; import org.apache.kafka.streams.processor.StateStoreContext; @@ -128,7 +127,7 @@ public class MeteredSessionStoreTest { metrics.config().recordLevel(Sensor.RecordingLevel.DEBUG); when(context.applicationId()).thenReturn(APPLICATION_ID); when(context.metrics()) - .thenReturn(new StreamsMetricsImpl(metrics, "test", StreamsConfig.METRICS_LATEST, mockTime)); + .thenReturn(new StreamsMetricsImpl(metrics, "test", mockTime)); when(context.taskId()).thenReturn(taskId); when(context.changelogFor(STORE_NAME)).thenReturn(CHANGELOG_TOPIC); when(innerStore.name()).thenReturn(STORE_NAME); diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreTest.java index 008b052d95e..35c1a5e2cc8 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreTest.java @@ -129,7 +129,7 @@ public class MeteredTimestampedKeyValueStoreTest { setUpWithoutContext(); when(context.applicationId()).thenReturn(APPLICATION_ID); when(context.metrics()) - .thenReturn(new StreamsMetricsImpl(metrics, "test", StreamsConfig.METRICS_LATEST, mockTime)); + .thenReturn(new StreamsMetricsImpl(metrics, "test", mockTime)); when(context.taskId()).thenReturn(taskId); when(context.changelogFor(STORE_NAME)).thenReturn(CHANGELOG_TOPIC); when(inner.name()).thenReturn(STORE_NAME); diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStoreTest.java index 448753c87fc..0682a75c35b 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStoreTest.java @@ -78,7 +78,7 @@ public class MeteredTimestampedWindowStoreTest { public void setUp() { final StreamsMetricsImpl streamsMetrics = - new StreamsMetricsImpl(metrics, "test", StreamsConfig.METRICS_LATEST, new MockTime()); + new StreamsMetricsImpl(metrics, "test", new MockTime()); context = new InternalMockProcessorContext<>( TestUtils.tempDirectory(), @@ -106,7 +106,7 @@ public class MeteredTimestampedWindowStoreTest { public void setUpWithoutContextName() { final StreamsMetricsImpl streamsMetrics = - new StreamsMetricsImpl(metrics, "test", StreamsConfig.METRICS_LATEST, new MockTime()); + new StreamsMetricsImpl(metrics, "test", new MockTime()); context = new InternalMockProcessorContext<>( TestUtils.tempDirectory(), diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStoreTest.java index afcbf740c2b..17c515f3825 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStoreTest.java @@ -27,7 +27,6 @@ import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.common.utils.Time; -import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.processor.StateStoreContext; import org.apache.kafka.streams.processor.TaskId; import org.apache.kafka.streams.processor.internals.InternalProcessorContext; @@ -112,7 +111,7 @@ public class MeteredVersionedKeyValueStoreTest { @BeforeEach public void setUp() { when(inner.name()).thenReturn(STORE_NAME); - when(context.metrics()).thenReturn(new StreamsMetricsImpl(metrics, "test", StreamsConfig.METRICS_LATEST, mockTime)); + when(context.metrics()).thenReturn(new StreamsMetricsImpl(metrics, "test", mockTime)); when(context.applicationId()).thenReturn(APPLICATION_ID); when(context.taskId()).thenReturn(TASK_ID); diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java index 4f25c029172..dd1297c52c9 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java @@ -117,7 +117,7 @@ public class MeteredWindowStoreTest { @BeforeEach public void setUp() { final StreamsMetricsImpl streamsMetrics = - new StreamsMetricsImpl(metrics, "test", StreamsConfig.METRICS_LATEST, new MockTime()); + new StreamsMetricsImpl(metrics, "test", new MockTime()); context = new InternalMockProcessorContext<>( TestUtils.tempDirectory(), Serdes.String(), diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java index 57e8db6a6e5..d8ee0a6316b 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java @@ -919,7 +919,7 @@ public class RocksDBStoreTest extends AbstractKeyValueStoreTest { final Metrics metrics = new Metrics(new MetricConfig().recordLevel(RecordingLevel.DEBUG)); final StreamsMetricsImpl streamsMetrics = - new StreamsMetricsImpl(metrics, "test-application", StreamsConfig.METRICS_LATEST, time); + new StreamsMetricsImpl(metrics, "test-application", time); context = mock(InternalMockProcessorContext.class); when(context.metrics()).thenReturn(streamsMetrics); @@ -952,7 +952,7 @@ public class RocksDBStoreTest extends AbstractKeyValueStoreTest { final Metrics metrics = new Metrics(new MetricConfig().recordLevel(RecordingLevel.INFO)); final StreamsMetricsImpl streamsMetrics = - new StreamsMetricsImpl(metrics, "test-application", StreamsConfig.METRICS_LATEST, time); + new StreamsMetricsImpl(metrics, "test-application", time); context = mock(InternalMockProcessorContext.class); when(context.metrics()).thenReturn(streamsMetrics); @@ -984,7 +984,7 @@ public class RocksDBStoreTest extends AbstractKeyValueStoreTest { final Metrics metrics = new Metrics(new MetricConfig().recordLevel(RecordingLevel.INFO)); final StreamsMetricsImpl streamsMetrics = - new StreamsMetricsImpl(metrics, "test-application", StreamsConfig.METRICS_LATEST, time); + new StreamsMetricsImpl(metrics, "test-application", time); final Properties props = StreamsTestUtils.getStreamsConfig(); context = mock(InternalMockProcessorContext.class); diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBufferTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBufferTest.java index 33b97af3386..47456fc03ee 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBufferTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBufferTest.java @@ -21,7 +21,6 @@ import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.utils.MockTime; -import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.processor.StateStoreContext; import org.apache.kafka.streams.processor.TaskId; import org.apache.kafka.streams.processor.api.Record; @@ -66,7 +65,7 @@ public class RocksDBTimeOrderedKeyValueBufferTest { when(serdeGetter.valueSerde()).thenReturn(new Serdes.StringSerde()); final Metrics metrics = new Metrics(); offset = 0; - streamsMetrics = new StreamsMetricsImpl(metrics, "test-client", StreamsConfig.METRICS_LATEST, new MockTime()); + streamsMetrics = new StreamsMetricsImpl(metrics, "test-client", new MockTime()); context = new MockInternalNewProcessorContext<>(StreamsTestUtils.getStreamsConfig(), new TaskId(0, 0), TestUtils.tempDirectory()); } diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedSegmentTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedSegmentTest.java index 489bb12db2a..9ebac835b43 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedSegmentTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedSegmentTest.java @@ -18,7 +18,6 @@ package org.apache.kafka.streams.state.internals; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.utils.MockTime; -import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.TaskId; import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; @@ -58,7 +57,7 @@ public class TimestampedSegmentTest { @BeforeEach public void setUp() { metricsRecorder.init( - new StreamsMetricsImpl(new Metrics(), "test-client", StreamsConfig.METRICS_LATEST, new MockTime()), + new StreamsMetricsImpl(new Metrics(), "test-client", new MockTime()), new TaskId(0, 0) ); } diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorderGaugesTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorderGaugesTest.java index 00557ab0c00..113de5959a4 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorderGaugesTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorderGaugesTest.java @@ -21,7 +21,6 @@ import org.apache.kafka.common.MetricName; import org.apache.kafka.common.metrics.KafkaMetric; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.utils.MockTime; -import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.processor.TaskId; import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; @@ -203,7 +202,7 @@ public class RocksDBMetricsRecorderGaugesTest { private void runAndVerifySumOfProperties(final String propertyName) throws Exception { final StreamsMetricsImpl streamsMetrics = - new StreamsMetricsImpl(new Metrics(), "test-client", StreamsConfig.METRICS_LATEST, new MockTime()); + new StreamsMetricsImpl(new Metrics(), "test-client", new MockTime()); final RocksDBMetricsRecorder recorder = new RocksDBMetricsRecorder(METRICS_SCOPE, STORE_NAME); recorder.init(streamsMetrics, TASK_ID); @@ -220,7 +219,7 @@ public class RocksDBMetricsRecorderGaugesTest { private void runAndVerifyBlockCacheMetricsWithMultipleCaches(final String propertyName) throws Exception { final StreamsMetricsImpl streamsMetrics = - new StreamsMetricsImpl(new Metrics(), "test-client", StreamsConfig.METRICS_LATEST, new MockTime()); + new StreamsMetricsImpl(new Metrics(), "test-client", new MockTime()); final RocksDBMetricsRecorder recorder = new RocksDBMetricsRecorder(METRICS_SCOPE, STORE_NAME); recorder.init(streamsMetrics, TASK_ID); @@ -237,7 +236,7 @@ public class RocksDBMetricsRecorderGaugesTest { private void runAndVerifyBlockCacheMetricsWithSingleCache(final String propertyName) throws Exception { final StreamsMetricsImpl streamsMetrics = - new StreamsMetricsImpl(new Metrics(), "test-client", StreamsConfig.METRICS_LATEST, new MockTime()); + new StreamsMetricsImpl(new Metrics(), "test-client", new MockTime()); final RocksDBMetricsRecorder recorder = new RocksDBMetricsRecorder(METRICS_SCOPE, STORE_NAME); recorder.init(streamsMetrics, TASK_ID); diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorderTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorderTest.java index e9f67a2833a..44c7373c45b 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorderTest.java @@ -19,7 +19,6 @@ package org.apache.kafka.streams.state.internals.metrics; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.common.utils.MockTime; -import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.processor.TaskId; import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; import org.apache.kafka.streams.state.internals.metrics.RocksDBMetrics.RocksDBMetricContext; @@ -179,7 +178,7 @@ public class RocksDBMetricsRecorderTest { assertThrows( IllegalStateException.class, () -> recorder.init( - new StreamsMetricsImpl(new Metrics(), "test-client", StreamsConfig.METRICS_LATEST, new MockTime()), + new StreamsMetricsImpl(new Metrics(), "test-client", new MockTime()), TASK_ID1 ) ); diff --git a/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java b/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java index ff9080a2de1..4ba7e565c6d 100644 --- a/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java +++ b/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java @@ -89,7 +89,7 @@ public class InternalMockProcessorContext<KOut, VOut> this(null, null, null, - new StreamsMetricsImpl(new Metrics(), "mock", StreamsConfig.METRICS_LATEST, new MockTime()), + new StreamsMetricsImpl(new Metrics(), "mock", new MockTime()), new StreamsConfig(StreamsTestUtils.getStreamsConfig()), null, null, @@ -106,7 +106,6 @@ public class InternalMockProcessorContext<KOut, VOut> new StreamsMetricsImpl( new Metrics(), "mock", - config.getString(StreamsConfig.BUILT_IN_METRICS_VERSION_CONFIG), new MockTime() ), config, @@ -139,7 +138,6 @@ public class InternalMockProcessorContext<KOut, VOut> new StreamsMetricsImpl( new Metrics(), "mock", - config.getString(StreamsConfig.BUILT_IN_METRICS_VERSION_CONFIG), new MockTime() ), config, @@ -157,7 +155,7 @@ public class InternalMockProcessorContext<KOut, VOut> stateDir, keySerde, valueSerde, - new StreamsMetricsImpl(new Metrics(), "mock", StreamsConfig.METRICS_LATEST, new MockTime()), + new StreamsMetricsImpl(new Metrics(), "mock", new MockTime()), config, null, null, @@ -177,7 +175,7 @@ public class InternalMockProcessorContext<KOut, VOut> null, serdes.keySerde(), serdes.valueSerde(), - new StreamsMetricsImpl(metrics, "mock", StreamsConfig.METRICS_LATEST, new MockTime()), + new StreamsMetricsImpl(metrics, "mock", new MockTime()), new StreamsConfig(StreamsTestUtils.getStreamsConfig()), () -> collector, null, @@ -194,7 +192,7 @@ public class InternalMockProcessorContext<KOut, VOut> stateDir, keySerde, valueSerde, - new StreamsMetricsImpl(new Metrics(), "mock", StreamsConfig.METRICS_LATEST, new MockTime()), + new StreamsMetricsImpl(new Metrics(), "mock", new MockTime()), new StreamsConfig(StreamsTestUtils.getStreamsConfig()), () -> collector, cache, diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java index 9e76fad4264..884fa2c7dda 100644 --- a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java +++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java @@ -403,7 +403,6 @@ public class TopologyTestDriver implements Closeable { final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl( metrics, "test-client", - streamsConfig.getString(StreamsConfig.BUILT_IN_METRICS_VERSION_CONFIG), mockWallClockTime ); TaskMetrics.droppedRecordsSensor(threadId, TASK_ID.toString(), streamsMetrics); diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java index a9b47b59da9..8399233a4e4 100644 --- a/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java +++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java @@ -242,7 +242,6 @@ public class MockProcessorContext implements ProcessorContext, RecordCollector.S this.metrics = new StreamsMetricsImpl( new Metrics(metricConfig), threadId, - streamsConfig.getString(StreamsConfig.BUILT_IN_METRICS_VERSION_CONFIG), Time.SYSTEM ); TaskMetrics.droppedRecordsSensor(threadId, taskId.toString(), metrics); diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/api/MockProcessorContext.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/api/MockProcessorContext.java index 7fe262092dd..146359bf25e 100644 --- a/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/api/MockProcessorContext.java +++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/api/MockProcessorContext.java @@ -255,7 +255,6 @@ public class MockProcessorContext<KForward, VForward> implements ProcessorContex metrics = new StreamsMetricsImpl( new Metrics(metricConfig), threadId, - streamsConfig.getString(StreamsConfig.BUILT_IN_METRICS_VERSION_CONFIG), Time.SYSTEM ); TaskMetrics.droppedRecordsSensor(threadId, taskId.toString(), metrics); diff --git a/streams/test-utils/src/test/java/org/apache/kafka/streams/MockProcessorContextTest.java b/streams/test-utils/src/test/java/org/apache/kafka/streams/MockProcessorContextTest.java index 5b0a858a86e..236ca53f791 100644 --- a/streams/test-utils/src/test/java/org/apache/kafka/streams/MockProcessorContextTest.java +++ b/streams/test-utils/src/test/java/org/apache/kafka/streams/MockProcessorContextTest.java @@ -232,7 +232,6 @@ public class MockProcessorContextTest { when(mockInternalProcessorContext.metrics()).thenReturn(new StreamsMetricsImpl( new Metrics(new MetricConfig()), Thread.currentThread().getName(), - "", Time.SYSTEM )); when(mockInternalProcessorContext.taskId()).thenReturn(new TaskId(1, 1));