ableegoldman commented on a change in pull request #9978:
URL: https://github.com/apache/kafka/pull/9978#discussion_r569888405



##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -798,73 +817,42 @@ private KafkaStreams(final InternalTopologyBuilder 
internalTopologyBuilder,
         }
         final LogContext logContext = new 
LogContext(String.format("stream-client [%s] ", clientId));
         this.log = logContext.logger(getClass());
+
+        // use client id instead of thread client id since this admin client may be shared among threads
         this.clientSupplier = clientSupplier;
-        final MetricConfig metricConfig = new MetricConfig()
-            .samples(config.getInt(StreamsConfig.METRICS_NUM_SAMPLES_CONFIG))
-            
.recordLevel(Sensor.RecordingLevel.forName(config.getString(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG)))
-            
.timeWindow(config.getLong(StreamsConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG), 
TimeUnit.MILLISECONDS);
-        final List<MetricsReporter> reporters = 
config.getConfiguredInstances(StreamsConfig.METRIC_REPORTER_CLASSES_CONFIG,
-                MetricsReporter.class,
-                Collections.singletonMap(StreamsConfig.CLIENT_ID_CONFIG, 
clientId));
-        final JmxReporter jmxReporter = new JmxReporter();
-        jmxReporter.configure(config.originals());
-        reporters.add(jmxReporter);
-        final MetricsContext metricsContext = new 
KafkaMetricsContext(JMX_PREFIX,
-                
config.originalsWithPrefix(CommonClientConfigs.METRICS_CONTEXT_PREFIX));
-        metrics = new Metrics(metricConfig, reporters, time, metricsContext);
+        adminClient = 
clientSupplier.getAdmin(config.getAdminConfigs(ClientUtils.getSharedAdminClientId(clientId)));
+
+        log.info("Kafka Streams version: {}", ClientMetrics.version());
+        log.info("Kafka Streams commit ID: {}", ClientMetrics.commitId());
+
+        metrics = getMetrics(config, time, clientId);
         streamsMetrics = new StreamsMetricsImpl(
             metrics,
             clientId,
             config.getString(StreamsConfig.BUILT_IN_METRICS_VERSION_CONFIG),
             time
         );
+
         ClientMetrics.addVersionMetric(streamsMetrics);
         ClientMetrics.addCommitIdMetric(streamsMetrics);
         ClientMetrics.addApplicationIdMetric(streamsMetrics, 
config.getString(StreamsConfig.APPLICATION_ID_CONFIG));
         ClientMetrics.addTopologyDescriptionMetric(streamsMetrics, 
internalTopologyBuilder.describe().toString());
         ClientMetrics.addStateMetric(streamsMetrics, (metricsConfig, now) -> 
state);
-        log.info("Kafka Streams version: {}", ClientMetrics.version());
-        log.info("Kafka Streams commit ID: {}", ClientMetrics.commitId());
-        this.internalTopologyBuilder = internalTopologyBuilder;
-        // re-write the physical topology according to the config
-        internalTopologyBuilder.rewriteTopology(config);
+        ClientMetrics.addNumAliveStreamThreadMetric(streamsMetrics, 
(metricsConfig, now) ->

Review comment:
       Ah sorry, I did, but it got covered up when I pushed some changes:
https://github.com/apache/kafka/pull/9978#discussion_r565746851




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to