ableegoldman commented on a change in pull request #9978:
URL: https://github.com/apache/kafka/pull/9978#discussion_r569888405
##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -798,73 +817,42 @@ private KafkaStreams(final InternalTopologyBuilder
internalTopologyBuilder,
}
final LogContext logContext = new
LogContext(String.format("stream-client [%s] ", clientId));
this.log = logContext.logger(getClass());
+
+ // use client id instead of thread client id since this admin client
may be shared among threads
this.clientSupplier = clientSupplier;
- final MetricConfig metricConfig = new MetricConfig()
- .samples(config.getInt(StreamsConfig.METRICS_NUM_SAMPLES_CONFIG))
-
.recordLevel(Sensor.RecordingLevel.forName(config.getString(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG)))
-
.timeWindow(config.getLong(StreamsConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG),
TimeUnit.MILLISECONDS);
- final List<MetricsReporter> reporters =
config.getConfiguredInstances(StreamsConfig.METRIC_REPORTER_CLASSES_CONFIG,
- MetricsReporter.class,
- Collections.singletonMap(StreamsConfig.CLIENT_ID_CONFIG,
clientId));
- final JmxReporter jmxReporter = new JmxReporter();
- jmxReporter.configure(config.originals());
- reporters.add(jmxReporter);
- final MetricsContext metricsContext = new
KafkaMetricsContext(JMX_PREFIX,
-
config.originalsWithPrefix(CommonClientConfigs.METRICS_CONTEXT_PREFIX));
- metrics = new Metrics(metricConfig, reporters, time, metricsContext);
+ adminClient =
clientSupplier.getAdmin(config.getAdminConfigs(ClientUtils.getSharedAdminClientId(clientId)));
+
+ log.info("Kafka Streams version: {}", ClientMetrics.version());
+ log.info("Kafka Streams commit ID: {}", ClientMetrics.commitId());
+
+ metrics = getMetrics(config, time, clientId);
streamsMetrics = new StreamsMetricsImpl(
metrics,
clientId,
config.getString(StreamsConfig.BUILT_IN_METRICS_VERSION_CONFIG),
time
);
+
ClientMetrics.addVersionMetric(streamsMetrics);
ClientMetrics.addCommitIdMetric(streamsMetrics);
ClientMetrics.addApplicationIdMetric(streamsMetrics,
config.getString(StreamsConfig.APPLICATION_ID_CONFIG));
ClientMetrics.addTopologyDescriptionMetric(streamsMetrics,
internalTopologyBuilder.describe().toString());
ClientMetrics.addStateMetric(streamsMetrics, (metricsConfig, now) ->
state);
- log.info("Kafka Streams version: {}", ClientMetrics.version());
- log.info("Kafka Streams commit ID: {}", ClientMetrics.commitId());
- this.internalTopologyBuilder = internalTopologyBuilder;
- // re-write the physical topology according to the config
- internalTopologyBuilder.rewriteTopology(config);
+ ClientMetrics.addNumAliveStreamThreadMetric(streamsMetrics,
(metricsConfig, now) ->
Review comment:
Ah, sorry, I did, but it got covered up when I pushed some changes:
https://github.com/apache/kafka/pull/9978#discussion_r565746851
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org