ableegoldman commented on a change in pull request #9978: URL: https://github.com/apache/kafka/pull/9978#discussion_r565802173
########## File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java ########## @@ -927,6 +912,39 @@ private StreamThread createAndAddStreamThread(final long cacheSizePerThread, fin return streamThread; } + private static Metrics getMetrics(final StreamsConfig config, final Time time, final String clientId) { + final MetricConfig metricConfig = new MetricConfig() + .samples(config.getInt(StreamsConfig.METRICS_NUM_SAMPLES_CONFIG)) + .recordLevel(Sensor.RecordingLevel.forName(config.getString(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG))) + .timeWindow(config.getLong(StreamsConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG), TimeUnit.MILLISECONDS); + final List<MetricsReporter> reporters = config.getConfiguredInstances(StreamsConfig.METRIC_REPORTER_CLASSES_CONFIG, + MetricsReporter.class, + Collections.singletonMap(StreamsConfig.CLIENT_ID_CONFIG, clientId)); + final JmxReporter jmxReporter = new JmxReporter(); + jmxReporter.configure(config.originals()); + reporters.add(jmxReporter); + final MetricsContext metricsContext = new KafkaMetricsContext(JMX_PREFIX, + config.originalsWithPrefix(CommonClientConfigs.METRICS_CONTEXT_PREFIX)); + return new Metrics(metricConfig, reporters, time, metricsContext); + } + + private int getNumStreamThreads(final boolean hasGlobalTopology) { + final int numStreamThreads; + if (internalTopologyBuilder.hasNoNonGlobalTopology()) { + log.info("Overriding number of StreamThreads to zero for global-only topology"); + numStreamThreads = 0; + } else { + numStreamThreads = config.getInt(StreamsConfig.NUM_STREAM_THREADS_CONFIG); + } + + if (numStreamThreads == 0 && !hasGlobalTopology) { + log.error("Topology with no input topics will create no stream threads and no global thread."); + throw new TopologyException("Topology has no stream threads and no global threads, " + + "must subscribe to at least one source topic or global table."); + } + return numStreamThreads; Review comment: Just tried to factor some of the self-contained logic 
into helper methods, since I found it incredibly difficult to get oriented within the super-long KafkaStreams constructor. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org