ableegoldman commented on a change in pull request #9978:
URL: https://github.com/apache/kafka/pull/9978#discussion_r565802173



##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -927,6 +912,39 @@ private StreamThread createAndAddStreamThread(final long cacheSizePerThread, fin
         return streamThread;
     }
 
+    private static Metrics getMetrics(final StreamsConfig config, final Time time, final String clientId) {
+        final MetricConfig metricConfig = new MetricConfig()
+            .samples(config.getInt(StreamsConfig.METRICS_NUM_SAMPLES_CONFIG))
+            .recordLevel(Sensor.RecordingLevel.forName(config.getString(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG)))
+            .timeWindow(config.getLong(StreamsConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG), TimeUnit.MILLISECONDS);
+        final List<MetricsReporter> reporters = config.getConfiguredInstances(StreamsConfig.METRIC_REPORTER_CLASSES_CONFIG,
+                                                                              MetricsReporter.class,
+                                                                              Collections.singletonMap(StreamsConfig.CLIENT_ID_CONFIG, clientId));
+        final JmxReporter jmxReporter = new JmxReporter();
+        jmxReporter.configure(config.originals());
+        reporters.add(jmxReporter);
+        final MetricsContext metricsContext = new KafkaMetricsContext(JMX_PREFIX,
+                                                                      config.originalsWithPrefix(CommonClientConfigs.METRICS_CONTEXT_PREFIX));
+        return new Metrics(metricConfig, reporters, time, metricsContext);
+    }
+
+    private int getNumStreamThreads(final boolean hasGlobalTopology) {
+        final int numStreamThreads;
+        if (internalTopologyBuilder.hasNoNonGlobalTopology()) {
+            log.info("Overriding number of StreamThreads to zero for global-only topology");
+            numStreamThreads = 0;
+        } else {
+            numStreamThreads = config.getInt(StreamsConfig.NUM_STREAM_THREADS_CONFIG);
+        }
+
+        if (numStreamThreads == 0 && !hasGlobalTopology) {
+            log.error("Topology with no input topics will create no stream threads and no global thread.");
+            throw new TopologyException("Topology has no stream threads and no global threads, " +
+                                            "must subscribe to at least one source topic or global table.");
+        }
+        return numStreamThreads;

Review comment:
       Just tried to factor some of the self-contained logic into helper methods (`getMetrics` and `getNumStreamThreads`), since I found it incredibly difficult to get oriented within the super-long KafkaStreams constructor.
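
       For anyone skimming the hunk, the thread-count decision can be read in isolation as a pure function. The following is only a rough sketch of that logic, not the code in this PR: `ThreadCountSketch`, `resolveNumStreamThreads`, and `EmptyTopologyException` are hypothetical names, and the two booleans and the int stand in for `internalTopologyBuilder.hasNoNonGlobalTopology()`, `hasGlobalTopology`, and the `NUM_STREAM_THREADS_CONFIG` value.

```java
/** Hypothetical stand-in for the thread-count decision in the hunk above; not the Kafka Streams API. */
final class ThreadCountSketch {

    /** Hypothetical exception used here in place of Kafka's TopologyException. */
    static final class EmptyTopologyException extends RuntimeException {
        EmptyTopologyException(final String message) {
            super(message);
        }
    }

    private ThreadCountSketch() { }

    /**
     * @param globalOnlyTopology assumed to mirror hasNoNonGlobalTopology()
     * @param hasGlobalTopology  whether a global thread would be created
     * @param configuredThreads  assumed to mirror the configured number of stream threads
     */
    static int resolveNumStreamThreads(final boolean globalOnlyTopology,
                                       final boolean hasGlobalTopology,
                                       final int configuredThreads) {
        // A global-only topology needs no stream threads, whatever the config says.
        final int numStreamThreads = globalOnlyTopology ? 0 : configuredThreads;

        // With neither stream threads nor a global thread there is nothing to run.
        if (numStreamThreads == 0 && !hasGlobalTopology) {
            throw new EmptyTopologyException(
                "Topology has no stream threads and no global threads");
        }
        return numStreamThreads;
    }

    public static void main(final String[] args) {
        // Regular topology: the configured value is used as-is.
        System.out.println(resolveNumStreamThreads(false, false, 4));
        // Global-only topology: the configured value is overridden to zero.
        System.out.println(resolveNumStreamThreads(true, true, 4));
    }
}
```

       Keeping the decision free of side effects (beyond the exception) is what makes it easy to pull out of the constructor and to check on its own.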





