[GitHub] [kafka] ableegoldman commented on a change in pull request #9487: KAFKA-9331: Add a streams specific uncaught exception handler
ableegoldman commented on a change in pull request #9487: URL: https://github.com/apache/kafka/pull/9487#discussion_r526477258 ## File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java ## @@ -932,56 +1028,62 @@ public synchronized boolean close(final long timeout, final TimeUnit timeUnit) { return close(timeoutMs); } -private boolean close(final long timeoutMs) { -if (!setState(State.PENDING_SHUTDOWN)) { -// if transition failed, it means it was either in PENDING_SHUTDOWN -// or NOT_RUNNING already; just check that all threads have been stopped -log.info("Already in the pending shutdown state, wait to complete shutdown"); -} else { -stateDirCleaner.shutdownNow(); -if (rocksDBMetricsRecordingService != null) { -rocksDBMetricsRecordingService.shutdownNow(); -} +private Thread shutdownHelper(final boolean error) { +stateDirCleaner.shutdownNow(); +if (rocksDBMetricsRecordingService != null) { +rocksDBMetricsRecordingService.shutdownNow(); +} -// wait for all threads to join in a separate thread; -// save the current thread so that if it is a stream thread -// we don't attempt to join it and cause a deadlock -final Thread shutdownThread = new Thread(() -> { -// notify all the threads to stop; avoid deadlocks by stopping any -// further state reports from the thread since we're shutting down -for (final StreamThread thread : threads) { -thread.shutdown(); -} +// wait for all threads to join in a separate thread; +// save the current thread so that if it is a stream thread +// we don't attempt to join it and cause a deadlock +return new Thread(() -> { +// notify all the threads to stop; avoid deadlocks by stopping any +// further state reports from the thread since we're shutting down +for (final StreamThread thread : threads) { +thread.shutdown(); +} -for (final StreamThread thread : threads) { -try { -if (!thread.isRunning()) { -thread.join(); -} -} catch (final InterruptedException ex) { -Thread.currentThread().interrupt(); +for (final StreamThread thread : threads) { +try { +if (!thread.isRunning()) { +thread.join(); } +} catch (final InterruptedException ex) { +Thread.currentThread().interrupt(); } +} -if (globalStreamThread != null) { -globalStreamThread.shutdown(); -} +if (globalStreamThread != null) { +globalStreamThread.shutdown(); +} -if (globalStreamThread != null && !globalStreamThread.stillRunning()) { -try { -globalStreamThread.join(); -} catch (final InterruptedException e) { -Thread.currentThread().interrupt(); -} -globalStreamThread = null; +if (globalStreamThread != null && !globalStreamThread.stillRunning()) { +try { +globalStreamThread.join(); +} catch (final InterruptedException e) { +Thread.currentThread().interrupt(); } +globalStreamThread = null; +} -adminClient.close(); +adminClient.close(); -streamsMetrics.removeAllClientLevelMetrics(); -metrics.close(); +streamsMetrics.removeAllClientLevelMetrics(); +metrics.close(); +if (!error) { setState(State.NOT_RUNNING); -}, "kafka-streams-close-thread"); +} +}, "kafka-streams-close-thread"); +} + +private boolean close(final long timeoutMs) { +if (!setState(State.PENDING_SHUTDOWN)) { Review comment: Sounds reasonable This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ableegoldman commented on a change in pull request #9487: KAFKA-9331: Add a streams specific uncaught exception handler
ableegoldman commented on a change in pull request #9487: URL: https://github.com/apache/kafka/pull/9487#discussion_r525734417 ## File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java ## @@ -932,56 +1028,62 @@ public synchronized boolean close(final long timeout, final TimeUnit timeUnit) { return close(timeoutMs); } -private boolean close(final long timeoutMs) { -if (!setState(State.PENDING_SHUTDOWN)) { -// if transition failed, it means it was either in PENDING_SHUTDOWN -// or NOT_RUNNING already; just check that all threads have been stopped -log.info("Already in the pending shutdown state, wait to complete shutdown"); -} else { -stateDirCleaner.shutdownNow(); -if (rocksDBMetricsRecordingService != null) { -rocksDBMetricsRecordingService.shutdownNow(); -} +private Thread shutdownHelper(final boolean error) { +stateDirCleaner.shutdownNow(); +if (rocksDBMetricsRecordingService != null) { +rocksDBMetricsRecordingService.shutdownNow(); +} -// wait for all threads to join in a separate thread; -// save the current thread so that if it is a stream thread -// we don't attempt to join it and cause a deadlock -final Thread shutdownThread = new Thread(() -> { -// notify all the threads to stop; avoid deadlocks by stopping any -// further state reports from the thread since we're shutting down -for (final StreamThread thread : threads) { -thread.shutdown(); -} +// wait for all threads to join in a separate thread; +// save the current thread so that if it is a stream thread +// we don't attempt to join it and cause a deadlock +return new Thread(() -> { +// notify all the threads to stop; avoid deadlocks by stopping any +// further state reports from the thread since we're shutting down +for (final StreamThread thread : threads) { +thread.shutdown(); +} -for (final StreamThread thread : threads) { -try { -if (!thread.isRunning()) { -thread.join(); -} -} catch (final InterruptedException ex) { -Thread.currentThread().interrupt(); +for (final StreamThread thread : threads) { +try { +if (!thread.isRunning()) { +thread.join(); } +} catch (final InterruptedException ex) { +Thread.currentThread().interrupt(); } +} -if (globalStreamThread != null) { -globalStreamThread.shutdown(); -} +if (globalStreamThread != null) { +globalStreamThread.shutdown(); +} -if (globalStreamThread != null && !globalStreamThread.stillRunning()) { -try { -globalStreamThread.join(); -} catch (final InterruptedException e) { -Thread.currentThread().interrupt(); -} -globalStreamThread = null; +if (globalStreamThread != null && !globalStreamThread.stillRunning()) { +try { +globalStreamThread.join(); +} catch (final InterruptedException e) { +Thread.currentThread().interrupt(); } +globalStreamThread = null; +} -adminClient.close(); +adminClient.close(); -streamsMetrics.removeAllClientLevelMetrics(); -metrics.close(); +streamsMetrics.removeAllClientLevelMetrics(); +metrics.close(); +if (!error) { setState(State.NOT_RUNNING); -}, "kafka-streams-close-thread"); +} +}, "kafka-streams-close-thread"); +} + +private boolean close(final long timeoutMs) { +if (!setState(State.PENDING_SHUTDOWN)) { Review comment: WDYT about having both NOT_RUNNING and ERROR go through PENDING_SHUTDOWN, rather than just transitioning directly and permanently to ERROR? At a high level I think it just makes sense for ERROR and NOT_RUNNING to be symmetric. Also any benefit to having an intermediate PENDING_SHUTDOWN for the NOT_RUNNING case presumably applies to the ERROR case as well. 
eg, it indicates whether Streams has completed its shutdown or not: users know that an app in PENDING_SHUTDOWN should never be killed, it's only safe to do so once it reaches NOT_RUNNING. We should provide the same fun
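For reference, the operational pattern described above looks roughly like the sketch below (hypothetical user code, not part of this PR): the listener treats PENDING_SHUTDOWN as "still cleaning up" and only releases the process once NOT_RUNNING is reached, which is exactly the guarantee that should also hold for the ERROR path.

```java
// Hypothetical operator-side sketch; assumes an existing KafkaStreams instance `streams`
// and runs inside a method that declares InterruptedException.
final java.util.concurrent.CountDownLatch stopped = new java.util.concurrent.CountDownLatch(1);
streams.setStateListener((newState, oldState) -> {
    if (newState == KafkaStreams.State.NOT_RUNNING) {
        stopped.countDown();   // PENDING_SHUTDOWN means "do not kill yet"; NOT_RUNNING is the safe point
    }
});
streams.close(java.time.Duration.ofMinutes(1));
stopped.await();
```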
[GitHub] [kafka] ableegoldman commented on a change in pull request #9487: KAFKA-9331: Add a streams specific uncaught exception handler
ableegoldman commented on a change in pull request #9487: URL: https://github.com/apache/kafka/pull/9487#discussion_r525701691 ## File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java ## @@ -366,6 +377,90 @@ public void setUncaughtExceptionHandler(final Thread.UncaughtExceptionHandler eh } } +/** + * Set the handler invoked when an {@link StreamsConfig#NUM_STREAM_THREADS_CONFIG internal thread} + * throws an unexpected exception. + * These might be exceptions indicating rare bugs in Kafka Streams, or they + * might be exceptions thrown by your code, for example a NullPointerException thrown from your processor + * logic. + * The handler will execute on the thread that produced the exception. + * In order to get the thread that threw the exception, Thread.currentThread(). + * + * Note, this handler must be threadsafe, since it will be shared among all threads, and invoked from any + * thread that encounters such an exception. + * + * @param streamsUncaughtExceptionHandler the uncaught exception handler of type {@link StreamsUncaughtExceptionHandler} for all internal threads + * @throws IllegalStateException if this {@code KafkaStreams} instance is not in state {@link State#CREATED CREATED}. + * @throws NullPointerException if streamsUncaughtExceptionHandler is null. + */ +public void setUncaughtExceptionHandler(final StreamsUncaughtExceptionHandler streamsUncaughtExceptionHandler) { +final Consumer handler = exception -> handleStreamsUncaughtException(exception, streamsUncaughtExceptionHandler); +synchronized (stateLock) { +if (state == State.CREATED) { +Objects.requireNonNull(streamsUncaughtExceptionHandler); +for (final StreamThread thread : threads) { +thread.setStreamsUncaughtExceptionHandler(handler); +} +if (globalStreamThread != null) { +globalStreamThread.setUncaughtExceptionHandler(handler); +} +} else { +throw new IllegalStateException("Can only set UncaughtExceptionHandler in CREATED state. " + +"Current state is: " + state); +} +} +} + +private void defaultStreamsUncaughtExceptionHandler(final Throwable throwable) { +if (oldHandler) { +if (throwable instanceof RuntimeException) { +throw (RuntimeException) throwable; +} else if (throwable instanceof Error) { +throw (Error) throwable; +} else { +throw new RuntimeException("Unexpected checked exception caught in the uncaught exception handler", throwable); +} +} else { +handleStreamsUncaughtException(throwable, t -> SHUTDOWN_CLIENT); +} +} + +private void handleStreamsUncaughtException(final Throwable throwable, +final StreamsUncaughtExceptionHandler streamsUncaughtExceptionHandler) { +final StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse action = streamsUncaughtExceptionHandler.handle(throwable); +if (oldHandler) { +log.warn("Stream's new uncaught exception handler is set as well as the deprecated old handler." + +"The old handler will be ignored as long as a new handler is set."); +} +switch (action) { +case SHUTDOWN_CLIENT: +log.error("Encountered the following exception during processing " + +"and the registered exception handler opted to " + action + "." + +" The streams client is going to shut down now. ", throwable); +close(Duration.ZERO); +break; +case SHUTDOWN_APPLICATION: +if (throwable instanceof Error) { +log.error("This option requires running threads to shut down the application." + +"but the uncaught exception was an Error, which means this runtime is no " + +"longer in a well-defined state. 
Attempting to send the shutdown command anyway.", throwable); +} +if (Thread.currentThread().equals(globalStreamThread) && threads.stream().noneMatch(StreamThread::isRunning)) { +log.error("Exception in global thread caused the application to attempt to shutdown." + +" This action will succeed only if there is at least one StreamThread running on this client." + +" Currently there are no running threads so will now close the client."); +close(Duration.ZERO); Review comment: That's fair. I guess I was thinking less about the inherent meaning of ERROR vs NOT_RUNNING, and mor
[GitHub] [kafka] ableegoldman commented on a change in pull request #9487: KAFKA-9331: Add a streams specific uncaught exception handler
ableegoldman commented on a change in pull request #9487: URL: https://github.com/apache/kafka/pull/9487#discussion_r525692960 ## File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java ## @@ -932,56 +1028,62 @@ public synchronized boolean close(final long timeout, final TimeUnit timeUnit) { return close(timeoutMs); } -private boolean close(final long timeoutMs) { -if (!setState(State.PENDING_SHUTDOWN)) { -// if transition failed, it means it was either in PENDING_SHUTDOWN -// or NOT_RUNNING already; just check that all threads have been stopped -log.info("Already in the pending shutdown state, wait to complete shutdown"); -} else { -stateDirCleaner.shutdownNow(); -if (rocksDBMetricsRecordingService != null) { -rocksDBMetricsRecordingService.shutdownNow(); -} +private Thread shutdownHelper(final boolean error) { +stateDirCleaner.shutdownNow(); +if (rocksDBMetricsRecordingService != null) { +rocksDBMetricsRecordingService.shutdownNow(); +} -// wait for all threads to join in a separate thread; -// save the current thread so that if it is a stream thread -// we don't attempt to join it and cause a deadlock -final Thread shutdownThread = new Thread(() -> { -// notify all the threads to stop; avoid deadlocks by stopping any -// further state reports from the thread since we're shutting down -for (final StreamThread thread : threads) { -thread.shutdown(); -} +// wait for all threads to join in a separate thread; +// save the current thread so that if it is a stream thread +// we don't attempt to join it and cause a deadlock +return new Thread(() -> { +// notify all the threads to stop; avoid deadlocks by stopping any +// further state reports from the thread since we're shutting down +for (final StreamThread thread : threads) { +thread.shutdown(); +} -for (final StreamThread thread : threads) { -try { -if (!thread.isRunning()) { -thread.join(); -} -} catch (final InterruptedException ex) { -Thread.currentThread().interrupt(); +for (final StreamThread thread : threads) { +try { +if (!thread.isRunning()) { +thread.join(); } +} catch (final InterruptedException ex) { +Thread.currentThread().interrupt(); } +} -if (globalStreamThread != null) { -globalStreamThread.shutdown(); -} +if (globalStreamThread != null) { +globalStreamThread.shutdown(); +} Review comment: Eh, I wouldn't bother with an AK ticket if this will be tackled in the next PR. I'll just make a list of all the minor followup work somewhere to keep track This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ableegoldman commented on a change in pull request #9487: KAFKA-9331: Add a streams specific uncaught exception handler
ableegoldman commented on a change in pull request #9487: URL: https://github.com/apache/kafka/pull/9487#discussion_r525663640 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java ## @@ -559,18 +552,51 @@ void runLoop() { } } catch (final TaskCorruptedException e) { log.warn("Detected the states of tasks " + e.corruptedTaskWithChangelogs() + " are corrupted. " + - "Will close the task as dirty and re-create and bootstrap from scratch.", e); +"Will close the task as dirty and re-create and bootstrap from scratch.", e); try { taskManager.handleCorruption(e.corruptedTaskWithChangelogs()); } catch (final TaskMigratedException taskMigrated) { handleTaskMigrated(taskMigrated); } } catch (final TaskMigratedException e) { handleTaskMigrated(e); +} catch (final UnsupportedVersionException e) { +final String errorMessage = e.getMessage(); +if (errorMessage != null && +errorMessage.startsWith("Broker unexpectedly doesn't support requireStable flag on version ")) { + +log.error("Shutting down because the Kafka cluster seems to be on a too old version. " + Review comment: We should remember to update the wording here when we add the REPLACE_THREAD functionality This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ableegoldman commented on a change in pull request #9487: KAFKA-9331: Add a streams specific uncaught exception handler
ableegoldman commented on a change in pull request #9487: URL: https://github.com/apache/kafka/pull/9487#discussion_r525658639 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java ## @@ -311,6 +314,8 @@ public void run() { "Updating global state failed. You can restart KafkaStreams to recover from this error.", recoverableException Review comment: Hm ok this might be a problem. Since this is thrown from another catch block and not from the try block, it won't be caught by the catch block below and will slip through the exception handler.
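To make the control flow concrete, here is a generic Java illustration (hypothetical names, not the actual GlobalStreamThread code): an exception thrown from inside a catch clause never reaches the sibling catch clauses of the same try statement, so it bypasses the handler entirely.

```java
public class CatchClauseExample {
    public static void main(final String[] args) {
        try {
            throw new java.io.IOException("simulated restore failure");
        } catch (final java.io.IOException recoverableException) {
            // Anything thrown while handling this exception is NOT routed to the
            // catch clause below; it propagates straight out of the method.
            throw new IllegalStateException("thrown from a catch block", recoverableException);
        } catch (final RuntimeException e) {
            System.err.println("handler invoked: " + e);   // never reached for the throw above
        }
    }
}
```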
[GitHub] [kafka] ableegoldman commented on a change in pull request #9487: KAFKA-9331: Add a streams specific uncaught exception handler
ableegoldman commented on a change in pull request #9487: URL: https://github.com/apache/kafka/pull/9487#discussion_r525650632 ## File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java ## @@ -932,56 +1028,62 @@ public synchronized boolean close(final long timeout, final TimeUnit timeUnit) { return close(timeoutMs); } -private boolean close(final long timeoutMs) { -if (!setState(State.PENDING_SHUTDOWN)) { -// if transition failed, it means it was either in PENDING_SHUTDOWN -// or NOT_RUNNING already; just check that all threads have been stopped -log.info("Already in the pending shutdown state, wait to complete shutdown"); -} else { -stateDirCleaner.shutdownNow(); -if (rocksDBMetricsRecordingService != null) { -rocksDBMetricsRecordingService.shutdownNow(); -} +private Thread shutdownHelper(final boolean error) { +stateDirCleaner.shutdownNow(); +if (rocksDBMetricsRecordingService != null) { +rocksDBMetricsRecordingService.shutdownNow(); +} -// wait for all threads to join in a separate thread; -// save the current thread so that if it is a stream thread -// we don't attempt to join it and cause a deadlock -final Thread shutdownThread = new Thread(() -> { -// notify all the threads to stop; avoid deadlocks by stopping any -// further state reports from the thread since we're shutting down -for (final StreamThread thread : threads) { -thread.shutdown(); -} +// wait for all threads to join in a separate thread; +// save the current thread so that if it is a stream thread +// we don't attempt to join it and cause a deadlock +return new Thread(() -> { +// notify all the threads to stop; avoid deadlocks by stopping any +// further state reports from the thread since we're shutting down +for (final StreamThread thread : threads) { +thread.shutdown(); +} -for (final StreamThread thread : threads) { -try { -if (!thread.isRunning()) { -thread.join(); -} -} catch (final InterruptedException ex) { -Thread.currentThread().interrupt(); +for (final StreamThread thread : threads) { +try { +if (!thread.isRunning()) { +thread.join(); } +} catch (final InterruptedException ex) { +Thread.currentThread().interrupt(); } +} -if (globalStreamThread != null) { -globalStreamThread.shutdown(); -} +if (globalStreamThread != null) { +globalStreamThread.shutdown(); +} -if (globalStreamThread != null && !globalStreamThread.stillRunning()) { -try { -globalStreamThread.join(); -} catch (final InterruptedException e) { -Thread.currentThread().interrupt(); -} -globalStreamThread = null; +if (globalStreamThread != null && !globalStreamThread.stillRunning()) { +try { +globalStreamThread.join(); +} catch (final InterruptedException e) { +Thread.currentThread().interrupt(); } +globalStreamThread = null; +} -adminClient.close(); +adminClient.close(); -streamsMetrics.removeAllClientLevelMetrics(); -metrics.close(); +streamsMetrics.removeAllClientLevelMetrics(); +metrics.close(); +if (!error) { setState(State.NOT_RUNNING); -}, "kafka-streams-close-thread"); +} +}, "kafka-streams-close-thread"); +} + +private boolean close(final long timeoutMs) { +if (!setState(State.PENDING_SHUTDOWN)) { Review comment: I just realized that this is going to be a problem with the way the ERROR state is being used. IF we `closeToError` then we transition to ERROR and shut down, however `ERROR -> PENDING_SHUTDOWN` is still an allowed transition so there's nothing to prevent the shutdown from being triggered again when a user calls `close()`. 
And note that a lot of users most likely have a state listener at the moment which does exactly that, ie when it sees a transition to ERROR it immediately invokes close (because that's what you should do with the current semantics). Just another th
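The kind of existing user code in question would look roughly like this (a hypothetical sketch written against the old ERROR semantics), which is why leaving ERROR -> PENDING_SHUTDOWN open lets a second shutdown be triggered on top of the closeToError() one already in flight:

```java
// Hypothetical state listener written for the old semantics, where ERROR meant
// "the client is dead, please close it yourself".
streams.setStateListener((newState, oldState) -> {
    if (newState == KafkaStreams.State.ERROR) {
        streams.close();   // with the new closeToError() path this would start a second shutdown
    }
});
```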
[GitHub] [kafka] ableegoldman commented on a change in pull request #9487: KAFKA-9331: Add a streams specific uncaught exception handler
ableegoldman commented on a change in pull request #9487: URL: https://github.com/apache/kafka/pull/9487#discussion_r525640088 ## File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java ## @@ -932,56 +1028,62 @@ public synchronized boolean close(final long timeout, final TimeUnit timeUnit) { return close(timeoutMs); } -private boolean close(final long timeoutMs) { -if (!setState(State.PENDING_SHUTDOWN)) { -// if transition failed, it means it was either in PENDING_SHUTDOWN -// or NOT_RUNNING already; just check that all threads have been stopped -log.info("Already in the pending shutdown state, wait to complete shutdown"); -} else { -stateDirCleaner.shutdownNow(); -if (rocksDBMetricsRecordingService != null) { -rocksDBMetricsRecordingService.shutdownNow(); -} +private Thread shutdownHelper(final boolean error) { +stateDirCleaner.shutdownNow(); +if (rocksDBMetricsRecordingService != null) { +rocksDBMetricsRecordingService.shutdownNow(); +} -// wait for all threads to join in a separate thread; -// save the current thread so that if it is a stream thread -// we don't attempt to join it and cause a deadlock -final Thread shutdownThread = new Thread(() -> { -// notify all the threads to stop; avoid deadlocks by stopping any -// further state reports from the thread since we're shutting down -for (final StreamThread thread : threads) { -thread.shutdown(); -} +// wait for all threads to join in a separate thread; +// save the current thread so that if it is a stream thread +// we don't attempt to join it and cause a deadlock +return new Thread(() -> { +// notify all the threads to stop; avoid deadlocks by stopping any +// further state reports from the thread since we're shutting down +for (final StreamThread thread : threads) { +thread.shutdown(); +} -for (final StreamThread thread : threads) { -try { -if (!thread.isRunning()) { -thread.join(); -} -} catch (final InterruptedException ex) { -Thread.currentThread().interrupt(); +for (final StreamThread thread : threads) { +try { +if (!thread.isRunning()) { +thread.join(); } +} catch (final InterruptedException ex) { +Thread.currentThread().interrupt(); } +} -if (globalStreamThread != null) { -globalStreamThread.shutdown(); -} +if (globalStreamThread != null) { +globalStreamThread.shutdown(); +} Review comment: Why do we shut down the global thread only after all stream threads have completed their shutdown? Seems like it would be more efficient to send the shutdown signal to everyone first, and then wait for all the threads to join. Can you try this out in the followup PR? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
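Concretely, the ordering suggested above would look something like the sketch below (illustrative only, taken out of the shutdownHelper context and omitting the isRunning()/current-thread checks from the real code):

```java
// Broadcast the shutdown signal to every thread first so they wind down in parallel...
for (final StreamThread thread : threads) {
    thread.shutdown();
}
if (globalStreamThread != null) {
    globalStreamThread.shutdown();
}

// ...and only then block on the joins.
for (final StreamThread thread : threads) {
    try {
        thread.join();
    } catch (final InterruptedException e) {
        Thread.currentThread().interrupt();
    }
}
if (globalStreamThread != null) {
    try {
        globalStreamThread.join();
    } catch (final InterruptedException e) {
        Thread.currentThread().interrupt();
    }
}
```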
[GitHub] [kafka] ableegoldman commented on a change in pull request #9487: KAFKA-9331: Add a streams specific uncaught exception handler
ableegoldman commented on a change in pull request #9487: URL: https://github.com/apache/kafka/pull/9487#discussion_r525636554 ## File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java ## @@ -366,6 +377,90 @@ public void setUncaughtExceptionHandler(final Thread.UncaughtExceptionHandler eh } } +/** + * Set the handler invoked when an {@link StreamsConfig#NUM_STREAM_THREADS_CONFIG internal thread} + * throws an unexpected exception. + * These might be exceptions indicating rare bugs in Kafka Streams, or they + * might be exceptions thrown by your code, for example a NullPointerException thrown from your processor + * logic. + * The handler will execute on the thread that produced the exception. + * In order to get the thread that threw the exception, Thread.currentThread(). + * + * Note, this handler must be threadsafe, since it will be shared among all threads, and invoked from any + * thread that encounters such an exception. + * + * @param streamsUncaughtExceptionHandler the uncaught exception handler of type {@link StreamsUncaughtExceptionHandler} for all internal threads + * @throws IllegalStateException if this {@code KafkaStreams} instance is not in state {@link State#CREATED CREATED}. + * @throws NullPointerException if streamsUncaughtExceptionHandler is null. + */ +public void setUncaughtExceptionHandler(final StreamsUncaughtExceptionHandler streamsUncaughtExceptionHandler) { +final Consumer handler = exception -> handleStreamsUncaughtException(exception, streamsUncaughtExceptionHandler); +synchronized (stateLock) { +if (state == State.CREATED) { +Objects.requireNonNull(streamsUncaughtExceptionHandler); +for (final StreamThread thread : threads) { +thread.setStreamsUncaughtExceptionHandler(handler); +} +if (globalStreamThread != null) { +globalStreamThread.setUncaughtExceptionHandler(handler); +} +} else { +throw new IllegalStateException("Can only set UncaughtExceptionHandler in CREATED state. " + +"Current state is: " + state); +} +} +} + +private void defaultStreamsUncaughtExceptionHandler(final Throwable throwable) { +if (oldHandler) { +if (throwable instanceof RuntimeException) { +throw (RuntimeException) throwable; +} else if (throwable instanceof Error) { +throw (Error) throwable; +} else { +throw new RuntimeException("Unexpected checked exception caught in the uncaught exception handler", throwable); +} +} else { +handleStreamsUncaughtException(throwable, t -> SHUTDOWN_CLIENT); +} +} + +private void handleStreamsUncaughtException(final Throwable throwable, +final StreamsUncaughtExceptionHandler streamsUncaughtExceptionHandler) { +final StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse action = streamsUncaughtExceptionHandler.handle(throwable); +if (oldHandler) { +log.warn("Stream's new uncaught exception handler is set as well as the deprecated old handler." + +"The old handler will be ignored as long as a new handler is set."); +} +switch (action) { +case SHUTDOWN_CLIENT: +log.error("Encountered the following exception during processing " + +"and the registered exception handler opted to " + action + "." + +" The streams client is going to shut down now. ", throwable); +close(Duration.ZERO); +break; +case SHUTDOWN_APPLICATION: +if (throwable instanceof Error) { +log.error("This option requires running threads to shut down the application." + +"but the uncaught exception was an Error, which means this runtime is no " + +"longer in a well-defined state. 
Attempting to send the shutdown command anyway.", throwable); +} +if (Thread.currentThread().equals(globalStreamThread) && threads.stream().noneMatch(StreamThread::isRunning)) { +log.error("Exception in global thread caused the application to attempt to shutdown." + +" This action will succeed only if there is at least one StreamThread running on this client." + +" Currently there are no running threads so will now close the client."); +close(Duration.ZERO); Review comment: I think it makes more sense to transition to ERROR in this case than to NOT_RUNNING. But let's put t
[GitHub] [kafka] ableegoldman commented on a change in pull request #9487: KAFKA-9331: Add a streams specific uncaught exception handler
ableegoldman commented on a change in pull request #9487: URL: https://github.com/apache/kafka/pull/9487#discussion_r524824649 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java ## @@ -559,18 +552,52 @@ void runLoop() { } } catch (final TaskCorruptedException e) { log.warn("Detected the states of tasks " + e.corruptedTaskWithChangelogs() + " are corrupted. " + - "Will close the task as dirty and re-create and bootstrap from scratch.", e); +"Will close the task as dirty and re-create and bootstrap from scratch.", e); try { taskManager.handleCorruption(e.corruptedTaskWithChangelogs()); } catch (final TaskMigratedException taskMigrated) { handleTaskMigrated(taskMigrated); } } catch (final TaskMigratedException e) { handleTaskMigrated(e); +} catch (final UnsupportedVersionException e) { Review comment: Mm ok actually I think this should be fine. I was thinking of the handler as just "swallowing" the exception, but in reality the user would still let the current thread die and just spin up a new one in its place. And then the new one would hit this UnsupportedVersionException and so on, until the brokers are upgraded. So there shouldn't be any way to get into a bad state This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
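For context, the user behaviour described above (letting the failed thread die and spinning up a replacement) would amount to something like the sketch below. The REPLACE_THREAD response is planned follow-up work and not part of this PR, so this is purely illustrative: each replacement thread would hit the same UnsupportedVersionException and be replaced again until the brokers are upgraded, without ever entering a bad state.

```java
// Illustrative only -- the REPLACE_THREAD response is follow-up work, not in this PR.
streams.setUncaughtExceptionHandler(exception ->
    StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.REPLACE_THREAD);
```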
[GitHub] [kafka] ableegoldman commented on a change in pull request #9487: KAFKA-9331: Add a streams specific uncaught exception handler
ableegoldman commented on a change in pull request #9487: URL: https://github.com/apache/kafka/pull/9487#discussion_r524819063 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java ## @@ -559,18 +552,52 @@ void runLoop() { } } catch (final TaskCorruptedException e) { log.warn("Detected the states of tasks " + e.corruptedTaskWithChangelogs() + " are corrupted. " + - "Will close the task as dirty and re-create and bootstrap from scratch.", e); +"Will close the task as dirty and re-create and bootstrap from scratch.", e); try { taskManager.handleCorruption(e.corruptedTaskWithChangelogs()); } catch (final TaskMigratedException taskMigrated) { handleTaskMigrated(taskMigrated); } } catch (final TaskMigratedException e) { handleTaskMigrated(e); +} catch (final UnsupportedVersionException e) { Review comment: Just to clarify I think it's ok to leave this as-is for now, since as Walker said all handler options are fatal at this point This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ableegoldman commented on a change in pull request #9487: KAFKA-9331: Add a streams specific uncaught exception handler
ableegoldman commented on a change in pull request #9487: URL: https://github.com/apache/kafka/pull/9487#discussion_r524817135 ## File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java ## @@ -366,6 +374,63 @@ public void setUncaughtExceptionHandler(final Thread.UncaughtExceptionHandler eh } } +/** + * Set the handler invoked when an {@link StreamsConfig#NUM_STREAM_THREADS_CONFIG internal thread} + * throws an unexpected exception. + * These might be exceptions indicating rare bugs in Kafka Streams, or they + * might be exceptions thrown by your code, for example a NullPointerException thrown from your processor + * logic. + * + * Note, this handler must be threadsafe, since it will be shared among all threads, and invoked from any + * thread that encounters such an exception. + * + * @param streamsUncaughtExceptionHandler the uncaught exception handler of type {@link StreamsUncaughtExceptionHandler} for all internal threads + * @throws IllegalStateException if this {@code KafkaStreams} instance is not in state {@link State#CREATED CREATED}. + * @throws NullPointerException if streamsUncaughtExceptionHandler is null. + */ +public void setUncaughtExceptionHandler(final StreamsUncaughtExceptionHandler streamsUncaughtExceptionHandler) { +final StreamsUncaughtExceptionHandler handler = exception -> handleStreamsUncaughtException(exception, streamsUncaughtExceptionHandler); +synchronized (stateLock) { +if (state == State.CREATED) { +Objects.requireNonNull(streamsUncaughtExceptionHandler); +for (final StreamThread thread : threads) { +thread.setStreamsUncaughtExceptionHandler(handler); +} +if (globalStreamThread != null) { +globalStreamThread.setUncaughtExceptionHandler(handler); +} +} else { +throw new IllegalStateException("Can only set UncaughtExceptionHandler in CREATED state. " + +"Current state is: " + state); +} +} +} + +private StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse handleStreamsUncaughtException(final Throwable e, + final StreamsUncaughtExceptionHandler streamsUncaughtExceptionHandler) { +final StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse action = streamsUncaughtExceptionHandler.handle(e); +switch (action) { +case SHUTDOWN_CLIENT: +log.error("Encountered the following exception during processing " + +"and the registered exception handler opted to \" + action + \"." + +" The streams client is going to shut down now. ", e); +close(Duration.ZERO); Review comment: > Since the stream thread is alive when it calls close() there will not be a deadlock anymore. So, why do we call close() with duration zero @cadonna can you clarify? I thought we would still be in danger of deadlock if we use the blocking `close()`, since `close()` will not return until every thread has joined but the StreamThread that called `close()` would be stuck in this blocking call and thus never stop/join This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
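To spell out the deadlock concern with a minimal, generic example (not KafkaStreams code): a thread that ends up waiting for its own termination never returns, which is effectively what a blocking close() issued from the dying StreamThread would do, since close() waits for every StreamThread, including the caller, to join.

```java
public class SelfJoinExample {
    public static void main(final String[] args) throws InterruptedException {
        final Thread worker = new Thread(() -> {
            try {
                // Equivalent of a StreamThread calling a blocking close():
                // it ends up waiting on a thread set that includes itself.
                Thread.currentThread().join();   // never returns
            } catch (final InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        worker.setDaemon(true);   // let the JVM exit even though the worker stays stuck
        worker.start();
        worker.join(1_000);
        System.out.println("worker still alive after 1s: " + worker.isAlive());   // prints true
    }
}
```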
[GitHub] [kafka] ableegoldman commented on a change in pull request #9487: KAFKA-9331: Add a streams specific uncaught exception handler
ableegoldman commented on a change in pull request #9487: URL: https://github.com/apache/kafka/pull/9487#discussion_r524814932 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java ## @@ -559,18 +552,52 @@ void runLoop() { } } catch (final TaskCorruptedException e) { log.warn("Detected the states of tasks " + e.corruptedTaskWithChangelogs() + " are corrupted. " + - "Will close the task as dirty and re-create and bootstrap from scratch.", e); +"Will close the task as dirty and re-create and bootstrap from scratch.", e); try { taskManager.handleCorruption(e.corruptedTaskWithChangelogs()); } catch (final TaskMigratedException taskMigrated) { handleTaskMigrated(taskMigrated); } } catch (final TaskMigratedException e) { handleTaskMigrated(e); +} catch (final UnsupportedVersionException e) { Review comment: That's a fair point about broker upgrades, but don't we require the brokers to be upgraded to a version that supports EOS _before_ turning on eos-beta? Anyways I was wondering if there was something special about this exception such that ignoring it could violate eos or corrupt the state of the program. I'll ping the eos experts to assuage my concerns This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ableegoldman commented on a change in pull request #9487: KAFKA-9331: Add a streams specific uncaught exception handler
ableegoldman commented on a change in pull request #9487: URL: https://github.com/apache/kafka/pull/9487#discussion_r523375614 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java ## @@ -311,6 +314,8 @@ public void run() { "Updating global state failed. You can restart KafkaStreams to recover from this error.", recoverableException ); +} catch (final Exception e) { +this.streamsUncaughtExceptionHandler.accept(e); Review comment: Oh you're totally right, sorry for letting my paranoia start spreading conspiracy theories here 🙂 Given all this I'd still claim that the FSM is in need of being cleaned up a bit (or a lot), but if you'd prefer to hold off on that until the add thread work then I'm all good here. Thanks for humoring me and explaining the state of things. I just wanted/want to make sure we don't overlook anything, since there's a lot going on. For example in the current code, if the global thread dies with the old handler still in use then we'll transition to ERROR. However the user still has to be responsible for closing the client themselves, and it will ultimately transition from ERROR to NOT_RUNNING. Whereas if we transition to ERROR as the result of a SHUTDOWN_APPLICATION error code, the user should NOT try to invoke close themselves, and the ERROR state will be terminal. That's pretty confusing eg for users who use a state listener and wait for the transition to ERROR to call close(). We should make sure that ERROR has the same semantics across the board by the end of all this work. Anyways I'm just thinking out loud here, to reiterate I'm perfectly happy to merge this as-is. But for reasons like the above, I think it's important to tackle the FSM in the next PR and make sure it all gets sorted out by the next AK release.
[GitHub] [kafka] ableegoldman commented on a change in pull request #9487: KAFKA-9331: Add a streams specific uncaught exception handler
ableegoldman commented on a change in pull request #9487: URL: https://github.com/apache/kafka/pull/9487#discussion_r523372464 ## File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java ## @@ -1000,7 +1012,17 @@ public void restore(final Map tasks) { CLIENT_ID, new LogContext(""), new AtomicInteger(), -new AtomicLong(Long.MAX_VALUE) +new AtomicLong(Long.MAX_VALUE), +null, +e -> { +if (e instanceof RuntimeException) { +throw (RuntimeException) e; +} else if (e instanceof Error) { +throw (Error) e; +} else { +throw new RuntimeException("Unexpected checked exception caught in the uncaught exception handler", e); +} Review comment: Ah ok so there's some other IllegalStateException that would get swallowed if we just used `e -> {}` like in the other tests, so we need to explicitly rethrow it? That seems fine, although it makes me think that we should go ahead and use a "real" handler in _all_ of the tests, not just this one. Otherwise there could be some bug which causes an unexpected exception, but the test would just swallow it and silently pass. Can we just use the default handler wrapper for all of these tests so they reflect realistic scenarios? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ableegoldman commented on a change in pull request #9487: KAFKA-9331: Add a streams specific uncaught exception handler
ableegoldman commented on a change in pull request #9487: URL: https://github.com/apache/kafka/pull/9487#discussion_r523327338 ## File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java ## @@ -366,6 +375,93 @@ public void setUncaughtExceptionHandler(final Thread.UncaughtExceptionHandler eh } } +/** + * Set the handler invoked when an {@link StreamsConfig#NUM_STREAM_THREADS_CONFIG internal thread} + * throws an unexpected exception. + * These might be exceptions indicating rare bugs in Kafka Streams, or they + * might be exceptions thrown by your code, for example a NullPointerException thrown from your processor + * logic. + * The handler will execute on the thread that produced the exception. + * So inorder to get the thread as the java handler type uses use Thread.currentThread() + * + * Note, this handler must be threadsafe, since it will be shared among all threads, and invoked from any + * thread that encounters such an exception. + * + * @param streamsUncaughtExceptionHandler the uncaught exception handler of type {@link StreamsUncaughtExceptionHandler} for all internal threads + * @throws IllegalStateException if this {@code KafkaStreams} instance is not in state {@link State#CREATED CREATED}. + * @throws NullPointerException if streamsUncaughtExceptionHandler is null. + */ +public void setUncaughtExceptionHandler(final StreamsUncaughtExceptionHandler streamsUncaughtExceptionHandler) { +final Consumer handler = exception -> handleStreamsUncaughtException(exception, streamsUncaughtExceptionHandler); +synchronized (stateLock) { +if (state == State.CREATED) { +Objects.requireNonNull(streamsUncaughtExceptionHandler); +for (final StreamThread thread : threads) { +thread.setStreamsUncaughtExceptionHandler(handler); +} +if (globalStreamThread != null) { +globalStreamThread.setUncaughtExceptionHandler(handler); +} +} else { +throw new IllegalStateException("Can only set UncaughtExceptionHandler in CREATED state. " + +"Current state is: " + state); +} +} +} + +private void handleStreamsUncaughtExceptionDefaultWrapper(final Throwable throwable) { Review comment: Well it's not exactly a default, technically this method is always used to decide which handler to invoke (which may or may not invoke a default handler). Any of these would be fine by me but I'll throw one more idea out there: `invokeOldOrNewUncaughtExceptionHandler` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ableegoldman commented on a change in pull request #9487: KAFKA-9331: Add a streams specific uncaught exception handler
ableegoldman commented on a change in pull request #9487: URL: https://github.com/apache/kafka/pull/9487#discussion_r523322776 ## File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java ## @@ -1000,7 +1012,17 @@ public void restore(final Map tasks) { CLIENT_ID, new LogContext(""), new AtomicInteger(), -new AtomicLong(Long.MAX_VALUE) +new AtomicLong(Long.MAX_VALUE), +null, +e -> { +if (e instanceof RuntimeException) { +throw (RuntimeException) e; +} else if (e instanceof Error) { +throw (Error) e; +} else { +throw new RuntimeException("Unexpected checked exception caught in the uncaught exception handler", e); +} Review comment: But TaskMigratedException should never be thrown all the way up to the exception handler. Is that what you're seeing?
[GitHub] [kafka] ableegoldman commented on a change in pull request #9487: KAFKA-9331: Add a streams specific uncaught exception handler
ableegoldman commented on a change in pull request #9487: URL: https://github.com/apache/kafka/pull/9487#discussion_r523319677 ## File path: streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionIntegrationTest.java ## @@ -144,6 +143,7 @@ public void whenShuttingDown() throws IOException { } @Test +@Deprecated //a single thread should no longer die Review comment: I think any test that's trying to verify some unrelated behavior and just using the "one thread dies at a time" paradigm as a tool to do so should not be deleted. I'm sure in most if not all cases, there's some way to modify the test to verify that specific behavior either using the new handler or multiple apps or rewriting it altogether. But, there are a lot of tests that do this and a lot of them are pretty tricky, so I wouldn't want to stall this PR on waiting for all of these tests to be updated/adapted. I think we should file tickets for all of these tests and just try to pick up one or two of them every so often. Maybe that's being overly optimistic about our inclination to pick up small tasks even over a long period, but it's better than losing track of them altogether. WDYT? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ableegoldman commented on a change in pull request #9487: KAFKA-9331: Add a streams specific uncaught exception handler
ableegoldman commented on a change in pull request #9487: URL: https://github.com/apache/kafka/pull/9487#discussion_r523311219 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfo.java ## @@ -115,6 +119,10 @@ private SubscriptionInfo(final SubscriptionInfoData subscriptionInfoData) { this.data = subscriptionInfoData; } +public int errorCode() { +return data.errorCode(); Review comment: What happens if we try to read the error code of an earlier subscription version? I genuinely don't know what the generated code does, but we should make sure it doesn't throw an NPE or something. Could you add a unit test for this case?
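Something along these lines is what I have in mind (a rough sketch only; the constructor argument order, fixtures, and helper names are assumptions that would need to be checked against the actual SubscriptionInfo/SubscriptionInfoData code): encode at a pre-errorCode version, decode it, and assert that the generated code falls back to the field default instead of blowing up.

```java
// Sketch of the requested unit test; argument list and test fixtures are assumed for illustration.
@Test
public void shouldReturnDefaultErrorCodeWhenDecodingOlderSubscriptionVersions() {
    final SubscriptionInfo info = new SubscriptionInfo(
        8,                          // an older version without the errorCode field
        LATEST_SUPPORTED_VERSION,
        processId,
        "localhost:80",
        taskOffsetSums,
        uniqueField,
        0);
    final SubscriptionInfo decoded = SubscriptionInfo.decode(info.encode());
    assertThat(decoded.errorCode(), is(0));   // expect the generated default, not an NPE
}
```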
[GitHub] [kafka] ableegoldman commented on a change in pull request #9487: KAFKA-9331: Add a streams specific uncaught exception handler
ableegoldman commented on a change in pull request #9487: URL: https://github.com/apache/kafka/pull/9487#discussion_r523303062 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java ## @@ -311,6 +314,8 @@ public void run() { "Updating global state failed. You can restart KafkaStreams to recover from this error.", recoverableException ); +} catch (final Exception e) { +this.streamsUncaughtExceptionHandler.accept(e); Review comment: I suspect the tests didn't catch this because we would still transition out of ERROR to PENDING_SHUTDOWN and finally NOT_RUNNING in this case. But really, we shouldn't transition to ERROR in the first place
[GitHub] [kafka] ableegoldman commented on a change in pull request #9487: KAFKA-9331: Add a streams specific uncaught exception handler
ableegoldman commented on a change in pull request #9487: URL: https://github.com/apache/kafka/pull/9487#discussion_r523302976 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java ## @@ -311,6 +314,8 @@ public void run() { "Updating global state failed. You can restart KafkaStreams to recover from this error.", recoverableException ); +} catch (final Exception e) { +this.streamsUncaughtExceptionHandler.accept(e); Review comment: Ah ok I thought we executed this cleanup logic in the GlobalStreamThread's `shutdown` method but now I see that's not true. Sorry for the confusion there. I do see some minor outstanding issues here, mainly around the state diagram. Let's say the user opts to `SHUTDOWN_CLIENT` in the new handler: the intended semantics are to end up in `NOT_RUNNING` But I think what would happen is that from the global thread we would immediately call `KafkaStreams#close` , which kicks off a shutdown thread to wait for all threads to join and then sets the state to `NOT_RUNNING`. Then when the handler returns, it would transition the global thread to `PENDING_SHUTDOWN` and then finally to `DEAD`. And during the transition to `DEAD`, we would actually end up transitioning the KafkaStreams instance to `ERROR`, rather than `NOT_RUNNING` as intended. So probably, we just need to update the `onChange` method in KafkaStreams. This also reminds me of another thing, we need to update the FSM diagram and allowed transitions in KafkaStreams to reflect the new semantics we decided on for ERROR (which IIRC is basically just to make it a terminal state). Does that sound right to you? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ableegoldman commented on a change in pull request #9487: KAFKA-9331: Add a streams specific uncaught exception handler
ableegoldman commented on a change in pull request #9487: URL: https://github.com/apache/kafka/pull/9487#discussion_r523296833 ## File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java ## @@ -366,6 +375,93 @@ public void setUncaughtExceptionHandler(final Thread.UncaughtExceptionHandler eh } } +/** + * Set the handler invoked when an {@link StreamsConfig#NUM_STREAM_THREADS_CONFIG internal thread} + * throws an unexpected exception. + * These might be exceptions indicating rare bugs in Kafka Streams, or they + * might be exceptions thrown by your code, for example a NullPointerException thrown from your processor + * logic. + * The handler will execute on the thread that produced the exception. + * So inorder to get the thread as the java handler type uses use Thread.currentThread() Review comment: Is there an extra `uses` in there or am I not looking at this sentence from the right angle?
[GitHub] [kafka] ableegoldman commented on a change in pull request #9487: KAFKA-9331: Add a streams specific uncaught exception handler
ableegoldman commented on a change in pull request #9487: URL: https://github.com/apache/kafka/pull/9487#discussion_r522596527 ## File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java ## @@ -366,6 +375,93 @@ public void setUncaughtExceptionHandler(final Thread.UncaughtExceptionHandler eh } } +/** + * Set the handler invoked when an {@link StreamsConfig#NUM_STREAM_THREADS_CONFIG internal thread} + * throws an unexpected exception. + * These might be exceptions indicating rare bugs in Kafka Streams, or they + * might be exceptions thrown by your code, for example a NullPointerException thrown from your processor + * logic. + * The handler will execute on the thread that produced the exception. + * So inorder to get the thread as the java handler type uses use Thread.currentThread() Review comment: This wording is a little difficult to parse ## File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java ## @@ -366,6 +375,93 @@ public void setUncaughtExceptionHandler(final Thread.UncaughtExceptionHandler eh } } +/** + * Set the handler invoked when an {@link StreamsConfig#NUM_STREAM_THREADS_CONFIG internal thread} + * throws an unexpected exception. + * These might be exceptions indicating rare bugs in Kafka Streams, or they + * might be exceptions thrown by your code, for example a NullPointerException thrown from your processor + * logic. + * The handler will execute on the thread that produced the exception. + * So inorder to get the thread as the java handler type uses use Thread.currentThread() + * + * Note, this handler must be threadsafe, since it will be shared among all threads, and invoked from any + * thread that encounters such an exception. + * + * @param streamsUncaughtExceptionHandler the uncaught exception handler of type {@link StreamsUncaughtExceptionHandler} for all internal threads + * @throws IllegalStateException if this {@code KafkaStreams} instance is not in state {@link State#CREATED CREATED}. + * @throws NullPointerException if streamsUncaughtExceptionHandler is null. + */ +public void setUncaughtExceptionHandler(final StreamsUncaughtExceptionHandler streamsUncaughtExceptionHandler) { +final Consumer handler = exception -> handleStreamsUncaughtException(exception, streamsUncaughtExceptionHandler); +synchronized (stateLock) { +if (state == State.CREATED) { +Objects.requireNonNull(streamsUncaughtExceptionHandler); +for (final StreamThread thread : threads) { +thread.setStreamsUncaughtExceptionHandler(handler); +} +if (globalStreamThread != null) { +globalStreamThread.setUncaughtExceptionHandler(handler); +} +} else { +throw new IllegalStateException("Can only set UncaughtExceptionHandler in CREATED state. " + +"Current state is: " + state); +} +} +} + +private void handleStreamsUncaughtExceptionDefaultWrapper(final Throwable throwable) { +if (Thread.getDefaultUncaughtExceptionHandler() != null) { +if (throwable instanceof RuntimeException) { +throw (RuntimeException) throwable; +} else if (throwable instanceof Error) { +throw (Error) throwable; +} else { +throw new RuntimeException("Unexpected checked exception caught in the uncaught exception handler", throwable); +} Review comment: Just curious, what's the motivation for doing it like this vs just immediately throwing the exception? 
## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java ## @@ -559,18 +552,52 @@ void runLoop() { } } catch (final TaskCorruptedException e) { log.warn("Detected the states of tasks " + e.corruptedTaskWithChangelogs() + " are corrupted. " + - "Will close the task as dirty and re-create and bootstrap from scratch.", e); +"Will close the task as dirty and re-create and bootstrap from scratch.", e); try { taskManager.handleCorruption(e.corruptedTaskWithChangelogs()); } catch (final TaskMigratedException taskMigrated) { handleTaskMigrated(taskMigrated); } } catch (final TaskMigratedException e) { handleTaskMigrated(e); +} catch (final UnsupportedVersionException e) { Review comment: Hmm...this one seems like it should be a fatal error, so is it safe to just pass it along to the user and let them potentially just keep replacing the thread? (I kn
[GitHub] [kafka] ableegoldman commented on a change in pull request #9487: KAFKA-9331: Add a streams specific uncaught exception handler
ableegoldman commented on a change in pull request #9487: URL: https://github.com/apache/kafka/pull/9487#discussion_r518488577 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java ## @@ -255,8 +255,9 @@ public ByteBuffer subscriptionUserData(final Set topics) { taskManager.processId(), userEndPoint, taskManager.getTaskOffsetSums(), -uniqueField) -.encode(); +uniqueField, +(byte) assignmentErrorCode.get() Review comment: This cast makes me kind of uncomfortable...either the `assignmentErrorCode` that we have in the AssignmentInfo is conceptually the same as the one we're adding to the SubscriptionInfo (in which case it should be the same type), or it's not the same, in which case we should use a different variable to track it. Personally I think it's probably simpler to keep them the same, and just add an `int` errorCode field to the Subscription instead of a `byte` shutdownRequested field. But it's your choice This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ableegoldman commented on a change in pull request #9487: KAFKA-9331: Add a streams specific uncaught exception handler
ableegoldman commented on a change in pull request #9487: URL: https://github.com/apache/kafka/pull/9487#discussion_r520077048 ## File path: streams/src/main/resources/common/message/SubscriptionInfoData.json ## @@ -57,6 +57,11 @@ "name": "uniqueField", "versions": "8+", "type": "int8" +}, +{ + "name": "shutdownRequested", + "versions": "9+", + "type": "int8" Review comment: I'm not really worried that we'd run out of space, I just think it sends a signal that the Assignment and Subscription error codes are semantically distinct and don't refer to the same underlying concept. So it seems better to go with the simpler approach than over-optimize to save an occasional three bytes
[GitHub] [kafka] ableegoldman commented on a change in pull request #9487: KAFKA-9331: Add a streams specific uncaught exception handler
ableegoldman commented on a change in pull request #9487: URL: https://github.com/apache/kafka/pull/9487#discussion_r518913514
## File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
## @@ -996,6 +1082,62 @@ private boolean close(final long timeoutMs) {
}
}
+private void closeToError() {
+if (!setState(State.ERROR)) {
+log.info("Can not transition to error from state " + state());
Review comment: Gotcha. In that case maybe we shouldn't log anything here at all? Or just reword it to clarify that this is expected (eg `"Skipping shutdown since we are already in ERROR"`) since "Can not transition..." kind of sounds like something went wrong
This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ableegoldman commented on a change in pull request #9487: KAFKA-9331: Add a streams specific uncaught exception handler
ableegoldman commented on a change in pull request #9487: URL: https://github.com/apache/kafka/pull/9487#discussion_r518478117 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java ## @@ -567,10 +589,34 @@ void runLoop() { } } catch (final TaskMigratedException e) { handleTaskMigrated(e); +} catch (final Exception e) { +if (this.streamsUncaughtExceptionHandler.handle(e)) { Review comment: I had a little trouble following the `Handler` class. Some trivial things -- eg the handler in the StreamThread is named `streamsUncaughtExceptionHandler` but it's actually _not_ a `StreamsUncaughtExceptionHandler`. Also the usage of the return value; IIUC it's supposed to indicate whether to use the new handler or fall back on the old one. To me it sounds like if `handle` returns `true` that means we should handle it, ie we should _not_ rethrow the exception, but this looks like the opposite of what we do now. Honestly either interpretation is ok with me, as long as it's documented somewhere Do we really need the `Handler` in the first place though? It's already pretty confusing that we have to deal with two types of handlers (old and new) so I'd prefer not to add a third unless it's really necessary. It seems like we can just inline the logic of whether to invoke the new handler or rethrow the exception, which would also clear up the confusion around the meaning of the return value. But I might be missing something here -- WDYT? ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java ## @@ -282,6 +283,17 @@ public boolean isRunning() { private final Admin adminClient; private final InternalTopologyBuilder builder; +private Handler streamsUncaughtExceptionHandler; +private ShutdownErrorHook shutdownErrorHook; +private AtomicInteger assignmentErrorCode; +public interface ShutdownErrorHook { +void shutdown(); +} Review comment: Seems like we can just pass in a Runnable with `KafkaStreams::closeToError` instead of adding a whole `ShutdownErrorHook` functional interface ## File path: streams/src/main/resources/common/message/SubscriptionInfoData.json ## @@ -57,6 +57,11 @@ "name": "uniqueField", "versions": "8+", "type": "int8" +}, +{ + "name": "shutdownRequested", + "versions": "9+", + "type": "int8" Review comment: I think we should mirror the `errorCode` in the AssignmentInfo here, both in terms of naming and type. If we're going to use the same AssignorError for both, then they should really be the same. And we may want to send other kinds of error codes in the subscription going forward: better to just encode a single `int` than a separate `byte` for every logical error code. 
I don't think we'll notice the extra three bytes since Subscriptions aren't sent that frequently ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ReferenceContainer.java ## @@ -30,7 +30,7 @@ public Admin adminClient; public TaskManager taskManager; public StreamsMetadataState streamsMetadataState; -public final AtomicInteger assignmentErrorCode = new AtomicInteger(); +public AtomicInteger assignmentErrorCode = new AtomicInteger(); Review comment: This should probably stay `final` so we don't accidentally change it ever ## File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java ## @@ -996,6 +1082,62 @@ private boolean close(final long timeoutMs) { } } +private void closeToError() { +if (!setState(State.ERROR)) { +log.info("Can not transition to error from state " + state()); Review comment: Should this be logged at error? ## File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java ## @@ -996,6 +1082,62 @@ private boolean close(final long timeoutMs) { } } +private void closeToError() { +if (!setState(State.ERROR)) { +log.info("Can not transition to error from state " + state()); +} else { +log.info("Transitioning to ERROR state"); +stateDirCleaner.shutdownNow(); +if (rocksDBMetricsRecordingService != null) { +rocksDBMetricsRecordingService.shutdownNow(); +} + +// wait for all threads to join in a separate thread; +// save the current thread so that if it is a stream thread +// we don't attempt to join it and cause a deadlock +final Thread shutdownThread = new Thread(() -> { +// notify all the threads to stop; avoid deadlocks by stopping any +// further state reports from the thread since we're shutting
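Returning to the `ShutdownErrorHook` comment above: the suggestion is simply to reuse `java.lang.Runnable` (for example the method reference `KafkaStreams::closeToError`) rather than declaring a new single-method interface. A minimal self-contained sketch with stand-in class names, not the actual StreamThread/KafkaStreams code:

```java
// Minimal illustration of "pass a Runnable instead of a custom one-method interface".
public class ShutdownHookSketch {

    // Stand-in for StreamThread: it only needs "something to run on a fatal error",
    // which java.lang.Runnable already expresses.
    static class WorkerThread {
        private final Runnable shutdownErrorHook;

        WorkerThread(final Runnable shutdownErrorHook) {
            this.shutdownErrorHook = shutdownErrorHook;
        }

        void onFatalError() {
            shutdownErrorHook.run();
        }
    }

    // Stand-in for KafkaStreams#closeToError.
    private void closeToError() {
        System.out.println("transitioning to ERROR and shutting down");
    }

    public static void main(final String[] args) {
        final ShutdownHookSketch streamsStandIn = new ShutdownHookSketch();
        // The method reference plays the role of KafkaStreams::closeToError in the review comment.
        final WorkerThread thread = new WorkerThread(streamsStandIn::closeToError);
        thread.onFatalError();
    }
}
```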
[GitHub] [kafka] ableegoldman commented on a change in pull request #9487: KAFKA-9331: Add a streams specific uncaught exception handler
ableegoldman commented on a change in pull request #9487: URL: https://github.com/apache/kafka/pull/9487#discussion_r517543638
## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/StreamsAssignmentProtocolVersions.java
## @@ -19,7 +19,7 @@ public final class StreamsAssignmentProtocolVersions {
public static final int UNKNOWN = -1;
public static final int EARLIEST_PROBEABLE_VERSION = 3;
-public static final int LATEST_SUPPORTED_VERSION = 8;
+public static final int LATEST_SUPPORTED_VERSION = 9;
Review comment: Can you also leave a comment here reminding us to fix the version probing system test whenever this protocol number is bumped? Since we apparently always forget
This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
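For illustration, the requested reminder might end up looking something like this next to the constant in the quoted diff (the exact wording is of course the author's call):

```java
public final class StreamsAssignmentProtocolVersions {
    public static final int UNKNOWN = -1;
    public static final int EARLIEST_PROBEABLE_VERSION = 3;
    // NOTE: whenever this version is bumped, remember to also update the
    // version probing system test -- it is easy to forget.
    public static final int LATEST_SUPPORTED_VERSION = 9;
}
```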