ableegoldman commented on a change in pull request #9978:
URL: https://github.com/apache/kafka/pull/9978#discussion_r565802173
##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -927,6 +912,39 @@ private StreamThread createAndAddStreamThread(final long cacheSizePerThread, fin
return streamThread;
}
+ private static Metrics getMetrics(final StreamsConfig config, final Time
time, final String clientId) {
+ final MetricConfig metricConfig = new MetricConfig()
+ .samples(config.getInt(StreamsConfig.METRICS_NUM_SAMPLES_CONFIG))
+
.recordLevel(Sensor.RecordingLevel.forName(config.getString(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG)))
+
.timeWindow(config.getLong(StreamsConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG),
TimeUnit.MILLISECONDS);
+ final List<MetricsReporter> reporters =
config.getConfiguredInstances(StreamsConfig.METRIC_REPORTER_CLASSES_CONFIG,
+
MetricsReporter.class,
+
Collections.singletonMap(StreamsConfig.CLIENT_ID_CONFIG, clientId));
+ final JmxReporter jmxReporter = new JmxReporter();
+ jmxReporter.configure(config.originals());
+ reporters.add(jmxReporter);
+ final MetricsContext metricsContext = new
KafkaMetricsContext(JMX_PREFIX,
+
config.originalsWithPrefix(CommonClientConfigs.METRICS_CONTEXT_PREFIX));
+ return new Metrics(metricConfig, reporters, time, metricsContext);
+ }
+
+ private int getNumStreamThreads(final boolean hasGlobalTopology) {
+ final int numStreamThreads;
+ if (internalTopologyBuilder.hasNoNonGlobalTopology()) {
+ log.info("Overriding number of StreamThreads to zero for global-only topology");
+ numStreamThreads = 0;
+ } else {
+ numStreamThreads =
config.getInt(StreamsConfig.NUM_STREAM_THREADS_CONFIG);
+ }
+
+ if (numStreamThreads == 0 && !hasGlobalTopology) {
+ log.error("Topology with no input topics will create no stream threads and no global thread.");
+ throw new TopologyException("Topology has no stream threads and no global threads, " +
+ "must subscribe to at least one source topic or global table.");
+ }
+ return numStreamThreads;
Review comment:
I just tried to factor some of the self-contained logic into helper
methods, since I found it incredibly difficult to get oriented within the
super-long KafkaStreams constructor.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]