guozhangwang commented on a change in pull request #9978:
URL: https://github.com/apache/kafka/pull/9978#discussion_r569838088



##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -798,73 +817,42 @@ private KafkaStreams(final InternalTopologyBuilder 
internalTopologyBuilder,
         }
         final LogContext logContext = new 
LogContext(String.format("stream-client [%s] ", clientId));
         this.log = logContext.logger(getClass());
+
+        // use client id instead of thread client id since this admin client 
may be shared among threads
         this.clientSupplier = clientSupplier;
-        final MetricConfig metricConfig = new MetricConfig()
-            .samples(config.getInt(StreamsConfig.METRICS_NUM_SAMPLES_CONFIG))
-            
.recordLevel(Sensor.RecordingLevel.forName(config.getString(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG)))
-            
.timeWindow(config.getLong(StreamsConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG), 
TimeUnit.MILLISECONDS);
-        final List<MetricsReporter> reporters = 
config.getConfiguredInstances(StreamsConfig.METRIC_REPORTER_CLASSES_CONFIG,
-                MetricsReporter.class,
-                Collections.singletonMap(StreamsConfig.CLIENT_ID_CONFIG, 
clientId));
-        final JmxReporter jmxReporter = new JmxReporter();
-        jmxReporter.configure(config.originals());
-        reporters.add(jmxReporter);
-        final MetricsContext metricsContext = new 
KafkaMetricsContext(JMX_PREFIX,
-                
config.originalsWithPrefix(CommonClientConfigs.METRICS_CONTEXT_PREFIX));
-        metrics = new Metrics(metricConfig, reporters, time, metricsContext);
+        adminClient = 
clientSupplier.getAdmin(config.getAdminConfigs(ClientUtils.getSharedAdminClientId(clientId)));
+
+        log.info("Kafka Streams version: {}", ClientMetrics.version());
+        log.info("Kafka Streams commit ID: {}", ClientMetrics.commitId());
+
+        metrics = getMetrics(config, time, clientId);
         streamsMetrics = new StreamsMetricsImpl(
             metrics,
             clientId,
             config.getString(StreamsConfig.BUILT_IN_METRICS_VERSION_CONFIG),
             time
         );
+
         ClientMetrics.addVersionMetric(streamsMetrics);
         ClientMetrics.addCommitIdMetric(streamsMetrics);
         ClientMetrics.addApplicationIdMetric(streamsMetrics, 
config.getString(StreamsConfig.APPLICATION_ID_CONFIG));
         ClientMetrics.addTopologyDescriptionMetric(streamsMetrics, 
internalTopologyBuilder.describe().toString());
         ClientMetrics.addStateMetric(streamsMetrics, (metricsConfig, now) -> 
state);
-        log.info("Kafka Streams version: {}", ClientMetrics.version());
-        log.info("Kafka Streams commit ID: {}", ClientMetrics.commitId());
-        this.internalTopologyBuilder = internalTopologyBuilder;
-        // re-write the physical topology according to the config
-        internalTopologyBuilder.rewriteTopology(config);
+        ClientMetrics.addNumAliveStreamThreadMetric(streamsMetrics, 
(metricsConfig, now) ->

Review comment:
       Thanks for the code cleanup and re-ordering! BTW if there's no logical 
changes at all maybe add a comment on the PR next time.

##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -782,8 +782,27 @@ private KafkaStreams(final InternalTopologyBuilder 
internalTopologyBuilder,
                          final Time time) throws StreamsException {
         this.config = config;
         this.time = time;
+
+        this.internalTopologyBuilder = internalTopologyBuilder;
+        internalTopologyBuilder.rewriteTopology(config);
+
+        // sanity check to fail-fast in case we cannot build a 
ProcessorTopology due to an exception
+        taskTopology = internalTopologyBuilder.buildTopology();
+        globalTaskTopology = 
internalTopologyBuilder.buildGlobalStateTopology();
+
+        final boolean hasGlobalTopology = globalTaskTopology != null;
+        final boolean hasPersistentStores = 
taskTopology.hasPersistentLocalStore() ||
+            (hasGlobalTopology && 
globalTaskTopology.hasPersistentGlobalStore());
+
+        try {
+            stateDirectory = new StateDirectory(config, time, 
hasPersistentStores);
+            processId = stateDirectory.initializeProcessId();

Review comment:
       cc @cadonna for the pre-creation of internal topics, we would also need 
to build the topology in order to populate the `RepartitionTopics` etc, let's 
just make sure we only build it once when we add that logic.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to