This is an automated email from the ASF dual-hosted git repository.
ableegoldman pushed a commit to branch 2.8
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.8 by this push:
new aa303e8 HOTFIX: timeout issue in removeStreamThread() (#10321)
aa303e8 is described below
commit aa303e89f0851815428eeb41e6c533975d3e6e69
Author: Walker Carlson <[email protected]>
AuthorDate: Mon Mar 15 20:12:37 2021 -0700
HOTFIX: timeout issue in removeStreamThread() (#10321)
Timeout is a duration not a point in time.
Reviewers: Bruno Cadonna <[email protected]>, Anna Sophie Blee-Goldman
<[email protected]>
---
streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java | 5 ++---
1 file changed, 2 insertions(+), 3 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index f62427d..d8023b0 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -1031,7 +1031,6 @@ public class KafkaStreams implements AutoCloseable {
}
private Optional<String> removeStreamThread(final long timeoutMs) throws
TimeoutException {
- final long begin = time.milliseconds();
boolean timeout = false;
if (isRunningOrRebalancing()) {
synchronized (changeThreadCount) {
@@ -1044,7 +1043,7 @@ public class KafkaStreams implements AutoCloseable {
streamThread.requestLeaveGroupDuringShutdown();
streamThread.shutdown();
if
(!streamThread.getName().equals(Thread.currentThread().getName())) {
- if
(!streamThread.waitOnThreadState(StreamThread.State.DEAD, timeoutMs - begin)) {
+ if
(!streamThread.waitOnThreadState(StreamThread.State.DEAD, timeoutMs)) {
log.warn("Thread " + streamThread.getName() +
" did not shutdown in the allotted time");
timeout = true;
// Don't remove from threads until shutdown is
complete. We will trim it from the
@@ -1066,7 +1065,7 @@ public class KafkaStreams implements AutoCloseable {
new
RemoveMembersFromConsumerGroupOptions(membersToRemove)
);
try {
-
removeMembersFromConsumerGroupResult.memberResult(memberToRemove).get(timeoutMs
- begin, TimeUnit.MILLISECONDS);
+
removeMembersFromConsumerGroupResult.memberResult(memberToRemove).get(timeoutMs,
TimeUnit.MILLISECONDS);
} catch (final
java.util.concurrent.TimeoutException e) {
log.error("Could not remove static member {}
from consumer group {} due to a timeout: {}",
groupInstanceID.get(),
config.getString(StreamsConfig.APPLICATION_ID_CONFIG), e);