mjsax commented on code in PR #14922:
URL: https://github.com/apache/kafka/pull/14922#discussion_r1419280528
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java:
##########
@@ -713,6 +723,44 @@ boolean runLoop() {
return true;
}
+ // visible for testing
+ void maybeGetClientInstanceIds() {
+ // we pass in a timeout of zero into each `clientInstanceId()` call
+ // to just trigger the "get instance id" background RPC;
+ // we don't want to block the stream thread that can do useful work in the meantime
+
+ if (fetchDeadlineClientInstanceId != -1) {
+ if (!mainConsumerInstanceIdFuture.isDone()) {
+ if (fetchDeadlineClientInstanceId >= time.milliseconds()) {
+ try {
+ mainConsumerInstanceIdFuture.complete(mainConsumer.clientInstanceId(Duration.ZERO));
+ maybeResetFetchDeadline();
+ } catch (final IllegalStateException disabledError) {
+ mainConsumerInstanceIdFuture.complete(null);
Review Comment:
An ISE is thrown if telemetry is disabled on the client. In general, we
assume that telemetry is either enabled for all clients or disabled for all
clients, but it could also be disabled for only _some_ clients. For that last
case, we want to swallow the error here (in general, we say that a single error
on any client fails the call to `KafkaStreams#clientInstanceIds()`, but for this
case we need to make an exception).
Let me know if you agree or disagree. Will add a comment. (It seems
`disabledError` is not descriptive enough as a variable name.)
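
To make the intended behavior concrete, here is a minimal, self-contained sketch of the rule described above. It is not the code from this PR: `ClientInstanceIdSketch`, `TelemetryClient`, `fetchInto`, and `collect` are hypothetical names used only for illustration, and the stand-in interface merely mimics a client whose `clientInstanceId(Duration)` throws `IllegalStateException` when telemetry is disabled. A disabled client completes its future with `null` and is left out of the result, while any other exception still propagates and fails the whole call.

```java
import java.time.Duration;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

public class ClientInstanceIdSketch {

    // Hypothetical stand-in for a client that exposes clientInstanceId(Duration).
    public interface TelemetryClient {
        String clientInstanceId(Duration timeout);
    }

    // Completes the future with the instance id, or with null when the client
    // signals "telemetry disabled" via IllegalStateException.
    // Any other exception is deliberately not caught here.
    public static void fetchInto(final TelemetryClient client,
                                 final CompletableFuture<String> future) {
        if (future.isDone()) {
            return; // already resolved, nothing to do
        }
        try {
            future.complete(client.clientInstanceId(Duration.ZERO));
        } catch (final IllegalStateException telemetryDisabled) {
            future.complete(null); // swallow: telemetry is disabled on this client only
        }
    }

    // Collects ids from all clients, skipping those that have telemetry disabled.
    public static Map<String, String> collect(final Map<String, TelemetryClient> clients) {
        final Map<String, String> result = new LinkedHashMap<>();
        for (final Map.Entry<String, TelemetryClient> entry : clients.entrySet()) {
            final CompletableFuture<String> future = new CompletableFuture<>();
            fetchInto(entry.getValue(), future);
            final String id = future.join();
            if (id != null) {
                result.put(entry.getKey(), id);
            }
        }
        return result;
    }

    public static void main(final String[] args) {
        final Map<String, TelemetryClient> clients = new LinkedHashMap<>();
        clients.put("consumer", timeout -> "id-1");
        clients.put("producer", timeout -> {
            throw new IllegalStateException("telemetry disabled");
        });
        System.out.println(collect(clients)); // prints {consumer=id-1}
    }
}
```

In this sketch the "producer" entry is simply absent from the result instead of failing the lookup, which is the exception to the "a single error fails the call" rule discussed above.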