This is an automated email from the ASF dual-hosted git repository.
ableegoldman pushed a commit to branch 2.8
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.8 by this push:
new b9def1e KAFKA-12375: don't reuse thread.id until a thread has fully
shut down (#10215)
b9def1e is described below
commit b9def1edbfa1232e5a497373e7913bfb11b5f263
Author: A. Sophie Blee-Goldman <[email protected]>
AuthorDate: Tue Mar 2 16:28:15 2021 -0800
KAFKA-12375: don't reuse thread.id until a thread has fully shut down
(#10215)
Always grab a new thread.id, and verify that a thread has fully shut down (reached the
DEAD state) before removing it from the `threads` list and making that id available again.
Reviewers: Walker Carlson <[email protected]>, Bruno Cadonna
<[email protected]>
---
checkstyle/checkstyle.xml | 1 +
.../org/apache/kafka/streams/KafkaStreams.java | 69 +++++++++++++++++-----
.../org/apache/kafka/streams/KafkaStreamsTest.java | 14 ++---
3 files changed, 62 insertions(+), 22 deletions(-)
diff --git a/checkstyle/checkstyle.xml b/checkstyle/checkstyle.xml
index 91045ad..7f912dc 100644
--- a/checkstyle/checkstyle.xml
+++ b/checkstyle/checkstyle.xml
@@ -120,6 +120,7 @@
<module name="ClassDataAbstractionCoupling">
<!-- default is 7 -->
<property name="max" value="25"/>
+ <property name="excludeClassesRegexps" value="AtomicInteger"/>
</module>
<module name="BooleanExpressionComplexity">
<!-- default is 3 -->
diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index de048d1..f62427d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -92,6 +92,7 @@ import java.util.Properties;
import java.util.Set;
import java.util.TreeMap;
import java.util.UUID;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
@@ -463,9 +464,8 @@ public class KafkaStreams implements AutoCloseable {
closeToError();
}
final StreamThread deadThread = (StreamThread) Thread.currentThread();
- threads.remove(deadThread);
- addStreamThread();
deadThread.shutdown();
+ addStreamThread();
if (throwable instanceof RuntimeException) {
throw (RuntimeException) throwable;
} else if (throwable instanceof Error) {
@@ -970,7 +970,7 @@ public class KafkaStreams implements AutoCloseable {
final StreamThread streamThread;
synchronized (changeThreadCount) {
threadIdx = getNextThreadIndex();
- cacheSizePerThread = getCacheSizePerThread(threads.size() + 1);
+ cacheSizePerThread =
getCacheSizePerThread(getNumLiveStreamThreads() + 1);
resizeThreadCache(cacheSizePerThread);
// Creating thread should hold the lock in order to avoid
duplicate thread index.
// If the duplicate index happen, the metadata of thread may
be duplicate too.
@@ -984,7 +984,7 @@ public class KafkaStreams implements AutoCloseable {
} else {
streamThread.shutdown();
threads.remove(streamThread);
- resizeThreadCache(getCacheSizePerThread(threads.size()));
+
resizeThreadCache(getCacheSizePerThread(getNumLiveStreamThreads()));
}
}
}
@@ -1038,7 +1038,7 @@ public class KafkaStreams implements AutoCloseable {
// make a copy of threads to avoid holding lock
for (final StreamThread streamThread : new
ArrayList<>(threads)) {
final boolean callingThreadIsNotCurrentStreamThread =
!streamThread.getName().equals(Thread.currentThread().getName());
- if (streamThread.isAlive() &&
(callingThreadIsNotCurrentStreamThread || threads.size() == 1)) {
+ if (streamThread.isAlive() &&
(callingThreadIsNotCurrentStreamThread || getNumLiveStreamThreads() == 1)) {
log.info("Removing StreamThread " +
streamThread.getName());
final Optional<String> groupInstanceID =
streamThread.getGroupInstanceID();
streamThread.requestLeaveGroupDuringShutdown();
@@ -1047,10 +1047,15 @@ public class KafkaStreams implements AutoCloseable {
if
(!streamThread.waitOnThreadState(StreamThread.State.DEAD, timeoutMs - begin)) {
log.warn("Thread " + streamThread.getName() +
" did not shutdown in the allotted time");
timeout = true;
+ // Don't remove from threads until shutdown is
complete. We will trim it from the
+ // list once it reaches DEAD, and if for some
reason it's hanging indefinitely in the
+ // shutdown then we should just consider this
thread.id to be burned
+ } else {
+ threads.remove(streamThread);
}
}
- threads.remove(streamThread);
- final long cacheSizePerThread =
getCacheSizePerThread(threads.size());
+
+ final long cacheSizePerThread =
getCacheSizePerThread(getNumLiveStreamThreads());
resizeThreadCache(cacheSizePerThread);
if (groupInstanceID.isPresent() &&
callingThreadIsNotCurrentStreamThread) {
final MemberToRemove memberToRemove = new
MemberToRemove(groupInstanceID.get());
@@ -1093,17 +1098,51 @@ public class KafkaStreams implements AutoCloseable {
return Optional.empty();
}
+ // Returns the number of threads that are not in the DEAD state -- use
this over threads.size()
+ private int getNumLiveStreamThreads() {
+ final AtomicInteger numLiveThreads = new AtomicInteger(0);
+ synchronized (threads) {
+ processStreamThread(thread -> {
+ if (thread.state() == StreamThread.State.DEAD) {
+ threads.remove(thread);
+ } else {
+ numLiveThreads.incrementAndGet();
+ }
+ });
+ return numLiveThreads.get();
+ }
+ }
+
private int getNextThreadIndex() {
- final HashSet<String> names = new HashSet<>();
- processStreamThread(thread -> names.add(thread.getName()));
- final String baseName = clientId + "-StreamThread-";
- for (int i = 1; i <= threads.size(); i++) {
- final String name = baseName + i;
- if (!names.contains(name)) {
- return i;
+ final HashSet<String> allLiveThreadNames = new HashSet<>();
+ final AtomicInteger maxThreadId = new AtomicInteger(1);
+ synchronized (threads) {
+ processStreamThread(thread -> {
+ // trim any DEAD threads from the list so we can reuse the
thread.id
+ // this is only safe to do once the thread has fully completed
shutdown
+ if (thread.state() == StreamThread.State.DEAD) {
+ threads.remove(thread);
+ } else {
+ allLiveThreadNames.add(thread.getName());
+ // Assume threads are always named with the
"-StreamThread-<threadId>" suffix
+ final int threadId =
Integer.parseInt(thread.getName().substring(thread.getName().lastIndexOf("-") +
1));
+ if (threadId > maxThreadId.get()) {
+ maxThreadId.set(threadId);
+ }
+ }
+ });
+
+ final String baseName = clientId + "-StreamThread-";
+ for (int i = 1; i <= maxThreadId.get(); i++) {
+ final String name = baseName + i;
+ if (!allLiveThreadNames.contains(name)) {
+ return i;
+ }
}
+ // It's safe to use threads.size() rather than
getNumLiveStreamThreads() to infer the number of threads
+ // here since we trimmed any DEAD threads earlier in this method
while holding the lock
+ return threads.size() + 1;
}
- return threads.size() + 1;
}
private long getCacheSizePerThread(final int numStreamThreads) {
diff --git
a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
index a4cd8bf..b3dd559 100644
--- a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
@@ -232,8 +232,8 @@ public class KafkaStreamsTest {
EasyMock.expect(StreamThread.processingMode(anyObject(StreamsConfig.class))).andReturn(StreamThread.ProcessingMode.AT_LEAST_ONCE).anyTimes();
EasyMock.expect(streamThreadOne.getId()).andReturn(0L).anyTimes();
EasyMock.expect(streamThreadTwo.getId()).andReturn(1L).anyTimes();
- prepareStreamThread(streamThreadOne, true);
- prepareStreamThread(streamThreadTwo, false);
+ prepareStreamThread(streamThreadOne, 1, true);
+ prepareStreamThread(streamThreadTwo, 2, false);
// setup global threads
final AtomicReference<GlobalStreamThread.State> globalThreadState =
new AtomicReference<>(GlobalStreamThread.State.CREATED);
@@ -293,7 +293,7 @@ public class KafkaStreamsTest {
);
}
- private void prepareStreamThread(final StreamThread thread, final boolean
terminable) throws Exception {
+ private void prepareStreamThread(final StreamThread thread, final int
threadId, final boolean terminable) throws Exception {
final AtomicReference<StreamThread.State> state = new
AtomicReference<>(StreamThread.State.CREATED);
EasyMock.expect(thread.state()).andAnswer(state::get).anyTimes();
@@ -321,7 +321,7 @@ public class KafkaStreamsTest {
}).anyTimes();
EasyMock.expect(thread.getGroupInstanceID()).andStubReturn(Optional.empty());
EasyMock.expect(thread.threadMetadata()).andReturn(new ThreadMetadata(
- "newThead",
+ "processId-StreamThread-" + threadId,
"DEAD",
"",
"",
@@ -337,7 +337,7 @@ public class KafkaStreamsTest {
EasyMock.expectLastCall().anyTimes();
thread.requestLeaveGroupDuringShutdown();
EasyMock.expectLastCall().anyTimes();
- EasyMock.expect(thread.getName()).andStubReturn("newThread");
+
EasyMock.expect(thread.getName()).andStubReturn("processId-StreamThread-" +
threadId);
thread.shutdown();
EasyMock.expectLastCall().andAnswer(() -> {
supplier.consumer.close();
@@ -564,7 +564,7 @@ public class KafkaStreamsTest {
streams.start();
final int oldSize = streams.threads.size();
TestUtils.waitForCondition(() -> streams.state() ==
KafkaStreams.State.RUNNING, 15L, "wait until running");
- assertThat(streams.addStreamThread(),
equalTo(Optional.of("newThread")));
+ assertThat(streams.addStreamThread(),
equalTo(Optional.of("processId-StreamThread-" + 2)));
assertThat(streams.threads.size(), equalTo(oldSize + 1));
}
@@ -613,7 +613,7 @@ public class KafkaStreamsTest {
final int oldSize = streams.threads.size();
TestUtils.waitForCondition(() -> streams.state() ==
KafkaStreams.State.RUNNING, 15L,
"Kafka Streams client did not reach state RUNNING");
- assertThat(streams.removeStreamThread(),
equalTo(Optional.of("newThread")));
+ assertThat(streams.removeStreamThread(),
equalTo(Optional.of("processId-StreamThread-" + 1)));
assertThat(streams.threads.size(), equalTo(oldSize - 1));
}