lucasbru commented on code in PR #14864:
URL: https://github.com/apache/kafka/pull/14864#discussion_r1410975879


##########
streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java:
##########
@@ -1791,6 +1794,74 @@ protected int processStreamThread(final 
Consumer<StreamThread> consumer) {
         return copy.size();
     }
 
+    /**
+     * Returns the internal clients' assigned {@code client instance ids}.
+     *
+     * @return the internal clients' assigned instance ids used for metrics 
collection.
+     *
+     * @throws IllegalStateException If {@code KafkaStreams} is not running.
+     * @throws TimeoutException Indicates that a request timed out.
+     */
+    public ClientInstanceIds clientInstanceIds(final Duration timeout) {
+        if (state().hasNotStarted()) {
+            throw new IllegalStateException("KafkaStreams has not been 
started, you can retry after calling start().");
+        }
+        if (state().isShuttingDown() || state.hasCompletedShutdown()) {
+            throw new IllegalStateException("KafkaStreams has been stopped (" 
+ state + ").");
+        }
+
+        final ClientInstanceIdsImpl clientInstanceIds = new 
ClientInstanceIdsImpl();
+
+        final Map<String, KafkaFuture<Uuid>> streamThreadFutures = new 
HashMap<>();
+        for (final StreamThread streamThread : threads) {
+            
streamThreadFutures.putAll(streamThread.clientInstanceIds(timeout));
+        }
+
+        KafkaFuture<Uuid> globalThreadFuture = null;
+        if (globalStreamThread != null) {
+            globalThreadFuture = 
globalStreamThread.globalConsumerInstanceId(timeout);
+        }
+
+        try {
+            
clientInstanceIds.setAdminInstanceId(adminClient.clientInstanceId(timeout));
+        } catch (final TimeoutException timeoutException) {
+            log.warn("Could not get admin client-instance-id due to timeout.");
+        }
+
+        for (final Map.Entry<String, KafkaFuture<Uuid>> streamThreadFuture : 
streamThreadFutures.entrySet()) {
+            try {
+                clientInstanceIds.addConsumerInstanceId(
+                    streamThreadFuture.getKey(),
+                    streamThreadFuture.getValue().get()
+                );
+            } catch (final ExecutionException exception) {
+                if (exception.getCause() instanceof TimeoutException) {
+                    log.warn("Could not get global consumer client-instance-id 
due to timeout.");
+                } else {
+                    log.error("Could not get global consumer 
client-instance-id", exception);
+                }
+            } catch (final InterruptedException error) {
+                log.error("Could not get global consumer client-instance-id", 
error);
+            }
+        }
+
+        if (globalThreadFuture != null) {
+            try {
+                
clientInstanceIds.addConsumerInstanceId(globalStreamThread.getName(), 
globalThreadFuture.get());
+            } catch (final ExecutionException exception) {
+                if (exception.getCause() instanceof TimeoutException) {
+                    log.warn("Could not get global consumer client-instance-id 
due to timeout.");
+                } else {
+                    log.error("Could not get global consumer 
client-instance-id", exception);
+                }
+            } catch (final InterruptedException error) {
+                log.error("Could not get global consumer client-instance-id", 
error);
+            }
+        }
+
+        return clientInstanceIds;

Review Comment:
   You could consider using `KafkaFuture.allOf(...).get(duration)` for the 
futures of all clients, so apply the same timeout to all futures. It will throw 
on the first exception that is returned. It will return void, but you can 
inspect the original futures for the results, if `get` doesn't throw.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to