vvcephei commented on a change in pull request #9273:
URL: https://github.com/apache/kafka/pull/9273#discussion_r499019285
##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -364,6 +368,62 @@ public void setUncaughtExceptionHandler(final
Thread.UncaughtExceptionHandler eh
}
}
+ /**
+ * Set the handler invoked when a {@link
StreamsConfig#NUM_STREAM_THREADS_CONFIG internal thread} abruptly
+ * terminates due to an uncaught exception.
+ *
+ * @param eh the uncaught exception handler of type {@link
StreamsUncaughtExceptionHandler} for all internal threads; {@code null} deletes
the current handler
+ * @throws IllegalStateException if this {@code KafkaStreams} instance is
not in state {@link State#CREATED CREATED}.
+ */
+ public void setUncaughtExceptionHandler(final
StreamsUncaughtExceptionHandler eh) {
+ final StreamsUncaughtExceptionHandler handler = exception ->
handleStreamsUncaughtException(exception, eh);
+ synchronized (stateLock) {
+ if (state == State.CREATED) {
+ for (final StreamThread thread : threads) {
+ if (eh != null) {
+ thread.setStreamsUncaughtExceptionHandler(handler);
+ } else {
+ final StreamsUncaughtExceptionHandler defaultHandler =
exception ->
+
StreamsUncaughtExceptionHandler.StreamsUncaughtExceptionHandlerResponse.SHUTDOWN_STREAM_THREAD;
+
thread.setStreamsUncaughtExceptionHandler(defaultHandler);
+ }
+ }
+ } else {
+ throw new IllegalStateException("Can only set
UncaughtExceptionHandler in CREATED state. " +
+ "Current state is: " + state);
+ }
+ }
+ }
+
+ private
StreamsUncaughtExceptionHandler.StreamsUncaughtExceptionHandlerResponse
handleStreamsUncaughtException(final Exception e,
+
final StreamsUncaughtExceptionHandler
streamsUncaughtExceptionHandler) {
+ final
StreamsUncaughtExceptionHandler.StreamsUncaughtExceptionHandlerResponse action
= streamsUncaughtExceptionHandler.handle(e);
+ switch (action) {
+ case SHUTDOWN_STREAM_THREAD:
+ log.error("Encountered the following exception during
processing " +
+ "and the thread is going to shut down: ", e);
+ break;
+ case REPLACE_STREAM_THREAD:
+ log.error("Encountered the following exception during
processing " +
+ "and the the stream thread will be replaced: ", e);
//TODO: add then remove, wait until 663 is merged
+ break;
+ case SHUTDOWN_KAFKA_STREAMS_CLIENT:
+ log.error("Encountered the following exception during
processing " +
+ "and the client is going to shut down: ", e);
+ for (final StreamThread streamThread: threads) {
+ streamThread.shutdown();
+ }
Review comment:
It seems safer to just call the nonblocking close method:
```suggestion
close(Duration.ZERO);
```
That way, it'll properly set the state, stop the cleaner thread, etc.
##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -436,6 +496,8 @@ private void maybeSetError() {
}
if (setState(State.ERROR)) {
+ metrics.close();
Review comment:
Actually, now that I have a more in-depth picture of what is going on, I
disagree. I think we should leave these threads running until users actually
call `KafkaStreams.close`, since they could alternatively add more threads (via
KIP-663) to transition back from ERROR state into RUNNING, at which point,
we'll be sorry that we killed these threads, right?
##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -346,6 +347,9 @@ public void setStateListener(final
KafkaStreams.StateListener listener) {
*
* @param eh the uncaught exception handler for all internal threads;
{@code null} deletes the current handler
* @throws IllegalStateException if this {@code KafkaStreams} instance is
not in state {@link State#CREATED CREATED}.
+ *
+ * @Deprecated user {@link
KafkaStreams#setUncaughtExceptionHandler(StreamsUncaughtExceptionHandler)}
instead.
Review comment:
```suggestion
 * @deprecated Since 2.7.0. Use {@link KafkaStreams#setUncaughtExceptionHandler(StreamsUncaughtExceptionHandler)} instead.
```
##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -364,6 +368,62 @@ public void setUncaughtExceptionHandler(final
Thread.UncaughtExceptionHandler eh
}
}
+ /**
+ * Set the handler invoked when a {@link
StreamsConfig#NUM_STREAM_THREADS_CONFIG internal thread} abruptly
+ * terminates due to an uncaught exception.
+ *
+ * @param eh the uncaught exception handler of type {@link
StreamsUncaughtExceptionHandler} for all internal threads; {@code null} deletes
the current handler
+ * @throws IllegalStateException if this {@code KafkaStreams} instance is
not in state {@link State#CREATED CREATED}.
+ */
+ public void setUncaughtExceptionHandler(final
StreamsUncaughtExceptionHandler eh) {
Review comment:
```suggestion
    public void setUncaughtExceptionHandler(final StreamsUncaughtExceptionHandler uncaughtExceptionHandler) {
```
We prefer to resist the urge to abbreviate, especially in the public-facing
APIs.
##########
File path:
streams/src/main/java/org/apache/kafka/streams/errors/StreamsUncaughtExceptionHandler.java
##########
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.errors;
+
+public interface StreamsUncaughtExceptionHandler {
+ /**
+ * Inspect a record and the exception received.
+ * @param exception the actual exception
+ */
+ StreamsUncaughtExceptionHandler.StreamsUncaughtExceptionHandlerResponse
handle(final Exception exception);
Review comment:
Let's tweak this API to Throwable:
```suggestion
    StreamsUncaughtExceptionHandler.StreamsUncaughtExceptionHandlerResponse handle(final Throwable exception);
```
Here's a good explanation of why:
https://stackoverflow.com/questions/2274102/difference-between-using-throwable-and-exception-in-a-try-catch
The benefit is that we could handle `Error`s as well as `Exception`s.
However, this comes with the obligation that we should not continue to use the
thread after an Error occurs. I think we can deal with this restriction
reasonably.
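To make the difference concrete, here is a minimal, self-contained sketch. `ThrowableHandlerSketch`, `Handler`, and `Response` are hypothetical stand-ins rather than the Kafka Streams types. It only illustrates that a `catch (Throwable)` block paired with a `Throwable`-typed handler also sees `Error`s, which a `catch (Exception)` block would let escape.

```java
// Hypothetical stand-ins for illustration only; not the Kafka Streams API.
final class ThrowableHandlerSketch {

    enum Response { SHUTDOWN_THREAD, SHUTDOWN_CLIENT }

    // A handler typed on Throwable can be offered Errors as well as Exceptions.
    interface Handler {
        Response handle(Throwable throwable);
    }

    // Runs the task; returns null on success, or the handler's response if the task threw.
    static Response runGuarded(final Runnable task, final Handler handler) {
        try {
            task.run();
            return null;
        } catch (final Throwable t) { // catch (Exception e) would let Errors escape
            return handler.handle(t);
        }
    }

    public static void main(final String[] args) {
        // Treat any Error as fatal for the whole client; treat Exceptions as thread-local.
        final Handler handler =
            t -> t instanceof Error ? Response.SHUTDOWN_CLIENT : Response.SHUTDOWN_THREAD;
        System.out.println(runGuarded(() -> { throw new IllegalStateException("bug"); }, handler));
        System.out.println(runGuarded(() -> { throw new AssertionError("fatal"); }, handler));
    }
}
```

Whichever response the handler returns for an `Error`, the caller should stop using that thread afterwards, which is the obligation mentioned above.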
##########
File path:
streams/src/main/java/org/apache/kafka/streams/errors/StreamsUncaughtExceptionHandler.java
##########
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.errors;
+
+public interface StreamsUncaughtExceptionHandler {
+ /**
+ * Inspect a record and the exception received.
+ * @param exception the actual exception
+ */
+ StreamsUncaughtExceptionHandler.StreamsUncaughtExceptionHandlerResponse
handle(final Exception exception);
+
+ /**
+ * Enumeration that describes the response from the exception handler.
+ */
+ enum StreamsUncaughtExceptionHandlerResponse {
+
+
+ SHUTDOWN_STREAM_THREAD(0, "SHUTDOWN_STREAM_THREAD"),
+ REPLACE_STREAM_THREAD(1, "REPLACE_STREAM_THREAD"),
+ SHUTDOWN_KAFKA_STREAMS_CLIENT(2, "SHUTDOWN_KAFKA_STREAMS_CLIENT"),
+ SHUTDOWN_KAFKA_STREAMS_APPLICATION(3,
"SHUTDOWN_KAFKA_STREAMS_APPLICATION");
+
+
+ /** an english description of the api--this is for debugging and can
change */
+ public final String name;
+
+ /** the permanent and immutable id of an API--this can't change ever */
Review comment:
Thanks for clarifying this.
##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -364,6 +368,62 @@ public void setUncaughtExceptionHandler(final
Thread.UncaughtExceptionHandler eh
}
}
+ /**
+ * Set the handler invoked when a {@link
StreamsConfig#NUM_STREAM_THREADS_CONFIG internal thread} abruptly
+ * terminates due to an uncaught exception.
Review comment:
```suggestion
     * Set the handler invoked when a {@link StreamsConfig#NUM_STREAM_THREADS_CONFIG internal thread}
     * throws an unexpected exception. These might be exceptions indicating rare bugs in Kafka Streams, or they
     * might be exceptions thrown by your code, for example a NullPointerException thrown from your processor
     * logic.
     * <p>
     * Note, this handler must be threadsafe, since it will be shared among all threads, and invoked from any
     * thread that encounters such an exception.
```
I think it's wrong to say that this is invoked when the thread abruptly
terminates, because it's not. That's how the JVM handler works, but we're
actually executing this handler while the thread is still running, and in fact
that thread itself is what calls the handler.
It also seemed appropriate to elaborate a little more on the usage of this
method.
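A rough sketch of the point about invocation, using only the JDK: the failing thread catches the exception and calls the shared handler itself, so the thread is still alive at that moment, and the handler has to tolerate concurrent calls. All names here (`InThreadHandlerSketch`, `CountingHandler`, `runFailingWorkers`) are made up for illustration and do not exist in Kafka Streams.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch; the names below are illustrative, not Kafka Streams classes.
final class InThreadHandlerSketch {

    interface Handler {
        void handle(Throwable throwable);
    }

    // Shared by all workers, so it keeps its state in atomics and a volatile flag.
    static final class CountingHandler implements Handler {
        final AtomicInteger invocations = new AtomicInteger();
        final AtomicReference<String> lastCallerThread = new AtomicReference<>();
        volatile boolean callerWasAlive;

        @Override
        public void handle(final Throwable throwable) {
            invocations.incrementAndGet();
            lastCallerThread.set(Thread.currentThread().getName());
            // The calling thread is the one that failed, and it is still running.
            callerWasAlive = Thread.currentThread().isAlive();
        }
    }

    // Starts the given number of threads; each fails once and reports to the shared handler.
    static void runFailingWorkers(final int workers, final Handler handler) {
        final Thread[] threads = new Thread[workers];
        for (int i = 0; i < workers; i++) {
            threads[i] = new Thread(() -> {
                try {
                    throw new IllegalStateException("simulated processing failure");
                } catch (final Throwable t) {
                    // The failing thread itself calls the handler; it has not terminated yet.
                    handler.handle(t);
                }
            }, "worker-" + i);
            threads[i].start();
        }
        for (final Thread thread : threads) {
            try {
                thread.join();
            } catch (final InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
    }
}
```

By contrast, a JVM `Thread.UncaughtExceptionHandler` only runs once the exception has propagated out of `run()`, when the thread is already on its way out.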
##########
File path: streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
##########
@@ -616,7 +616,7 @@ public void
shouldThrowExceptionSettingUncaughtExceptionHandlerNotInCreateState(
final KafkaStreams streams = new
KafkaStreams(getBuilderWithSource().build(), props, supplier, time);
streams.start();
try {
- streams.setUncaughtExceptionHandler(null);
+
streams.setUncaughtExceptionHandler((Thread.UncaughtExceptionHandler) null);
Review comment:
Should we also test the behavior of setting the new handler to `null`?
And actually, why is `null` allowed now, but it wasn't before?
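For context on the cast in the hunk above: once a method has two overloads that take unrelated reference types, a bare `null` argument is ambiguous and no longer compiles, so the existing test has to pick an overload explicitly. A small JDK-only sketch, where `NullOverloadSketch`, `NewHandler`, and `set` are hypothetical names:

```java
// Hypothetical sketch of overload resolution with a null argument.
final class NullOverloadSketch {

    // Stand-in for the new handler type; unrelated to Thread.UncaughtExceptionHandler.
    interface NewHandler {
        String handle(Throwable throwable);
    }

    static String set(final Thread.UncaughtExceptionHandler legacy) {
        return "legacy";
    }

    static String set(final NewHandler handler) {
        return "new";
    }

    public static void main(final String[] args) {
        // set(null); // does not compile: reference to set is ambiguous
        System.out.println(set((Thread.UncaughtExceptionHandler) null)); // picks the legacy overload
        System.out.println(set((NewHandler) null));                      // picks the new overload
    }
}
```

A test for the new overload would need the mirror-image cast.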
##########
File path:
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsRebalanceListener.java
##########
@@ -54,6 +54,8 @@ public void onPartitionsAssigned(final
Collection<TopicPartition> partitions) {
if (assignmentErrorCode.get() ==
AssignorError.INCOMPLETE_SOURCE_TOPIC_METADATA.code()) {
log.error("Received error code {}", assignmentErrorCode.get());
throw new MissingSourceTopicException("One or more source topics
were missing during rebalance");
+ } else if (assignmentErrorCode.get() ==
AssignorError.SHUTDOWN_REQUESTED.code()) {
+ streamThread.shutdown(); //TODO: 663 should set client to error if
all streams are dead
Review comment:
I didn't quite follow this TODO.
##########
File path:
streams/src/main/java/org/apache/kafka/streams/errors/StreamsUncaughtExceptionHandler.java
##########
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.errors;
+
+public interface StreamsUncaughtExceptionHandler {
+ /**
+ * Inspect a record and the exception received.
+ * @param exception the actual exception
+ */
+ StreamsUncaughtExceptionHandler.StreamsUncaughtExceptionHandlerResponse
handle(final Exception exception);
+
+ /**
+ * Enumeration that describes the response from the exception handler.
+ */
+ enum StreamsUncaughtExceptionHandlerResponse {
+
+
+ SHUTDOWN_STREAM_THREAD(0, "SHUTDOWN_STREAM_THREAD"),
+ REPLACE_STREAM_THREAD(1, "REPLACE_STREAM_THREAD"),
Review comment:
+1 I think we should at least comment it out until that other PR is
actually in the codebase.
##########
File path:
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfo.java
##########
@@ -82,11 +83,21 @@ public SubscriptionInfo(final int version,
final UUID processId,
final String userEndPoint,
final Map<TaskId, Long> taskOffsetSums) {
+ this(version, latestSupportedVersion, processId, userEndPoint,
taskOffsetSums, new AtomicInteger(0));
+ }
+
+ public SubscriptionInfo(final int version,
+ final int latestSupportedVersion,
+ final UUID processId,
+ final String userEndPoint,
+ final Map<TaskId, Long> taskOffsetSums,
+ final AtomicInteger shutdownRequested) {
Review comment:
Cool, so maybe this field should be called something else, like
"subscription flag"?
##########
File path:
streams/src/test/java/org/apache/kafka/streams/integration/AppShutdownIntegrationTest.java
##########
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.errors.StreamsException;
+import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.Named;
+import org.apache.kafka.streams.processor.AbstractProcessor;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.TestUtils;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.apache.kafka.common.utils.Utils.mkObjectProperties;
+import static
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+
+@Category(IntegrationTest.class)
+public class AppShutdownIntegrationTest {
+ @ClassRule
+ public static final EmbeddedKafkaCluster CLUSTER = new
EmbeddedKafkaCluster(1);
+
+ @Rule
+ public TestName testName = new TestName();
+
+ @Test
+ public void shouldSendShutDownSignal() throws Exception {
+ //
+ //
+ // Also note that this is an integration test because so many
components have to come together to
+ // ensure these configurations wind up where they belong, and any
number of future code changes
+ // could break this change.
+
+ final String testId = safeUniqueTestName(getClass(), testName);
+ final String appId = "appId_" + testId;
+ final String inputTopic = "input" + testId;
+
+ IntegrationTestUtils.cleanStateBeforeTest(CLUSTER, inputTopic);
+
+ final StreamsBuilder builder = new StreamsBuilder();
+
+ final List<KeyValue<Object, Object>> processorValueCollector = new
ArrayList<>();
+
+ builder.stream(inputTopic).process(() -> new
ShutdownProcessor(processorValueCollector), Named.as("process"));
+
+ final Properties properties = mkObjectProperties(
+ mkMap(
+ mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
CLUSTER.bootstrapServers()),
+ mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, appId),
+ mkEntry(StreamsConfig.STATE_DIR_CONFIG,
TestUtils.tempDirectory().getPath()),
+ mkEntry(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG,
"5"),
+ mkEntry(StreamsConfig.ACCEPTABLE_RECOVERY_LAG_CONFIG,
"6"),
+ mkEntry(StreamsConfig.MAX_WARMUP_REPLICAS_CONFIG, "7"),
+
mkEntry(StreamsConfig.PROBING_REBALANCE_INTERVAL_MS_CONFIG, "480000")
+ )
+ );
+
+ try (final KafkaStreams kafkaStreams = new
KafkaStreams(builder.build(), properties)) {
+ final CountDownLatch latch = new CountDownLatch(1);
+ kafkaStreams.setUncaughtExceptionHandler(exception ->
StreamsUncaughtExceptionHandler.StreamsUncaughtExceptionHandlerResponse.SHUTDOWN_KAFKA_STREAMS_APPLICATION);
Review comment:
Feel free to import `SHUTDOWN_KAFKA_STREAMS_APPLICATION` and save like
50 characters on this line ;)
##########
File path:
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##########
@@ -529,8 +541,7 @@ public void run() {
}
}
- log.error("Encountered the following exception during processing "
+
- "and the thread is going to shut down: ", e);
+ handleStreamsUncaughtException(e);
throw e;
Review comment:
I'm not sure this is right, actually. If we wanted to use the current
thread to send the "poison pill" subscription, it needs to keep running and
call poll again.
Maybe instead we should have a default implementation of the
uncaughtExceptionHandler that invokes the legacy one and then returns
`SHUTDOWN_STREAM_THREAD`, and then the implementation of that case in
KafkaStreams would be to re-throw the exception.
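Roughly what I have in mind, as a JDK-only sketch. The types are simplified stand-ins (`LegacyBridgeSketch`, `StreamsHandler`, `Response`, `wrapLegacy`, and `dispatch` are all hypothetical), and this is just one reading of the idea, not a worked-out design:

```java
// Hypothetical sketch of the proposal; simplified stand-ins, not the real Kafka Streams classes.
final class LegacyBridgeSketch {

    enum Response { SHUTDOWN_STREAM_THREAD, SHUTDOWN_CLIENT }

    interface StreamsHandler {
        Response handle(Throwable throwable);
    }

    // Default handler: notify the legacy JVM-style handler (if any), then ask for a thread shutdown.
    static StreamsHandler wrapLegacy(final Thread.UncaughtExceptionHandler legacy) {
        return throwable -> {
            if (legacy != null) {
                legacy.uncaughtException(Thread.currentThread(), throwable);
            }
            return Response.SHUTDOWN_STREAM_THREAD;
        };
    }

    // What the catch block might do with the handler's response.
    static void dispatch(final RuntimeException failure, final StreamsHandler handler) {
        final Response response = handler.handle(failure);
        if (response == Response.SHUTDOWN_STREAM_THREAD) {
            throw failure; // re-throw so the thread dies, as it did before this change
        }
        // Any other response: return normally so the thread can keep running and poll again.
    }
}
```

The useful property is that only the `SHUTDOWN_STREAM_THREAD` branch re-throws; the other responses leave the thread alive long enough to send the subscription.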
##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -364,6 +368,62 @@ public void setUncaughtExceptionHandler(final
Thread.UncaughtExceptionHandler eh
}
}
+ /**
+ * Set the handler invoked when a {@link
StreamsConfig#NUM_STREAM_THREADS_CONFIG internal thread} abruptly
+ * terminates due to an uncaught exception.
+ *
+ * @param eh the uncaught exception handler of type {@link
StreamsUncaughtExceptionHandler} for all internal threads; {@code null} deletes
the current handler
+ * @throws IllegalStateException if this {@code KafkaStreams} instance is
not in state {@link State#CREATED CREATED}.
+ */
+ public void setUncaughtExceptionHandler(final
StreamsUncaughtExceptionHandler eh) {
+ final StreamsUncaughtExceptionHandler handler = exception ->
handleStreamsUncaughtException(exception, eh);
+ synchronized (stateLock) {
+ if (state == State.CREATED) {
+ for (final StreamThread thread : threads) {
+ if (eh != null) {
+ thread.setStreamsUncaughtExceptionHandler(handler);
+ } else {
+ final StreamsUncaughtExceptionHandler defaultHandler =
exception ->
+
StreamsUncaughtExceptionHandler.StreamsUncaughtExceptionHandlerResponse.SHUTDOWN_STREAM_THREAD;
+
thread.setStreamsUncaughtExceptionHandler(defaultHandler);
+ }
+ }
+ } else {
+ throw new IllegalStateException("Can only set
UncaughtExceptionHandler in CREATED state. " +
+ "Current state is: " + state);
+ }
+ }
+ }
+
+ private
StreamsUncaughtExceptionHandler.StreamsUncaughtExceptionHandlerResponse
handleStreamsUncaughtException(final Exception e,
+
final StreamsUncaughtExceptionHandler
streamsUncaughtExceptionHandler) {
+ final
StreamsUncaughtExceptionHandler.StreamsUncaughtExceptionHandlerResponse action
= streamsUncaughtExceptionHandler.handle(e);
+ switch (action) {
+ case SHUTDOWN_STREAM_THREAD:
+ log.error("Encountered the following exception during
processing " +
+ "and the thread is going to shut down: ", e);
+ break;
+ case REPLACE_STREAM_THREAD:
+ log.error("Encountered the following exception during
processing " +
+ "and the the stream thread will be replaced: ", e);
//TODO: add then remove, wait until 663 is merged
Review comment:
Let's just comment out this whole enum value until 663 is implemented,
that way we won't have a TODO hanging around in case 663 doesn't make the
release for some reason.
##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -364,6 +368,62 @@ public void setUncaughtExceptionHandler(final
Thread.UncaughtExceptionHandler eh
}
}
+ /**
+ * Set the handler invoked when a {@link
StreamsConfig#NUM_STREAM_THREADS_CONFIG internal thread} abruptly
+ * terminates due to an uncaught exception.
+ *
+ * @param eh the uncaught exception handler of type {@link
StreamsUncaughtExceptionHandler} for all internal threads; {@code null} deletes
the current handler
+ * @throws IllegalStateException if this {@code KafkaStreams} instance is
not in state {@link State#CREATED CREATED}.
+ */
+ public void setUncaughtExceptionHandler(final
StreamsUncaughtExceptionHandler eh) {
+ final StreamsUncaughtExceptionHandler handler = exception ->
handleStreamsUncaughtException(exception, eh);
+ synchronized (stateLock) {
+ if (state == State.CREATED) {
+ for (final StreamThread thread : threads) {
+ if (eh != null) {
+ thread.setStreamsUncaughtExceptionHandler(handler);
+ } else {
+ final StreamsUncaughtExceptionHandler defaultHandler =
exception ->
+
StreamsUncaughtExceptionHandler.StreamsUncaughtExceptionHandlerResponse.SHUTDOWN_STREAM_THREAD;
+
thread.setStreamsUncaughtExceptionHandler(defaultHandler);
+ }
+ }
+ } else {
+ throw new IllegalStateException("Can only set
UncaughtExceptionHandler in CREATED state. " +
+ "Current state is: " + state);
+ }
+ }
+ }
+
+ private
StreamsUncaughtExceptionHandler.StreamsUncaughtExceptionHandlerResponse
handleStreamsUncaughtException(final Exception e,
+
final StreamsUncaughtExceptionHandler
streamsUncaughtExceptionHandler) {
+ final
StreamsUncaughtExceptionHandler.StreamsUncaughtExceptionHandlerResponse action
= streamsUncaughtExceptionHandler.handle(e);
+ switch (action) {
+ case SHUTDOWN_STREAM_THREAD:
+ log.error("Encountered the following exception during
processing " +
+ "and the thread is going to shut down: ", e);
+ break;
+ case REPLACE_STREAM_THREAD:
+ log.error("Encountered the following exception during
processing " +
+ "and the the stream thread will be replaced: ", e);
//TODO: add then remove, wait until 663 is merged
+ break;
+ case SHUTDOWN_KAFKA_STREAMS_CLIENT:
+ log.error("Encountered the following exception during
processing " +
+ "and the client is going to shut down: ", e);
+ for (final StreamThread streamThread: threads) {
+ streamThread.shutdown();
+ }
Review comment:
Oh, I forgot; the reason you're doing it this way is to transition to
ERROR, not actually shut down, right?
In that case, it seems pretty odd to call this option "shut down", since it
doesn't actually _shut down_, it only kills all the threads, leaving the final
"shut down" as an exercise to the user.
If I recall correctly, the preference of the group was in favor of this
behavior, in which case, I'd advocate for a different name. Maybe just
`STOP_STREAM_THREAD`, `STOP_ALL_STREAM_THREADS`, and
`STOP_ALL_STREAM_THREADS_IN_CLUSTER`.
I've been on the fence about whether I should leave this feedback or not,
but decided to go ahead and pass it on to you because I just got confused by
the names, despite having recently participated in that discussion. So it
seems likely that users would also be confused and think that we're doing the
wrong thing by not actually shutting down the client.
##########
File path:
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##########
@@ -550,6 +561,10 @@ void runLoop() {
// until the rebalance is completed before we close and commit the
tasks
while (isRunning() || taskManager.isRebalanceInProgress()) {
try {
+ if (shutdownRequested.get()) {
+ sendShutdownRequest(shutdownTypeRequested);
+ return;
Review comment:
I can't tell; are we guaranteed to actually send the joinGroup request
by now?
Maybe it's safer to just keep running this loop until we get back the "you
should shut down" response assignment. We _could/should_ add a condition into
`runOnce` so that we don't actually process anything once we have
`shutdownRequested` set.
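A toy model of that alternative, using only the JDK. Everything here is hypothetical (`ShutdownLoopSketch`, `pollsUntilAck`, and the fake `poll()` are stand-ins, not `StreamThread` internals); it only shows the loop shape: keep polling until the shutdown assignment arrives, and skip processing once `shutdownRequested` is set.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical sketch; field and method names are illustrative, not StreamThread's real internals.
final class ShutdownLoopSketch {
    final AtomicBoolean shutdownRequested = new AtomicBoolean(false);
    final AtomicBoolean shutdownAcknowledged = new AtomicBoolean(false);
    int recordsProcessed = 0;
    int polls = 0;
    private final int pollsUntilAck;

    ShutdownLoopSketch(final int pollsUntilAck) {
        this.pollsUntilAck = pollsUntilAck;
    }

    // Stand-in for consumer.poll(): the group protocol only advances while we keep polling.
    private void poll() {
        polls++;
        if (shutdownRequested.get() && polls >= pollsUntilAck) {
            shutdownAcknowledged.set(true); // stand-in for receiving the "shut down" assignment
        }
    }

    void runOnce() {
        poll();
        if (shutdownRequested.get()) {
            return; // keep polling, but process nothing once shutdown has been requested
        }
        recordsProcessed++;
    }

    // Loops until the shutdown assignment arrives, or until maxIterations as a safety bound.
    void runLoop(final int maxIterations) {
        for (int i = 0; i < maxIterations && !shutdownAcknowledged.get(); i++) {
            runOnce();
        }
    }
}
```

With this shape the thread never returns before the request has had a chance to go out, at the cost of a few extra poll cycles.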