[GitHub] [kafka] wcarlson5 commented on a change in pull request #9984: MINOR: add timeout and static group rebalance to remove thread

2021-01-28 Thread GitBox


wcarlson5 commented on a change in pull request #9984:
URL: https://github.com/apache/kafka/pull/9984#discussion_r566190848



##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -983,19 +988,68 @@ private StreamThread createAndAddStreamThread(final long 
cacheSizePerThread, fin
  * no stream threads are alive
  */
 public Optional removeStreamThread() {
+return removeStreamThread(Long.MAX_VALUE);
+}
+
+/**
+ * Removes one stream thread out of the running stream threads from this 
Kafka Streams client.
+ * 
+ * The removed stream thread is gracefully shut down. This method does not 
specify which stream
+ * thread is shut down.
+ * 
+ * Since the number of stream threads decreases, the sizes of the caches 
in the remaining stream
+ * threads are adapted so that the sum of the cache sizes over all stream 
threads equals the total
+ * cache size specified in configuration {@link 
StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG}.
+ *
+ * @param timeout The the length of time to wait for the thread to shutdown
+ * @throws org.apache.kafka.common.errors.TimeoutException if the thread 
does not stop in time
+ * @return name of the removed stream thread or empty if a stream thread 
could not be removed because
+ * no stream threads are alive
+ */
+public Optional removeStreamThread(final Duration timeout) {
+final String msgPrefix = prepareMillisCheckFailMsgPrefix(timeout, 
"timeout");
+final long timeoutMs = validateMillisecondDuration(timeout, msgPrefix);
+return removeStreamThread(timeoutMs);
+}
+
+private Optional removeStreamThread(final long timeoutMs) throws 
TimeoutException {
+final long begin = time.milliseconds();
+boolean timeout = false;
 if (isRunningOrRebalancing()) {
 synchronized (changeThreadCount) {
 // make a copy of threads to avoid holding lock
 for (final StreamThread streamThread : new 
ArrayList<>(threads)) {
 if (streamThread.isAlive() && 
(!streamThread.getName().equals(Thread.currentThread().getName())

Review comment:
   sure





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] wcarlson5 commented on a change in pull request #9984: MINOR: add timeout and static group rebalance to remove thread

2021-01-28 Thread GitBox


wcarlson5 commented on a change in pull request #9984:
URL: https://github.com/apache/kafka/pull/9984#discussion_r566188299



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##
@@ -1158,6 +1176,14 @@ public String toString(final String indent) {
 return indent + "\tStreamsThread threadId: " + getName() + "\n" + 
taskManager.toString(indent);
 }
 
+public Optional getGroupInstanceID() {
+return getGroupInstanceID;
+}
+
+public void leaveGroup() {
+this.leaveGroup.set(true);

Review comment:
   Either way, we will need to set a flag and let the thread take care of it, 
so I will just rename it.









[GitHub] [kafka] wcarlson5 commented on a change in pull request #9984: MINOR: add timeout and static group rebalance to remove thread

2021-01-28 Thread GitBox


wcarlson5 commented on a change in pull request #9984:
URL: https://github.com/apache/kafka/pull/9984#discussion_r566187516



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##
@@ -1074,6 +1089,9 @@ private void completeShutdown(final boolean cleanRun) {
 streamsMetrics.removeAllThreadLevelSensors(getName());
 
 setState(State.DEAD);
+if (leaveGroup.get()) {
+mainConsumer.unsubscribe();

Review comment:
   good to know









[GitHub] [kafka] wcarlson5 commented on a change in pull request #9984: MINOR: add timeout and static group rebalance to remove thread

2021-01-27 Thread GitBox


wcarlson5 commented on a change in pull request #9984:
URL: https://github.com/apache/kafka/pull/9984#discussion_r565823251



##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -997,19 +1002,63 @@ private StreamThread createAndAddStreamThread(final long 
cacheSizePerThread, fin
  * no stream threads are alive
  */
 public Optional removeStreamThread() {
+return removeStreamThread(Long.MAX_VALUE);
+}
+
+/**
+ * Removes one stream thread out of the running stream threads from this 
Kafka Streams client.
+ * 
+ * The removed stream thread is gracefully shut down. This method does not 
specify which stream
+ * thread is shut down.
+ * 
+ * Since the number of stream threads decreases, the sizes of the caches 
in the remaining stream
+ * threads are adapted so that the sum of the cache sizes over all stream 
threads equals the total
+ * cache size specified in configuration {@link 
StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG}.
+ *
+ * @param timeout The the length of time to wait for the thread to shutdown
+ * @throws TimeoutException if the thread does not stop in time
+ * @return name of the removed stream thread or empty if a stream thread 
could not be removed because
+ * no stream threads are alive
+ */
+public Optional removeStreamThread(final Duration timeout) throws 
TimeoutException {
+final String msgPrefix = prepareMillisCheckFailMsgPrefix(timeout, 
"timeout");
+final long timeoutMs = validateMillisecondDuration(timeout, msgPrefix);
+return removeStreamThread(timeoutMs);
+}
+
+private Optional removeStreamThread(final long timeoutMs) throws 
TimeoutException {
+final long begin = time.milliseconds();
 if (isRunningOrRebalancing()) {
 synchronized (changeThreadCount) {
 // make a copy of threads to avoid holding lock
 for (final StreamThread streamThread : new 
ArrayList<>(threads)) {
 if (streamThread.isAlive() && 
(!streamThread.getName().equals(Thread.currentThread().getName())
 || threads.size() == 1)) {
+final Optional groupInstanceID = 
streamThread.getGroupInstanceID();
 streamThread.shutdown();
 if 
(!streamThread.getName().equals(Thread.currentThread().getName())) {
-
streamThread.waitOnThreadState(StreamThread.State.DEAD);
+if 
(!streamThread.waitOnThreadState(StreamThread.State.DEAD, timeoutMs)) {
+log.warn("Thread " + streamThread.getName() + 
" did not stop in the allotted time");
+throw new TimeoutException("Thread " + 
streamThread.getName() + " did not stop in the allotted time");

Review comment:
   Okay, I buy it. I'll delay the exception.
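
A minimal sketch of what "delaying the exception" could look like: the timeout is recorded in a flag, the bookkeeping (removing the thread, resizing) still runs, and the exception is thrown only afterwards. All names here (`DelayedTimeoutSketch`, `Worker`, `RemovalTimeoutException`, `removeOne`) are hypothetical stand-ins rather than the PR's code, and the exception class only stands in for `org.apache.kafka.common.errors.TimeoutException`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

// Hypothetical stand-in for the client; none of these names come from the PR.
final class DelayedTimeoutSketch {
    /** Stand-in for org.apache.kafka.common.errors.TimeoutException. */
    static final class RemovalTimeoutException extends RuntimeException {
        RemovalTimeoutException(final String message) {
            super(message);
        }
    }

    /** Minimal worker: stopsInTime simulates whether shutdown finishes within the timeout. */
    static final class Worker {
        final String name;
        final boolean stopsInTime;

        Worker(final String name, final boolean stopsInTime) {
            this.name = name;
            this.stopsInTime = stopsInTime;
        }

        boolean stopWithin(final long timeoutMs) {
            return stopsInTime;
        }
    }

    private final List<Worker> workers = new ArrayList<>();
    private int resizeCount = 0; // counts how often the bookkeeping ran

    void add(final Worker worker) {
        workers.add(worker);
    }

    int size() {
        return workers.size();
    }

    int resizeCount() {
        return resizeCount;
    }

    Optional<String> removeOne(final long timeoutMs) {
        if (workers.isEmpty()) {
            return Optional.empty();
        }
        final Worker worker = workers.get(0);
        // Remember the timeout instead of throwing right away.
        final boolean timedOut = !worker.stopWithin(timeoutMs);
        // The bookkeeping runs in both cases...
        workers.remove(worker);
        resizeCount++;
        // ...and only then is the failure surfaced to the caller.
        if (timedOut) {
            throw new RemovalTimeoutException("Thread " + worker.name + " did not stop in the allotted time");
        }
        return Optional.of(worker.name);
    }

    public static void main(final String[] args) {
        final DelayedTimeoutSketch client = new DelayedTimeoutSketch();
        client.add(new Worker("thread-1", true));
        System.out.println(client.removeOne(100L));
    }
}
```

The trade-off is that a caller who sees the exception can no longer assume nothing changed: the thread has already been dropped from the bookkeeping even though it was not observed to stop.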









[GitHub] [kafka] wcarlson5 commented on a change in pull request #9984: MINOR: add timeout and static group rebalance to remove thread

2021-01-27 Thread GitBox


wcarlson5 commented on a change in pull request #9984:
URL: https://github.com/apache/kafka/pull/9984#discussion_r565820783



##
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/AdjustStreamThreadCountTest.java
##
@@ -180,6 +182,19 @@ public void shouldRemoveStreamThread() throws Exception {
 }

Review comment:
   yes









[GitHub] [kafka] wcarlson5 commented on a change in pull request #9984: MINOR: add timeout and static group rebalance to remove thread

2021-01-27 Thread GitBox


wcarlson5 commented on a change in pull request #9984:
URL: https://github.com/apache/kafka/pull/9984#discussion_r565802847



##
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/AdjustStreamThreadCountTest.java
##
@@ -180,6 +182,19 @@ public void shouldRemoveStreamThread() throws Exception {
 }
 }
 
+@Test
+public void shouldnNotRemoveStreamThreadWithTimeout() throws Exception {
+try (final KafkaStreams kafkaStreams = new 
KafkaStreams(builder.build(), properties)) {
+addStreamStateChangeListener(kafkaStreams);
+startStreamsAndWaitForRunning(kafkaStreams);
+
+final int oldThreadCount = 
kafkaStreams.localThreadsMetadata().size();
+stateTransitionHistory.clear();
+assertThrows(TimeoutException.class, () -> 
kafkaStreams.removeStreamThread(Duration.ZERO.minus(DEFAULT_DURATION)));

Review comment:
   But it isn't consistent, because if the thread removes itself then the 
timeout isn't started.









[GitHub] [kafka] wcarlson5 commented on a change in pull request #9984: MINOR: add timeout and static group rebalance to remove thread

2021-01-27 Thread GitBox


wcarlson5 commented on a change in pull request #9984:
URL: https://github.com/apache/kafka/pull/9984#discussion_r565785613



##
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/AdjustStreamThreadCountTest.java
##
@@ -180,6 +182,19 @@ public void shouldRemoveStreamThread() throws Exception {
 }
 }
 
+@Test
+public void shouldnNotRemoveStreamThreadWithTimeout() throws Exception {
+try (final KafkaStreams kafkaStreams = new 
KafkaStreams(builder.build(), properties)) {
+addStreamStateChangeListener(kafkaStreams);
+startStreamsAndWaitForRunning(kafkaStreams);
+
+final int oldThreadCount = 
kafkaStreams.localThreadsMetadata().size();
+stateTransitionHistory.clear();
+assertThrows(TimeoutException.class, () -> 
kafkaStreams.removeStreamThread(Duration.ZERO.minus(DEFAULT_DURATION)));

Review comment:
   I don't either...









[GitHub] [kafka] wcarlson5 commented on a change in pull request #9984: MINOR: add timeout and static group rebalance to remove thread

2021-01-27 Thread GitBox


wcarlson5 commented on a change in pull request #9984:
URL: https://github.com/apache/kafka/pull/9984#discussion_r565784940



##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -997,19 +1002,63 @@ private StreamThread createAndAddStreamThread(final long 
cacheSizePerThread, fin
  * no stream threads are alive
  */
 public Optional removeStreamThread() {
+return removeStreamThread(Long.MAX_VALUE);
+}
+
+/**
+ * Removes one stream thread out of the running stream threads from this 
Kafka Streams client.
+ * 
+ * The removed stream thread is gracefully shut down. This method does not 
specify which stream
+ * thread is shut down.
+ * 
+ * Since the number of stream threads decreases, the sizes of the caches 
in the remaining stream
+ * threads are adapted so that the sum of the cache sizes over all stream 
threads equals the total
+ * cache size specified in configuration {@link 
StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG}.
+ *
+ * @param timeout The the length of time to wait for the thread to shutdown
+ * @throws TimeoutException if the thread does not stop in time
+ * @return name of the removed stream thread or empty if a stream thread 
could not be removed because
+ * no stream threads are alive
+ */
+public Optional removeStreamThread(final Duration timeout) throws 
TimeoutException {
+final String msgPrefix = prepareMillisCheckFailMsgPrefix(timeout, 
"timeout");
+final long timeoutMs = validateMillisecondDuration(timeout, msgPrefix);
+return removeStreamThread(timeoutMs);
+}
+
+private Optional removeStreamThread(final long timeoutMs) throws 
TimeoutException {
+final long begin = time.milliseconds();
 if (isRunningOrRebalancing()) {
 synchronized (changeThreadCount) {
 // make a copy of threads to avoid holding lock
 for (final StreamThread streamThread : new 
ArrayList<>(threads)) {
 if (streamThread.isAlive() && 
(!streamThread.getName().equals(Thread.currentThread().getName())
 || threads.size() == 1)) {
+final Optional groupInstanceID = 
streamThread.getGroupInstanceID();
 streamThread.shutdown();
 if 
(!streamThread.getName().equals(Thread.currentThread().getName())) {
-
streamThread.waitOnThreadState(StreamThread.State.DEAD);
+if 
(!streamThread.waitOnThreadState(StreamThread.State.DEAD, timeoutMs)) {
+log.warn("Thread " + streamThread.getName() + 
" did not stop in the allotted time");
+throw new TimeoutException("Thread " + 
streamThread.getName() + " did not stop in the allotted time");

Review comment:
   Hmm. That is interesting. I am not sure. If the thread hasn't been 
removed, then we don't want to resize the cache. The timeout is essentially 
saying that removing the thread failed. So is it right to then remove it 
anyway?









[GitHub] [kafka] wcarlson5 commented on a change in pull request #9984: MINOR: add timeout and static group rebalance to remove thread

2021-01-27 Thread GitBox


wcarlson5 commented on a change in pull request #9984:
URL: https://github.com/apache/kafka/pull/9984#discussion_r565784940



##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -997,19 +1002,63 @@ private StreamThread createAndAddStreamThread(final long 
cacheSizePerThread, fin
  * no stream threads are alive
  */
 public Optional removeStreamThread() {
+return removeStreamThread(Long.MAX_VALUE);
+}
+
+/**
+ * Removes one stream thread out of the running stream threads from this 
Kafka Streams client.
+ * 
+ * The removed stream thread is gracefully shut down. This method does not 
specify which stream
+ * thread is shut down.
+ * 
+ * Since the number of stream threads decreases, the sizes of the caches 
in the remaining stream
+ * threads are adapted so that the sum of the cache sizes over all stream 
threads equals the total
+ * cache size specified in configuration {@link 
StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG}.
+ *
+ * @param timeout The the length of time to wait for the thread to shutdown
+ * @throws TimeoutException if the thread does not stop in time
+ * @return name of the removed stream thread or empty if a stream thread 
could not be removed because
+ * no stream threads are alive
+ */
+public Optional removeStreamThread(final Duration timeout) throws 
TimeoutException {
+final String msgPrefix = prepareMillisCheckFailMsgPrefix(timeout, 
"timeout");
+final long timeoutMs = validateMillisecondDuration(timeout, msgPrefix);
+return removeStreamThread(timeoutMs);
+}
+
+private Optional removeStreamThread(final long timeoutMs) throws 
TimeoutException {
+final long begin = time.milliseconds();
 if (isRunningOrRebalancing()) {
 synchronized (changeThreadCount) {
 // make a copy of threads to avoid holding lock
 for (final StreamThread streamThread : new 
ArrayList<>(threads)) {
 if (streamThread.isAlive() && 
(!streamThread.getName().equals(Thread.currentThread().getName())
 || threads.size() == 1)) {
+final Optional groupInstanceID = 
streamThread.getGroupInstanceID();
 streamThread.shutdown();
 if 
(!streamThread.getName().equals(Thread.currentThread().getName())) {
-
streamThread.waitOnThreadState(StreamThread.State.DEAD);
+if 
(!streamThread.waitOnThreadState(StreamThread.State.DEAD, timeoutMs)) {
+log.warn("Thread " + streamThread.getName() + 
" did not stop in the allotted time");
+throw new TimeoutException("Thread " + 
streamThread.getName() + " did not stop in the allotted time");

Review comment:
   Hmm. That is interesting. I am not sure. If the thread hasn't been 
removed, then we don't want to resize the cache. So would removing the thread 
and then throwing an exception be the right way of doing it? The timeout is 
essentially saying that removing the thread failed. So is it right to then 
remove it anyway?









[GitHub] [kafka] wcarlson5 commented on a change in pull request #9984: MINOR: add timeout and static group rebalance to remove thread

2021-01-27 Thread GitBox


wcarlson5 commented on a change in pull request #9984:
URL: https://github.com/apache/kafka/pull/9984#discussion_r565734101



##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -1005,11 +1036,28 @@ private StreamThread createAndAddStreamThread(final 
long cacheSizePerThread, fin
 || threads.size() == 1)) {
 streamThread.shutdown();
 if 
(!streamThread.getName().equals(Thread.currentThread().getName())) {
-
streamThread.waitOnThreadState(StreamThread.State.DEAD);
+if 
(!streamThread.waitOnThreadState(StreamThread.State.DEAD, timeoutMs)) {
+log.warn("Thread " + streamThread.getName() + 
" did not stop in the allotted time");
+throw new TimeoutException("Thread " + 
streamThread.getName() + " did not stop in the allotted time");
+}
 }
 threads.remove(streamThread);
 final long cacheSizePerThread = 
getCacheSizePerThread(threads.size());
 resizeThreadCache(cacheSizePerThread);
+if (streamThread.getGroupInstanceID().isPresent()) {
+final MemberToRemove memberToRemove = new 
MemberToRemove(streamThread.getGroupInstanceID().get());
+final Collection membersToRemove = 
Collections.singletonList(memberToRemove);
+final RemoveMembersFromConsumerGroupResult 
removeMembersFromConsumerGroupResult = 
adminClient.removeMembersFromConsumerGroup(config.getString(StreamsConfig.APPLICATION_ID_CONFIG),
 new RemoveMembersFromConsumerGroupOptions(membersToRemove));
+try {
+
removeMembersFromConsumerGroupResult.memberResult(memberToRemove).get(timeoutMs 
- begin, TimeUnit.MILLISECONDS);
+} catch (final 
java.util.concurrent.TimeoutException e) {

Review comment:
   We should. And I think maybe we should log the original stack trace
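
One way this could look, as a sketch only: catch `java.util.concurrent.TimeoutException` from the future, log it with its stack trace, and rethrow a client-level timeout that keeps the original as its cause. `TimeoutTranslationSketch`, `ClientTimeoutException`, `remainingMs` and `awaitRemoval` are hypothetical names; the exception class stands in for Kafka's `TimeoutException`, and `java.util.logging` stands in for the project's logger. The remaining wait is computed here as the timeout minus the elapsed time, which is an assumption about the intent of the `timeoutMs - begin` expression in the hunk above.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;

// Hypothetical sketch; not the PR's implementation.
final class TimeoutTranslationSketch {
    private static final Logger LOG = Logger.getLogger(TimeoutTranslationSketch.class.getName());

    /** Stand-in for org.apache.kafka.common.errors.TimeoutException. */
    static final class ClientTimeoutException extends RuntimeException {
        ClientTimeoutException(final String message, final Throwable cause) {
            super(message, cause);
        }
    }

    /** Time budget left after (nowMs - beginMs) has elapsed, never negative. */
    static long remainingMs(final long timeoutMs, final long beginMs, final long nowMs) {
        final long elapsedMs = Math.max(0L, nowMs - beginMs);
        return Math.max(0L, timeoutMs - elapsedMs);
    }

    static void awaitRemoval(final Future<Void> result, final long timeoutMs, final long beginMs)
            throws InterruptedException, ExecutionException {
        final long waitMs = remainingMs(timeoutMs, beginMs, System.currentTimeMillis());
        try {
            result.get(waitMs, TimeUnit.MILLISECONDS);
        } catch (final java.util.concurrent.TimeoutException e) {
            // Passing the exception to the logger keeps the original stack trace in the log.
            LOG.log(Level.SEVERE, "Could not remove static member in time", e);
            // Keeping it as the cause preserves it for the caller as well.
            throw new ClientTimeoutException("Could not remove static member in time", e);
        }
    }

    public static void main(final String[] args) throws Exception {
        // A future that is already complete returns immediately, so nothing is thrown.
        awaitRemoval(CompletableFuture.completedFuture(null), 1_000L, System.currentTimeMillis());
        System.out.println("removed within the time budget");
    }
}
```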









[GitHub] [kafka] wcarlson5 commented on a change in pull request #9984: MINOR: add timeout and static group rebalance to remove thread

2021-01-27 Thread GitBox


wcarlson5 commented on a change in pull request #9984:
URL: https://github.com/apache/kafka/pull/9984#discussion_r565732778



##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -997,19 +1002,63 @@ private StreamThread createAndAddStreamThread(final long 
cacheSizePerThread, fin
  * no stream threads are alive
  */
 public Optional removeStreamThread() {
+return removeStreamThread(Long.MAX_VALUE);
+}
+
+/**
+ * Removes one stream thread out of the running stream threads from this 
Kafka Streams client.
+ * 
+ * The removed stream thread is gracefully shut down. This method does not 
specify which stream
+ * thread is shut down.
+ * 
+ * Since the number of stream threads decreases, the sizes of the caches 
in the remaining stream
+ * threads are adapted so that the sum of the cache sizes over all stream 
threads equals the total
+ * cache size specified in configuration {@link 
StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG}.
+ *
+ * @param timeout The the length of time to wait for the thread to shutdown
+ * @throws TimeoutException if the thread does not stop in time
+ * @return name of the removed stream thread or empty if a stream thread 
could not be removed because
+ * no stream threads are alive
+ */
+public Optional removeStreamThread(final Duration timeout) throws 
TimeoutException {
+final String msgPrefix = prepareMillisCheckFailMsgPrefix(timeout, 
"timeout");
+final long timeoutMs = validateMillisecondDuration(timeout, msgPrefix);
+return removeStreamThread(timeoutMs);
+}
+
+private Optional removeStreamThread(final long timeoutMs) throws 
TimeoutException {
+final long begin = time.milliseconds();
 if (isRunningOrRebalancing()) {
 synchronized (changeThreadCount) {
 // make a copy of threads to avoid holding lock
 for (final StreamThread streamThread : new 
ArrayList<>(threads)) {
 if (streamThread.isAlive() && 
(!streamThread.getName().equals(Thread.currentThread().getName())
 || threads.size() == 1)) {
+final Optional groupInstanceID = 
streamThread.getGroupInstanceID();
 streamThread.shutdown();
 if 
(!streamThread.getName().equals(Thread.currentThread().getName())) {
-
streamThread.waitOnThreadState(StreamThread.State.DEAD);
+if 
(!streamThread.waitOnThreadState(StreamThread.State.DEAD, timeoutMs)) {
+log.warn("Thread " + streamThread.getName() + 
" did not stop in the allotted time");
+throw new TimeoutException("Thread " + 
streamThread.getName() + " did not stop in the allotted time");
+}
 }
 threads.remove(streamThread);
 final long cacheSizePerThread = 
getCacheSizePerThread(threads.size());
 resizeThreadCache(cacheSizePerThread);
+if (groupInstanceID.isPresent()) {

Review comment:
   OK, so we just do something like `if (groupInstanceID.isPresent() && 
!streamThread.getName().equals(Thread.currentThread().getName()))` when deciding 
whether to remove it from the group?









[GitHub] [kafka] wcarlson5 commented on a change in pull request #9984: MINOR: add timeout and static group rebalance to remove thread

2021-01-27 Thread GitBox


wcarlson5 commented on a change in pull request #9984:
URL: https://github.com/apache/kafka/pull/9984#discussion_r565640687



##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -1005,11 +1036,28 @@ private StreamThread createAndAddStreamThread(final 
long cacheSizePerThread, fin
 || threads.size() == 1)) {
 streamThread.shutdown();
 if 
(!streamThread.getName().equals(Thread.currentThread().getName())) {
-
streamThread.waitOnThreadState(StreamThread.State.DEAD);
+if 
(!streamThread.waitOnThreadState(StreamThread.State.DEAD, timeoutMs)) {
+log.warn("Thread " + streamThread.getName() + 
" did not stop in the allotted time");
+throw new TimeoutException("Thread " + 
streamThread.getName() + " did not stop in the allotted time");
+}
 }
 threads.remove(streamThread);
 final long cacheSizePerThread = 
getCacheSizePerThread(threads.size());
 resizeThreadCache(cacheSizePerThread);
+if (streamThread.getGroupInstanceID().isPresent()) {
+final MemberToRemove memberToRemove = new 
MemberToRemove(streamThread.getGroupInstanceID().get());
+final Collection membersToRemove = 
Collections.singletonList(memberToRemove);
+final RemoveMembersFromConsumerGroupResult 
removeMembersFromConsumerGroupResult = 
adminClient.removeMembersFromConsumerGroup(config.getString(StreamsConfig.APPLICATION_ID_CONFIG),
 new RemoveMembersFromConsumerGroupOptions(membersToRemove));
+try {
+
removeMembersFromConsumerGroupResult.memberResult(memberToRemove).get(timeoutMs 
- begin, TimeUnit.MILLISECONDS);
+} catch (final 
java.util.concurrent.TimeoutException e) {

Review comment:
   We have to make this a Kafka `TimeoutException`.









[GitHub] [kafka] wcarlson5 commented on a change in pull request #9984: MINOR: add timeout and static group rebalance to remove thread

2021-01-27 Thread GitBox


wcarlson5 commented on a change in pull request #9984:
URL: https://github.com/apache/kafka/pull/9984#discussion_r565635667



##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -1005,11 +1008,60 @@ private StreamThread createAndAddStreamThread(final 
long cacheSizePerThread, fin
 || threads.size() == 1)) {
 streamThread.shutdown();
 if 
(!streamThread.getName().equals(Thread.currentThread().getName())) {
-
streamThread.waitOnThreadState(StreamThread.State.DEAD);
+
streamThread.waitOnThreadState(StreamThread.State.DEAD, -1);
 }
 threads.remove(streamThread);
 final long cacheSizePerThread = 
getCacheSizePerThread(threads.size());
 resizeThreadCache(cacheSizePerThread);
+final Collection membersToRemove = 
Collections.singletonList(new 
MemberToRemove(streamThread.getGroupInstanceID()));
+
adminClient.removeMembersFromConsumerGroup(config.getString(StreamsConfig.APPLICATION_ID_CONFIG),
 new RemoveMembersFromConsumerGroupOptions(membersToRemove));
+return Optional.of(streamThread.getName());
+}
+}
+}
+log.warn("There are no threads eligible for removal");
+} else {
+log.warn("Cannot remove a stream thread when Kafka Streams client 
is in state  " + state());
+}
+return Optional.empty();
+}
+
+/**
+ * Removes one stream thread out of the running stream threads from this 
Kafka Streams client.
+ * 
+ * The removed stream thread is gracefully shut down. This method does not 
specify which stream
+ * thread is shut down.
+ * 
+ * Since the number of stream threads decreases, the sizes of the caches 
in the remaining stream
+ * threads are adapted so that the sum of the cache sizes over all stream 
threads equals the total
+ * cache size specified in configuration {@link 
StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG}.
+ *
+ * @param timeout The the length of time to wait for the thread to shutdown
+ * @throws TimeoutException if the thread does not stop in time
+ * @return name of the removed stream thread or empty if a stream thread 
could not be removed because
+ * no stream threads are alive
+ */
+public Optional removeStreamThread(final Duration timeout) throws 
TimeoutException {
+final String msgPrefix = prepareMillisCheckFailMsgPrefix(timeout, 
"timeout");
+final long timeoutMs = validateMillisecondDuration(timeout, msgPrefix);
+if (isRunningOrRebalancing()) {
+synchronized (changeThreadCount) {
+// make a copy of threads to avoid holding lock
+for (final StreamThread streamThread : new 
ArrayList<>(threads)) {
+if (streamThread.isAlive() && 
(!streamThread.getName().equals(Thread.currentThread().getName())
+|| threads.size() == 1)) {
+streamThread.shutdown();
+if 
(!streamThread.getName().equals(Thread.currentThread().getName())) {
+if 
(!streamThread.waitOnThreadState(StreamThread.State.DEAD, timeoutMs)) {
+log.warn("Thread " + streamThread.getName() + 
" did not stop in the allotted time");
+throw new TimeoutException("Thread " + 
streamThread.getName() + " did not stop in the allotted time");
+}
+}
+threads.remove(streamThread);
+final long cacheSizePerThread = 
getCacheSizePerThread(threads.size());
+resizeThreadCache(cacheSizePerThread);
+Collection membersToRemove = 
Collections.singletonList(new 
MemberToRemove(streamThread.getGroupInstanceID()));
+
adminClient.removeMembersFromConsumerGroup(config.getString(StreamsConfig.APPLICATION_ID_CONFIG),
 new RemoveMembersFromConsumerGroupOptions(membersToRemove));

Review comment:
   Sounds good. How should you handle the `ExecutionException`?
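
For reference, one common option (not necessarily what this PR settles on) is to unwrap the `ExecutionException` and rethrow its cause inside a client-level exception, restoring the interrupt flag if the wait is interrupted. `ExecutionExceptionSketch`, `RemovalFailedException` and `awaitRemoval` are hypothetical names used only for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

// Hypothetical sketch; not the PR's implementation.
final class ExecutionExceptionSketch {
    /** Hypothetical client-level exception. */
    static final class RemovalFailedException extends RuntimeException {
        RemovalFailedException(final String message, final Throwable cause) {
            super(message, cause);
        }
    }

    static void awaitRemoval(final Future<Void> result) {
        try {
            result.get();
        } catch (final ExecutionException e) {
            // The future wraps the real failure; surface the underlying cause.
            throw new RemovalFailedException("Could not remove static member", e.getCause());
        } catch (final InterruptedException e) {
            // Restore the interrupt flag so callers can still observe the interruption.
            Thread.currentThread().interrupt();
            throw new RemovalFailedException("Interrupted while removing static member", e);
        }
    }

    public static void main(final String[] args) {
        final CompletableFuture<Void> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("broker rejected the request"));
        try {
            awaitRemoval(failed);
        } catch (final RemovalFailedException e) {
            System.out.println("cause: " + e.getCause());
        }
    }
}
```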









[GitHub] [kafka] wcarlson5 commented on a change in pull request #9984: MINOR: add timeout and static group rebalance to remove thread

2021-01-27 Thread GitBox


wcarlson5 commented on a change in pull request #9984:
URL: https://github.com/apache/kafka/pull/9984#discussion_r565628682



##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -1005,11 +1008,60 @@ private StreamThread createAndAddStreamThread(final 
long cacheSizePerThread, fin
 || threads.size() == 1)) {
 streamThread.shutdown();
 if 
(!streamThread.getName().equals(Thread.currentThread().getName())) {
-
streamThread.waitOnThreadState(StreamThread.State.DEAD);
+
streamThread.waitOnThreadState(StreamThread.State.DEAD, -1);
 }
 threads.remove(streamThread);
 final long cacheSizePerThread = 
getCacheSizePerThread(threads.size());
 resizeThreadCache(cacheSizePerThread);
+final Collection membersToRemove = 
Collections.singletonList(new 
MemberToRemove(streamThread.getGroupInstanceID()));
+
adminClient.removeMembersFromConsumerGroup(config.getString(StreamsConfig.APPLICATION_ID_CONFIG),
 new RemoveMembersFromConsumerGroupOptions(membersToRemove));
+return Optional.of(streamThread.getName());
+}
+}
+}
+log.warn("There are no threads eligible for removal");
+} else {
+log.warn("Cannot remove a stream thread when Kafka Streams client 
is in state  " + state());
+}
+return Optional.empty();
+}
+
+/**
+ * Removes one stream thread out of the running stream threads from this 
Kafka Streams client.
+ * 
+ * The removed stream thread is gracefully shut down. This method does not 
specify which stream
+ * thread is shut down.
+ * 
+ * Since the number of stream threads decreases, the sizes of the caches 
in the remaining stream
+ * threads are adapted so that the sum of the cache sizes over all stream 
threads equals the total
+ * cache size specified in configuration {@link 
StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG}.
+ *
+ * @param timeout The the length of time to wait for the thread to shutdown
+ * @throws TimeoutException if the thread does not stop in time
+ * @return name of the removed stream thread or empty if a stream thread 
could not be removed because
+ * no stream threads are alive
+ */
+public Optional removeStreamThread(final Duration timeout) throws 
TimeoutException {
+final String msgPrefix = prepareMillisCheckFailMsgPrefix(timeout, 
"timeout");
+final long timeoutMs = validateMillisecondDuration(timeout, msgPrefix);
+if (isRunningOrRebalancing()) {
+synchronized (changeThreadCount) {
+// make a copy of threads to avoid holding lock
+for (final StreamThread streamThread : new 
ArrayList<>(threads)) {
+if (streamThread.isAlive() && 
(!streamThread.getName().equals(Thread.currentThread().getName())
+|| threads.size() == 1)) {
+streamThread.shutdown();
+if 
(!streamThread.getName().equals(Thread.currentThread().getName())) {
+if 
(!streamThread.waitOnThreadState(StreamThread.State.DEAD, timeoutMs)) {
+log.warn("Thread " + streamThread.getName() + 
" did not stop in the allotted time");
+throw new TimeoutException("Thread " + 
streamThread.getName() + " did not stop in the allotted time");
+}
+}
+threads.remove(streamThread);
+final long cacheSizePerThread = 
getCacheSizePerThread(threads.size());
+resizeThreadCache(cacheSizePerThread);
+Collection membersToRemove = 
Collections.singletonList(new 
MemberToRemove(streamThread.getGroupInstanceID()));

Review comment:
   that works









[GitHub] [kafka] wcarlson5 commented on a change in pull request #9984: MINOR: add timeout and static group rebalance to remove thread

2021-01-27 Thread GitBox


wcarlson5 commented on a change in pull request #9984:
URL: https://github.com/apache/kafka/pull/9984#discussion_r565628241



##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -1005,11 +1008,60 @@ private StreamThread createAndAddStreamThread(final long cacheSizePerThread, fin
 || threads.size() == 1)) {
 streamThread.shutdown();
 if (!streamThread.getName().equals(Thread.currentThread().getName())) {
-streamThread.waitOnThreadState(StreamThread.State.DEAD);
+streamThread.waitOnThreadState(StreamThread.State.DEAD, -1);

Review comment:
   good idea, I reworked that a bit









[GitHub] [kafka] wcarlson5 commented on a change in pull request #9984: MINOR: add timeout and static group rebalance to remove thread

2021-01-27 Thread GitBox


wcarlson5 commented on a change in pull request #9984:
URL: https://github.com/apache/kafka/pull/9984#discussion_r565622524



##
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##
@@ -610,17 +610,32 @@ public void setStreamsUncaughtExceptionHandler(final java.util.function.Consumer
 this.streamsUncaughtExceptionHandler = streamsUncaughtExceptionHandler;
 }
 
-public void waitOnThreadState(final StreamThread.State targetState) {
+public boolean waitOnThreadState(final StreamThread.State targetState, long timeoutMs) {
+if (timeoutMs < 0) {
+timeoutMs = 0;
+} else if (timeoutMs == 0) {
+timeoutMs = Long.MAX_VALUE;

Review comment:
   ah yeah, I had to fix this when I was writing my test
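
For readers following the timeout discussion, here is a minimal sketch of a timed wait on a state change that reports success or timeout through its return value. It is not the code from this PR: `StateHolder`, its `State` enum, and the convention that a non-positive timeout means "do not block" are assumptions made for illustration only.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for a thread-like object with a state field.
// This is NOT Kafka's StreamThread; all names here are illustrative.
final class StateHolder {
    enum State { RUNNING, DEAD }

    private final Object lock = new Object();
    private State state = State.RUNNING;

    void setState(final State newState) {
        synchronized (lock) {
            state = newState;
            lock.notifyAll(); // wake up anyone waiting for a state change
        }
    }

    // Returns true if targetState was reached within timeoutMs, false otherwise.
    // Assumption for this sketch: timeoutMs <= 0 means "check once, do not block".
    boolean waitOnState(final State targetState, final long timeoutMs) {
        final long startNs = System.nanoTime();
        synchronized (lock) {
            while (state != targetState) {
                final long elapsedMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNs);
                final long remainingMs = timeoutMs - elapsedMs;
                if (remainingMs <= 0) {
                    return false; // timed out before the state was reached
                }
                try {
                    lock.wait(remainingMs); // loop guards against spurious wakeups
                } catch (final InterruptedException e) {
                    Thread.currentThread().interrupt(); // preserve the interrupt flag
                    return state == targetState;
                }
            }
            return true;
        }
    }
}
```

A caller that wants "wait as long as it takes" can pass `Long.MAX_VALUE` explicitly, which avoids giving `0` or `-1` a special meaning.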









[GitHub] [kafka] wcarlson5 commented on a change in pull request #9984: MINOR: add timeout and static group rebalance to remove thread

2021-01-27 Thread GitBox


wcarlson5 commented on a change in pull request #9984:
URL: https://github.com/apache/kafka/pull/9984#discussion_r565622163



##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -91,6 +93,7 @@
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;

Review comment:
   I did not know that. good catch









[GitHub] [kafka] wcarlson5 commented on a change in pull request #9984: MINOR: add timeout and static group rebalance to remove thread

2021-01-27 Thread GitBox


wcarlson5 commented on a change in pull request #9984:
URL: https://github.com/apache/kafka/pull/9984#discussion_r565621384



##
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##
@@ -1147,6 +1162,10 @@ public String toString(final String indent) {
 return indent + "\tStreamsThread threadId: " + getName() + "\n" + taskManager.toString(indent);
 }
 
+public String getGroupInstanceID() {

Review comment:
   Either way we need to deal with it. I thought it would be easier to just 
do it once, but it is probably better practice to handle it later. I will change 
it to Optional.
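
As a rough illustration of the trade-off, the sketch below keeps the "no instance id" case as an empty `Optional` and handles it at the point of use, instead of collapsing it to `""` early. `InstanceIds` and `idsToRemove` are hypothetical names, not part of the PR or of the Kafka API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

// Hypothetical helper for illustration; not Kafka code.
final class InstanceIds {
    private InstanceIds() { }

    // Collects only the ids that are actually present, so a missing
    // group.instance.id never turns into an empty-string entry.
    static List<String> idsToRemove(final List<Optional<String>> groupInstanceIds) {
        final List<String> result = new ArrayList<>();
        for (final Optional<String> id : groupInstanceIds) {
            id.ifPresent(result::add);
        }
        return result;
    }
}
```

With this shape, the caller decides what an absent id means (for example, skipping the removal request), rather than having to recognize a sentinel string.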









[GitHub] [kafka] wcarlson5 commented on a change in pull request #9984: MINOR: add timeout and static group rebalance to remove thread

2021-01-27 Thread GitBox


wcarlson5 commented on a change in pull request #9984:
URL: https://github.com/apache/kafka/pull/9984#discussion_r565607533



##
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##
@@ -610,17 +610,32 @@ public void setStreamsUncaughtExceptionHandler(final java.util.function.Consumer
 this.streamsUncaughtExceptionHandler = streamsUncaughtExceptionHandler;
 }
 
-public void waitOnThreadState(final StreamThread.State targetState) {
+public boolean waitOnThreadState(final StreamThread.State targetState, long timeoutMs) {
+if (timeoutMs < 0) {

Review comment:
   This is for the non-timeout uses.

##
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##
@@ -1147,6 +1162,10 @@ public String toString(final String indent) {
 return indent + "\tStreamsThread threadId: " + getName() + "\n" + taskManager.toString(indent);
 }
 
+public String getGroupInstanceID(){
+return mainConsumer.groupMetadata().groupInstanceId().orElse("");

Review comment:
   It seems easier to get it from here than from the config. It looked like I 
might have had to manipulate strings in that case.

##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -1005,11 +1007,56 @@ private StreamThread createAndAddStreamThread(final long cacheSizePerThread, fin
 || threads.size() == 1)) {
 streamThread.shutdown();
 if (!streamThread.getName().equals(Thread.currentThread().getName())) {
-streamThread.waitOnThreadState(StreamThread.State.DEAD);
+streamThread.waitOnThreadState(StreamThread.State.DEAD, -1);
 }
 threads.remove(streamThread);
 final long cacheSizePerThread = getCacheSizePerThread(threads.size());
 resizeThreadCache(cacheSizePerThread);
+Collection membersToRemove = Collections.singletonList(new MemberToRemove(streamThread.getGroupInstanceID()));

Review comment:
   I ended up getting the `group.instance.id` from the streamThread




