This is an automated email from the ASF dual-hosted git repository.

ableegoldman pushed a commit to branch 2.8
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.8 by this push:
     new aa303e8  HOTFIX: timeout issue in removeStreamThread() (#10321)
aa303e8 is described below

commit aa303e89f0851815428eeb41e6c533975d3e6e69
Author: Walker Carlson <[email protected]>
AuthorDate: Mon Mar 15 20:12:37 2021 -0700

    HOTFIX: timeout issue in removeStreamThread() (#10321)
    
    The timeout is a duration, not a point in time, so the start timestamp must not be subtracted from it.
    
    Reviewers: Bruno Cadonna <[email protected]>, Anna Sophie Blee-Goldman <[email protected]>
---
 streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java | 5 ++---
 1 file changed, 2 insertions(+), 3 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index f62427d..d8023b0 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -1031,7 +1031,6 @@ public class KafkaStreams implements AutoCloseable {
     }
 
     private Optional<String> removeStreamThread(final long timeoutMs) throws 
TimeoutException {
-        final long begin = time.milliseconds();
         boolean timeout = false;
         if (isRunningOrRebalancing()) {
             synchronized (changeThreadCount) {
@@ -1044,7 +1043,7 @@ public class KafkaStreams implements AutoCloseable {
                         streamThread.requestLeaveGroupDuringShutdown();
                         streamThread.shutdown();
                         if 
(!streamThread.getName().equals(Thread.currentThread().getName())) {
-                            if 
(!streamThread.waitOnThreadState(StreamThread.State.DEAD, timeoutMs - begin)) {
+                            if 
(!streamThread.waitOnThreadState(StreamThread.State.DEAD, timeoutMs)) {
                                 log.warn("Thread " + streamThread.getName() + 
" did not shutdown in the allotted time");
                                 timeout = true;
                                 // Don't remove from threads until shutdown is 
complete. We will trim it from the
@@ -1066,7 +1065,7 @@ public class KafkaStreams implements AutoCloseable {
                                     new 
RemoveMembersFromConsumerGroupOptions(membersToRemove)
                                 );
                             try {
-                                
removeMembersFromConsumerGroupResult.memberResult(memberToRemove).get(timeoutMs 
- begin, TimeUnit.MILLISECONDS);
+                                
removeMembersFromConsumerGroupResult.memberResult(memberToRemove).get(timeoutMs,
 TimeUnit.MILLISECONDS);
                             } catch (final 
java.util.concurrent.TimeoutException e) {
                                 log.error("Could not remove static member {} 
from consumer group {} due to a timeout: {}",
                                         groupInstanceID.get(), 
config.getString(StreamsConfig.APPLICATION_ID_CONFIG), e);

Reply via email to