lucasbru commented on code in PR #14936:
URL: https://github.com/apache/kafka/pull/14936#discussion_r1420236155


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java:
##########
@@ -231,6 +240,69 @@ private void restoreTasks(final long now) {
             }
         }
 
+        private void maybeGetClientInstanceIds() {
+            if (fetchDeadlineClientInstanceId != -1) {
+                if (!clientInstanceIdFuture.isDone()) {
+                    if (fetchDeadlineClientInstanceId >= time.milliseconds()) {
+                        try {
+                            // if the state-updated thread has active work:
+                            //    we pass in a timeout of zero into each `clientInstanceId()` call
+                            //    to just trigger the "get instance id" background RPC;
+                            //    we don't want to block the state updater thread that can do useful work in the meantime
+                            // otherwise, we pass in 100ms to avoid busy waiting
+                            clientInstanceIdFuture.complete(
+                                restoreConsumer.clientInstanceId(
+                                    allWorkDone() ? Duration.ofMillis(100L) : Duration.ZERO

Review Comment:
   I think `allWorkDone()` will always be false here, because of `fetchDeadlineClientInstanceId`: this branch only runs when it is not `-1`.
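
   To illustrate the concern, here is a minimal self-contained sketch. It assumes that `allWorkDone()` also requires `fetchDeadlineClientInstanceId == -1`, which is not shown in the hunk above. The class `GuardSketch`, the field `noTasksPending`, and the methods `requestFetch` and `chooseTimeout` are hypothetical stand-ins, not the actual `DefaultStateUpdater` code.

```java
import java.time.Duration;
import java.util.Optional;

public class GuardSketch {
    // Stand-in for the field in the hunk; -1 means "no fetch requested".
    private long fetchDeadlineClientInstanceId = -1L;
    // Hypothetical flag standing in for "no tasks left to restore".
    private boolean noTasksPending = true;

    // ASSUMPTION: the real allWorkDone() is not part of the quoted hunk.
    // This version supposes it also requires that no fetch is pending.
    boolean allWorkDone() {
        return noTasksPending && fetchDeadlineClientInstanceId == -1L;
    }

    void requestFetch(final long deadlineMs) {
        fetchDeadlineClientInstanceId = deadlineMs;
    }

    // Mirrors the shape of the guarded ternary in the hunk.
    Optional<Duration> chooseTimeout() {
        if (fetchDeadlineClientInstanceId != -1L) {
            // Inside this guard the deadline is != -1, so under the assumption
            // above allWorkDone() is false and the 100ms branch is never taken.
            return Optional.of(allWorkDone() ? Duration.ofMillis(100L) : Duration.ZERO);
        }
        return Optional.empty(); // no fetch requested
    }

    public static void main(final String[] args) {
        final GuardSketch sketch = new GuardSketch();
        sketch.requestFetch(System.currentTimeMillis() + 1_000L);
        // Even though noTasksPending is true, the timeout is zero.
        System.out.println(sketch.chooseTimeout().get()); // prints PT0S
    }
}
```

   If that assumption holds, the `Duration.ofMillis(100L)` branch is dead code and the thread would busy-wait with a zero timeout until the deadline passes.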


