guozhangwang commented on a change in pull request #9978: URL: https://github.com/apache/kafka/pull/9978#discussion_r569838088
########## File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java ########## @@ -798,73 +817,42 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder, } final LogContext logContext = new LogContext(String.format("stream-client [%s] ", clientId)); this.log = logContext.logger(getClass()); + + // use client id instead of thread client id since this admin client may be shared among threads this.clientSupplier = clientSupplier; - final MetricConfig metricConfig = new MetricConfig() - .samples(config.getInt(StreamsConfig.METRICS_NUM_SAMPLES_CONFIG)) - .recordLevel(Sensor.RecordingLevel.forName(config.getString(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG))) - .timeWindow(config.getLong(StreamsConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG), TimeUnit.MILLISECONDS); - final List<MetricsReporter> reporters = config.getConfiguredInstances(StreamsConfig.METRIC_REPORTER_CLASSES_CONFIG, - MetricsReporter.class, - Collections.singletonMap(StreamsConfig.CLIENT_ID_CONFIG, clientId)); - final JmxReporter jmxReporter = new JmxReporter(); - jmxReporter.configure(config.originals()); - reporters.add(jmxReporter); - final MetricsContext metricsContext = new KafkaMetricsContext(JMX_PREFIX, - config.originalsWithPrefix(CommonClientConfigs.METRICS_CONTEXT_PREFIX)); - metrics = new Metrics(metricConfig, reporters, time, metricsContext); + adminClient = clientSupplier.getAdmin(config.getAdminConfigs(ClientUtils.getSharedAdminClientId(clientId))); + + log.info("Kafka Streams version: {}", ClientMetrics.version()); + log.info("Kafka Streams commit ID: {}", ClientMetrics.commitId()); + + metrics = getMetrics(config, time, clientId); streamsMetrics = new StreamsMetricsImpl( metrics, clientId, config.getString(StreamsConfig.BUILT_IN_METRICS_VERSION_CONFIG), time ); + ClientMetrics.addVersionMetric(streamsMetrics); ClientMetrics.addCommitIdMetric(streamsMetrics); ClientMetrics.addApplicationIdMetric(streamsMetrics, config.getString(StreamsConfig.APPLICATION_ID_CONFIG)); ClientMetrics.addTopologyDescriptionMetric(streamsMetrics, internalTopologyBuilder.describe().toString()); ClientMetrics.addStateMetric(streamsMetrics, (metricsConfig, now) -> state); - log.info("Kafka Streams version: {}", ClientMetrics.version()); - log.info("Kafka Streams commit ID: {}", ClientMetrics.commitId()); - this.internalTopologyBuilder = internalTopologyBuilder; - // re-write the physical topology according to the config - internalTopologyBuilder.rewriteTopology(config); + ClientMetrics.addNumAliveStreamThreadMetric(streamsMetrics, (metricsConfig, now) -> Review comment: Thanks for the code cleanup and re-ordering! BTW if there's no logical changes at all maybe add a comment on the PR next time. ########## File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java ########## @@ -782,8 +782,27 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder, final Time time) throws StreamsException { this.config = config; this.time = time; + + this.internalTopologyBuilder = internalTopologyBuilder; + internalTopologyBuilder.rewriteTopology(config); + + // sanity check to fail-fast in case we cannot build a ProcessorTopology due to an exception + taskTopology = internalTopologyBuilder.buildTopology(); + globalTaskTopology = internalTopologyBuilder.buildGlobalStateTopology(); + + final boolean hasGlobalTopology = globalTaskTopology != null; + final boolean hasPersistentStores = taskTopology.hasPersistentLocalStore() || + (hasGlobalTopology && globalTaskTopology.hasPersistentGlobalStore()); + + try { + stateDirectory = new StateDirectory(config, time, hasPersistentStores); + processId = stateDirectory.initializeProcessId(); Review comment: cc @cadonna for the pre-creation of internal topics, we would also need to build the topology in order to populate the `RepartitionTopics` etc, let's just make sure we only build it once when we add that logic. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org