mjsax commented on code in PR #14922:
URL: https://github.com/apache/kafka/pull/14922#discussion_r1419280528


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java:
##########
@@ -713,6 +723,44 @@ boolean runLoop() {
         return true;
     }
 
+    // visible for testing
+    void maybeGetClientInstanceIds() {
+        // we pass in a timeout of zero into each `clientInstanceId()` call
+        // to just trigger the "get instance id" background RPC;
+        // we don't want to block the stream thread that can do useful work in 
the meantime
+
+        if (fetchDeadlineClientInstanceId != -1) {
+            if (!mainConsumerInstanceIdFuture.isDone()) {
+                if (fetchDeadlineClientInstanceId >= time.milliseconds()) {
+                    try {
+                        
mainConsumerInstanceIdFuture.complete(mainConsumer.clientInstanceId(Duration.ZERO));
+                        maybeResetFetchDeadline();
+                    } catch (final IllegalStateException disabledError) {
+                        mainConsumerInstanceIdFuture.complete(null);

Review Comment:
   ISE is throws if telemetry is disabled on the client -- in general, we 
assume that it's enabled for all clients, or disabled for all clients, but it 
could also be, that it's disabled for _some_ client only. For the last case, we 
want to swallow the error what happens here (on general, we say, a single error 
on any client fails the call to `KafkaStreams#clientInstanceIds()` but for this 
case we need to make an exception).
   
   Let me know if you agree or disagree. Will add a comment. (Seems 
`disabledError` as var name is not descriptive enough).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to