guozhangwang commented on a change in pull request #9978:
URL: https://github.com/apache/kafka/pull/9978#discussion_r569838088
##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -798,73 +817,42 @@ private KafkaStreams(final InternalTopologyBuilder
internalTopologyBuilder,
}
final LogContext logContext = new
LogContext(String.format("stream-client [%s] ", clientId));
this.log = logContext.logger(getClass());
+
+ // use client id instead of thread client id since this admin client
may be shared among threads
this.clientSupplier = clientSupplier;
- final MetricConfig metricConfig = new MetricConfig()
- .samples(config.getInt(StreamsConfig.METRICS_NUM_SAMPLES_CONFIG))
-
.recordLevel(Sensor.RecordingLevel.forName(config.getString(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG)))
-
.timeWindow(config.getLong(StreamsConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG),
TimeUnit.MILLISECONDS);
- final List<MetricsReporter> reporters =
config.getConfiguredInstances(StreamsConfig.METRIC_REPORTER_CLASSES_CONFIG,
- MetricsReporter.class,
- Collections.singletonMap(StreamsConfig.CLIENT_ID_CONFIG,
clientId));
- final JmxReporter jmxReporter = new JmxReporter();
- jmxReporter.configure(config.originals());
- reporters.add(jmxReporter);
- final MetricsContext metricsContext = new
KafkaMetricsContext(JMX_PREFIX,
-
config.originalsWithPrefix(CommonClientConfigs.METRICS_CONTEXT_PREFIX));
- metrics = new Metrics(metricConfig, reporters, time, metricsContext);
+ adminClient =
clientSupplier.getAdmin(config.getAdminConfigs(ClientUtils.getSharedAdminClientId(clientId)));
+
+ log.info("Kafka Streams version: {}", ClientMetrics.version());
+ log.info("Kafka Streams commit ID: {}", ClientMetrics.commitId());
+
+ metrics = getMetrics(config, time, clientId);
streamsMetrics = new StreamsMetricsImpl(
metrics,
clientId,
config.getString(StreamsConfig.BUILT_IN_METRICS_VERSION_CONFIG),
time
);
+
ClientMetrics.addVersionMetric(streamsMetrics);
ClientMetrics.addCommitIdMetric(streamsMetrics);
ClientMetrics.addApplicationIdMetric(streamsMetrics,
config.getString(StreamsConfig.APPLICATION_ID_CONFIG));
ClientMetrics.addTopologyDescriptionMetric(streamsMetrics,
internalTopologyBuilder.describe().toString());
ClientMetrics.addStateMetric(streamsMetrics, (metricsConfig, now) ->
state);
- log.info("Kafka Streams version: {}", ClientMetrics.version());
- log.info("Kafka Streams commit ID: {}", ClientMetrics.commitId());
- this.internalTopologyBuilder = internalTopologyBuilder;
- // re-write the physical topology according to the config
- internalTopologyBuilder.rewriteTopology(config);
+ ClientMetrics.addNumAliveStreamThreadMetric(streamsMetrics,
(metricsConfig, now) ->
Review comment:
Thanks for the code cleanup and re-ordering! BTW if there's no logical
changes at all maybe add a comment on the PR next time.
##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -782,8 +782,27 @@ private KafkaStreams(final InternalTopologyBuilder
internalTopologyBuilder,
final Time time) throws StreamsException {
this.config = config;
this.time = time;
+
+ this.internalTopologyBuilder = internalTopologyBuilder;
+ internalTopologyBuilder.rewriteTopology(config);
+
+ // sanity check to fail-fast in case we cannot build a
ProcessorTopology due to an exception
+ taskTopology = internalTopologyBuilder.buildTopology();
+ globalTaskTopology =
internalTopologyBuilder.buildGlobalStateTopology();
+
+ final boolean hasGlobalTopology = globalTaskTopology != null;
+ final boolean hasPersistentStores =
taskTopology.hasPersistentLocalStore() ||
+ (hasGlobalTopology &&
globalTaskTopology.hasPersistentGlobalStore());
+
+ try {
+ stateDirectory = new StateDirectory(config, time,
hasPersistentStores);
+ processId = stateDirectory.initializeProcessId();
Review comment:
cc @cadonna for the pre-creation of internal topics, we would also need
to build the topology in order to populate the `RepartitionTopics` etc, let's
just make sure we only build it once when we add that logic.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]