lucasbru commented on code in PR #14864: URL: https://github.com/apache/kafka/pull/14864#discussion_r1410975879
########## streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java: ########## @@ -1791,6 +1794,74 @@ protected int processStreamThread(final Consumer<StreamThread> consumer) { return copy.size(); } + /** + * Returns the internal clients' assigned {@code client instance ids}. + * + * @return the internal clients' assigned instance ids used for metrics collection. + * + * @throws IllegalStateException If {@code KafkaStreams} is not running. + * @throws TimeoutException Indicates that a request timed out. + */ + public ClientInstanceIds clientInstanceIds(final Duration timeout) { + if (state().hasNotStarted()) { + throw new IllegalStateException("KafkaStreams has not been started, you can retry after calling start()."); + } + if (state().isShuttingDown() || state.hasCompletedShutdown()) { + throw new IllegalStateException("KafkaStreams has been stopped (" + state + ")."); + } + + final ClientInstanceIdsImpl clientInstanceIds = new ClientInstanceIdsImpl(); + + final Map<String, KafkaFuture<Uuid>> streamThreadFutures = new HashMap<>(); + for (final StreamThread streamThread : threads) { + streamThreadFutures.putAll(streamThread.clientInstanceIds(timeout)); + } + + KafkaFuture<Uuid> globalThreadFuture = null; + if (globalStreamThread != null) { + globalThreadFuture = globalStreamThread.globalConsumerInstanceId(timeout); + } + + try { + clientInstanceIds.setAdminInstanceId(adminClient.clientInstanceId(timeout)); + } catch (final TimeoutException timeoutException) { + log.warn("Could not get admin client-instance-id due to timeout."); + } + + for (final Map.Entry<String, KafkaFuture<Uuid>> streamThreadFuture : streamThreadFutures.entrySet()) { + try { + clientInstanceIds.addConsumerInstanceId( + streamThreadFuture.getKey(), + streamThreadFuture.getValue().get() + ); + } catch (final ExecutionException exception) { + if (exception.getCause() instanceof TimeoutException) { + log.warn("Could not get global consumer client-instance-id due to timeout."); + } else { + log.error("Could not get global consumer client-instance-id", exception); + } + } catch (final InterruptedException error) { + log.error("Could not get global consumer client-instance-id", error); + } + } + + if (globalThreadFuture != null) { + try { + clientInstanceIds.addConsumerInstanceId(globalStreamThread.getName(), globalThreadFuture.get()); + } catch (final ExecutionException exception) { + if (exception.getCause() instanceof TimeoutException) { + log.warn("Could not get global consumer client-instance-id due to timeout."); + } else { + log.error("Could not get global consumer client-instance-id", exception); + } + } catch (final InterruptedException error) { + log.error("Could not get global consumer client-instance-id", error); + } + } + + return clientInstanceIds; Review Comment: You could consider using `KafkaFuture.allOf(...).get(duration)` for the futures of all clients, so apply the same timeout to all futures. It will throw on the first exception that is returned. It will return void, but you can inspect the original futures for the results, if `get` doesn't throw. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org