[GitHub] [kafka] ableegoldman commented on a change in pull request #9487: KAFKA-9331: Add a streams specific uncaught exception handler

2020-11-18 Thread GitBox


ableegoldman commented on a change in pull request #9487:
URL: https://github.com/apache/kafka/pull/9487#discussion_r526477258



##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -932,56 +1028,62 @@ public synchronized boolean close(final long timeout, final TimeUnit timeUnit) {
         return close(timeoutMs);
     }
 
-    private boolean close(final long timeoutMs) {
-        if (!setState(State.PENDING_SHUTDOWN)) {
-            // if transition failed, it means it was either in PENDING_SHUTDOWN
-            // or NOT_RUNNING already; just check that all threads have been stopped
-            log.info("Already in the pending shutdown state, wait to complete shutdown");
-        } else {
-            stateDirCleaner.shutdownNow();
-            if (rocksDBMetricsRecordingService != null) {
-                rocksDBMetricsRecordingService.shutdownNow();
-            }
+    private Thread shutdownHelper(final boolean error) {
+        stateDirCleaner.shutdownNow();
+        if (rocksDBMetricsRecordingService != null) {
+            rocksDBMetricsRecordingService.shutdownNow();
+        }
 
-            // wait for all threads to join in a separate thread;
-            // save the current thread so that if it is a stream thread
-            // we don't attempt to join it and cause a deadlock
-            final Thread shutdownThread = new Thread(() -> {
-                // notify all the threads to stop; avoid deadlocks by stopping any
-                // further state reports from the thread since we're shutting down
-                for (final StreamThread thread : threads) {
-                    thread.shutdown();
-                }
+        // wait for all threads to join in a separate thread;
+        // save the current thread so that if it is a stream thread
+        // we don't attempt to join it and cause a deadlock
+        return new Thread(() -> {
+            // notify all the threads to stop; avoid deadlocks by stopping any
+            // further state reports from the thread since we're shutting down
+            for (final StreamThread thread : threads) {
+                thread.shutdown();
+            }
 
-                for (final StreamThread thread : threads) {
-                    try {
-                        if (!thread.isRunning()) {
-                            thread.join();
-                        }
-                    } catch (final InterruptedException ex) {
-                        Thread.currentThread().interrupt();
+            for (final StreamThread thread : threads) {
+                try {
+                    if (!thread.isRunning()) {
+                        thread.join();
                     }
+                } catch (final InterruptedException ex) {
+                    Thread.currentThread().interrupt();
                 }
+            }
 
-                if (globalStreamThread != null) {
-                    globalStreamThread.shutdown();
-                }
+            if (globalStreamThread != null) {
+                globalStreamThread.shutdown();
+            }
 
-                if (globalStreamThread != null && !globalStreamThread.stillRunning()) {
-                    try {
-                        globalStreamThread.join();
-                    } catch (final InterruptedException e) {
-                        Thread.currentThread().interrupt();
-                    }
-                    globalStreamThread = null;
+            if (globalStreamThread != null && !globalStreamThread.stillRunning()) {
+                try {
+                    globalStreamThread.join();
+                } catch (final InterruptedException e) {
+                    Thread.currentThread().interrupt();
                 }
+                globalStreamThread = null;
+            }
 
-                adminClient.close();
+            adminClient.close();
 
-                streamsMetrics.removeAllClientLevelMetrics();
-                metrics.close();
+            streamsMetrics.removeAllClientLevelMetrics();
+            metrics.close();
+            if (!error) {
                 setState(State.NOT_RUNNING);
-            }, "kafka-streams-close-thread");
+            }
+        }, "kafka-streams-close-thread");
+    }
+
+    private boolean close(final long timeoutMs) {
+        if (!setState(State.PENDING_SHUTDOWN)) {

Review comment:
   Sounds reasonable









[GitHub] [kafka] ableegoldman commented on a change in pull request #9487: KAFKA-9331: Add a streams specific uncaught exception handler

2020-11-17 Thread GitBox


ableegoldman commented on a change in pull request #9487:
URL: https://github.com/apache/kafka/pull/9487#discussion_r525734417



##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -932,56 +1028,62 @@ public synchronized boolean close(final long timeout, final TimeUnit timeUnit) {
         return close(timeoutMs);
     }
 
-    private boolean close(final long timeoutMs) {
-        if (!setState(State.PENDING_SHUTDOWN)) {
-            // if transition failed, it means it was either in PENDING_SHUTDOWN
-            // or NOT_RUNNING already; just check that all threads have been stopped
-            log.info("Already in the pending shutdown state, wait to complete shutdown");
-        } else {
-            stateDirCleaner.shutdownNow();
-            if (rocksDBMetricsRecordingService != null) {
-                rocksDBMetricsRecordingService.shutdownNow();
-            }
+    private Thread shutdownHelper(final boolean error) {
+        stateDirCleaner.shutdownNow();
+        if (rocksDBMetricsRecordingService != null) {
+            rocksDBMetricsRecordingService.shutdownNow();
+        }
 
-            // wait for all threads to join in a separate thread;
-            // save the current thread so that if it is a stream thread
-            // we don't attempt to join it and cause a deadlock
-            final Thread shutdownThread = new Thread(() -> {
-                // notify all the threads to stop; avoid deadlocks by stopping any
-                // further state reports from the thread since we're shutting down
-                for (final StreamThread thread : threads) {
-                    thread.shutdown();
-                }
+        // wait for all threads to join in a separate thread;
+        // save the current thread so that if it is a stream thread
+        // we don't attempt to join it and cause a deadlock
+        return new Thread(() -> {
+            // notify all the threads to stop; avoid deadlocks by stopping any
+            // further state reports from the thread since we're shutting down
+            for (final StreamThread thread : threads) {
+                thread.shutdown();
+            }
 
-                for (final StreamThread thread : threads) {
-                    try {
-                        if (!thread.isRunning()) {
-                            thread.join();
-                        }
-                    } catch (final InterruptedException ex) {
-                        Thread.currentThread().interrupt();
+            for (final StreamThread thread : threads) {
+                try {
+                    if (!thread.isRunning()) {
+                        thread.join();
                    }
+                } catch (final InterruptedException ex) {
+                    Thread.currentThread().interrupt();
                 }
+            }
 
-                if (globalStreamThread != null) {
-                    globalStreamThread.shutdown();
-                }
+            if (globalStreamThread != null) {
+                globalStreamThread.shutdown();
+            }
 
-                if (globalStreamThread != null && !globalStreamThread.stillRunning()) {
-                    try {
-                        globalStreamThread.join();
-                    } catch (final InterruptedException e) {
-                        Thread.currentThread().interrupt();
-                    }
-                    globalStreamThread = null;
+            if (globalStreamThread != null && !globalStreamThread.stillRunning()) {
+                try {
+                    globalStreamThread.join();
+                } catch (final InterruptedException e) {
+                    Thread.currentThread().interrupt();
                 }
+                globalStreamThread = null;
+            }
 
-                adminClient.close();
+            adminClient.close();
 
-                streamsMetrics.removeAllClientLevelMetrics();
-                metrics.close();
+            streamsMetrics.removeAllClientLevelMetrics();
+            metrics.close();
+            if (!error) {
                 setState(State.NOT_RUNNING);
-            }, "kafka-streams-close-thread");
+            }
+        }, "kafka-streams-close-thread");
+    }
+
+    private boolean close(final long timeoutMs) {
+        if (!setState(State.PENDING_SHUTDOWN)) {

Review comment:
   WDYT about having both NOT_RUNNING and ERROR go through 
PENDING_SHUTDOWN, rather than just transitioning directly and permanently to 
ERROR? At a high level I think it just makes sense for ERROR and NOT_RUNNING to 
be symmetric. Also any benefit to having an intermediate PENDING_SHUTDOWN for 
the NOT_RUNNING case presumably applies to the ERROR case as well. eg, it 
indicates whether Streams has completed its shutdown or not: users know that an 
app in PENDING_SHUTDOWN should never be killed; it's only safe to do so once it 
reaches NOT_RUNNING. We should provide the same fun

[GitHub] [kafka] ableegoldman commented on a change in pull request #9487: KAFKA-9331: Add a streams specific uncaught exception handler

2020-11-17 Thread GitBox


ableegoldman commented on a change in pull request #9487:
URL: https://github.com/apache/kafka/pull/9487#discussion_r525701691



##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -366,6 +377,90 @@ public void setUncaughtExceptionHandler(final Thread.UncaughtExceptionHandler eh
         }
     }
 
+    /**
+     * Set the handler invoked when an {@link StreamsConfig#NUM_STREAM_THREADS_CONFIG internal thread}
+     * throws an unexpected exception.
+     * These might be exceptions indicating rare bugs in Kafka Streams, or they
+     * might be exceptions thrown by your code, for example a NullPointerException thrown from your processor
+     * logic.
+     * The handler will execute on the thread that produced the exception.
+     * In order to get the thread that threw the exception, Thread.currentThread().
+     *
+     * Note, this handler must be threadsafe, since it will be shared among all threads, and invoked from any
+     * thread that encounters such an exception.
+     *
+     * @param streamsUncaughtExceptionHandler the uncaught exception handler of type {@link StreamsUncaughtExceptionHandler} for all internal threads
+     * @throws IllegalStateException if this {@code KafkaStreams} instance is not in state {@link State#CREATED CREATED}.
+     * @throws NullPointerException if streamsUncaughtExceptionHandler is null.
+     */
+    public void setUncaughtExceptionHandler(final StreamsUncaughtExceptionHandler streamsUncaughtExceptionHandler) {
+        final Consumer<Throwable> handler = exception -> handleStreamsUncaughtException(exception, streamsUncaughtExceptionHandler);
+        synchronized (stateLock) {
+            if (state == State.CREATED) {
+                Objects.requireNonNull(streamsUncaughtExceptionHandler);
+                for (final StreamThread thread : threads) {
+                    thread.setStreamsUncaughtExceptionHandler(handler);
+                }
+                if (globalStreamThread != null) {
+                    globalStreamThread.setUncaughtExceptionHandler(handler);
+                }
+            } else {
+                throw new IllegalStateException("Can only set UncaughtExceptionHandler in CREATED state. " +
+                    "Current state is: " + state);
+            }
+        }
+    }
+
+    private void defaultStreamsUncaughtExceptionHandler(final Throwable throwable) {
+        if (oldHandler) {
+            if (throwable instanceof RuntimeException) {
+                throw (RuntimeException) throwable;
+            } else if (throwable instanceof Error) {
+                throw (Error) throwable;
+            } else {
+                throw new RuntimeException("Unexpected checked exception caught in the uncaught exception handler", throwable);
+            }
+        } else {
+            handleStreamsUncaughtException(throwable, t -> SHUTDOWN_CLIENT);
+        }
+    }
+
+    private void handleStreamsUncaughtException(final Throwable throwable,
+                                                final StreamsUncaughtExceptionHandler streamsUncaughtExceptionHandler) {
+        final StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse action = streamsUncaughtExceptionHandler.handle(throwable);
+        if (oldHandler) {
+            log.warn("Stream's new uncaught exception handler is set as well as the deprecated old handler." +
+                "The old handler will be ignored as long as a new handler is set.");
+        }
+        switch (action) {
+            case SHUTDOWN_CLIENT:
+                log.error("Encountered the following exception during processing " +
+                    "and the registered exception handler opted to " + action + "." +
+                    " The streams client is going to shut down now. ", throwable);
+                close(Duration.ZERO);
+                break;
+            case SHUTDOWN_APPLICATION:
+                if (throwable instanceof Error) {
+                    log.error("This option requires running threads to shut down the application." +
+                        "but the uncaught exception was an Error, which means this runtime is no " +
+                        "longer in a well-defined state. Attempting to send the shutdown command anyway.", throwable);
+                }
+                if (Thread.currentThread().equals(globalStreamThread) && threads.stream().noneMatch(StreamThread::isRunning)) {
+                    log.error("Exception in global thread caused the application to attempt to shutdown." +
+                        " This action will succeed only if there is at least one StreamThread running on this client." +
+                        " Currently there are no running threads so will now close the client.");
+                    close(Duration.ZERO);

Review comment:
   That's fair. I guess I was thinking less about the inherent meaning of 
ERROR vs NOT_RUNNING, and mor

[GitHub] [kafka] ableegoldman commented on a change in pull request #9487: KAFKA-9331: Add a streams specific uncaught exception handler

2020-11-17 Thread GitBox


ableegoldman commented on a change in pull request #9487:
URL: https://github.com/apache/kafka/pull/9487#discussion_r525692960



##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -932,56 +1028,62 @@ public synchronized boolean close(final long timeout, final TimeUnit timeUnit) {
         return close(timeoutMs);
     }
 
-    private boolean close(final long timeoutMs) {
-        if (!setState(State.PENDING_SHUTDOWN)) {
-            // if transition failed, it means it was either in PENDING_SHUTDOWN
-            // or NOT_RUNNING already; just check that all threads have been stopped
-            log.info("Already in the pending shutdown state, wait to complete shutdown");
-        } else {
-            stateDirCleaner.shutdownNow();
-            if (rocksDBMetricsRecordingService != null) {
-                rocksDBMetricsRecordingService.shutdownNow();
-            }
+    private Thread shutdownHelper(final boolean error) {
+        stateDirCleaner.shutdownNow();
+        if (rocksDBMetricsRecordingService != null) {
+            rocksDBMetricsRecordingService.shutdownNow();
+        }
 
-            // wait for all threads to join in a separate thread;
-            // save the current thread so that if it is a stream thread
-            // we don't attempt to join it and cause a deadlock
-            final Thread shutdownThread = new Thread(() -> {
-                // notify all the threads to stop; avoid deadlocks by stopping any
-                // further state reports from the thread since we're shutting down
-                for (final StreamThread thread : threads) {
-                    thread.shutdown();
-                }
+        // wait for all threads to join in a separate thread;
+        // save the current thread so that if it is a stream thread
+        // we don't attempt to join it and cause a deadlock
+        return new Thread(() -> {
+            // notify all the threads to stop; avoid deadlocks by stopping any
+            // further state reports from the thread since we're shutting down
+            for (final StreamThread thread : threads) {
+                thread.shutdown();
+            }
 
-                for (final StreamThread thread : threads) {
-                    try {
-                        if (!thread.isRunning()) {
-                            thread.join();
-                        }
-                    } catch (final InterruptedException ex) {
-                        Thread.currentThread().interrupt();
+            for (final StreamThread thread : threads) {
+                try {
+                    if (!thread.isRunning()) {
+                        thread.join();
                    }
+                } catch (final InterruptedException ex) {
+                    Thread.currentThread().interrupt();
                 }
+            }
 
-                if (globalStreamThread != null) {
-                    globalStreamThread.shutdown();
-                }
+            if (globalStreamThread != null) {
+                globalStreamThread.shutdown();
+            }

Review comment:
   Eh, I wouldn't bother with an AK ticket if this will be tackled in the 
next PR. I'll just make a list of all the minor followup work somewhere to keep 
track









[GitHub] [kafka] ableegoldman commented on a change in pull request #9487: KAFKA-9331: Add a streams specific uncaught exception handler

2020-11-17 Thread GitBox


ableegoldman commented on a change in pull request #9487:
URL: https://github.com/apache/kafka/pull/9487#discussion_r525663640



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##
@@ -559,18 +552,51 @@ void runLoop() {
                 }
             } catch (final TaskCorruptedException e) {
                 log.warn("Detected the states of tasks " + e.corruptedTaskWithChangelogs() + " are corrupted. " +
-                         "Will close the task as dirty and re-create and bootstrap from scratch.", e);
+                    "Will close the task as dirty and re-create and bootstrap from scratch.", e);
                 try {
                     taskManager.handleCorruption(e.corruptedTaskWithChangelogs());
                 } catch (final TaskMigratedException taskMigrated) {
                     handleTaskMigrated(taskMigrated);
                 }
             } catch (final TaskMigratedException e) {
                 handleTaskMigrated(e);
+            } catch (final UnsupportedVersionException e) {
+                final String errorMessage = e.getMessage();
+                if (errorMessage != null &&
+                    errorMessage.startsWith("Broker unexpectedly doesn't support requireStable flag on version ")) {
+
+                    log.error("Shutting down because the Kafka cluster seems to be on a too old version. " +

Review comment:
   We should remember to update the wording here when we add the 
REPLACE_THREAD functionality









[GitHub] [kafka] ableegoldman commented on a change in pull request #9487: KAFKA-9331: Add a streams specific uncaught exception handler

2020-11-17 Thread GitBox


ableegoldman commented on a change in pull request #9487:
URL: https://github.com/apache/kafka/pull/9487#discussion_r525658639



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java
##
@@ -311,6 +314,8 @@ public void run() {
                 "Updating global state failed. You can restart KafkaStreams to recover from this error.",
                 recoverableException

Review comment:
   Hm ok this might be a problem. Since this is thrown from another catch 
block and not from the try block, it won't be caught by the catch block below 
and will slip through the exception handler. 
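   To make the hazard concrete, here is a minimal, self-contained Java sketch (illustration only, not code from this PR) showing that an exception thrown from inside a catch block bypasses the remaining catch clauses of the same try and only surfaces to an enclosing handler:

```java
public class CatchBlockRethrow {
    public static void main(final String[] args) {
        try {
            try {
                throw new IllegalStateException("original failure");
            } catch (final IllegalStateException e) {
                // A throw from inside a catch block is NOT routed back to the
                // remaining catch clauses of the same try statement...
                throw new RuntimeException("thrown while handling the first exception", e);
            } catch (final RuntimeException e) {
                // ...so this clause never runs for the exception thrown above.
                System.out.println("inner catch: " + e.getMessage());
            }
        } catch (final RuntimeException e) {
            // Only an enclosing try (or the thread's uncaught-exception path) sees it,
            // which is why a handler wired to the try block can be slipped past.
            System.out.println("outer catch: " + e.getMessage());
        }
    }
}
```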









[GitHub] [kafka] ableegoldman commented on a change in pull request #9487: KAFKA-9331: Add a streams specific uncaught exception handler

2020-11-17 Thread GitBox


ableegoldman commented on a change in pull request #9487:
URL: https://github.com/apache/kafka/pull/9487#discussion_r525650632



##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -932,56 +1028,62 @@ public synchronized boolean close(final long timeout, final TimeUnit timeUnit) {
         return close(timeoutMs);
     }
 
-    private boolean close(final long timeoutMs) {
-        if (!setState(State.PENDING_SHUTDOWN)) {
-            // if transition failed, it means it was either in PENDING_SHUTDOWN
-            // or NOT_RUNNING already; just check that all threads have been stopped
-            log.info("Already in the pending shutdown state, wait to complete shutdown");
-        } else {
-            stateDirCleaner.shutdownNow();
-            if (rocksDBMetricsRecordingService != null) {
-                rocksDBMetricsRecordingService.shutdownNow();
-            }
+    private Thread shutdownHelper(final boolean error) {
+        stateDirCleaner.shutdownNow();
+        if (rocksDBMetricsRecordingService != null) {
+            rocksDBMetricsRecordingService.shutdownNow();
+        }
 
-            // wait for all threads to join in a separate thread;
-            // save the current thread so that if it is a stream thread
-            // we don't attempt to join it and cause a deadlock
-            final Thread shutdownThread = new Thread(() -> {
-                // notify all the threads to stop; avoid deadlocks by stopping any
-                // further state reports from the thread since we're shutting down
-                for (final StreamThread thread : threads) {
-                    thread.shutdown();
-                }
+        // wait for all threads to join in a separate thread;
+        // save the current thread so that if it is a stream thread
+        // we don't attempt to join it and cause a deadlock
+        return new Thread(() -> {
+            // notify all the threads to stop; avoid deadlocks by stopping any
+            // further state reports from the thread since we're shutting down
+            for (final StreamThread thread : threads) {
+                thread.shutdown();
+            }
 
-                for (final StreamThread thread : threads) {
-                    try {
-                        if (!thread.isRunning()) {
-                            thread.join();
-                        }
-                    } catch (final InterruptedException ex) {
-                        Thread.currentThread().interrupt();
+            for (final StreamThread thread : threads) {
+                try {
+                    if (!thread.isRunning()) {
+                        thread.join();
                    }
+                } catch (final InterruptedException ex) {
+                    Thread.currentThread().interrupt();
                 }
+            }
 
-                if (globalStreamThread != null) {
-                    globalStreamThread.shutdown();
-                }
+            if (globalStreamThread != null) {
+                globalStreamThread.shutdown();
+            }
 
-                if (globalStreamThread != null && !globalStreamThread.stillRunning()) {
-                    try {
-                        globalStreamThread.join();
-                    } catch (final InterruptedException e) {
-                        Thread.currentThread().interrupt();
-                    }
-                    globalStreamThread = null;
+            if (globalStreamThread != null && !globalStreamThread.stillRunning()) {
+                try {
+                    globalStreamThread.join();
+                } catch (final InterruptedException e) {
+                    Thread.currentThread().interrupt();
                 }
+                globalStreamThread = null;
+            }
 
-                adminClient.close();
+            adminClient.close();
 
-                streamsMetrics.removeAllClientLevelMetrics();
-                metrics.close();
+            streamsMetrics.removeAllClientLevelMetrics();
+            metrics.close();
+            if (!error) {
                 setState(State.NOT_RUNNING);
-            }, "kafka-streams-close-thread");
+            }
+        }, "kafka-streams-close-thread");
+    }
+
+    private boolean close(final long timeoutMs) {
+        if (!setState(State.PENDING_SHUTDOWN)) {

Review comment:
   I just realized that this is going to be a problem with the way the 
ERROR state is being used. If we `closeToError` then we transition to ERROR and 
shut down, however `ERROR -> PENDING_SHUTDOWN` is still an allowed transition 
so there's nothing to prevent the shutdown from being triggered again when a 
user calls `close()`. And note that a lot of users most likely have a state 
listener at the moment which does exactly that, i.e. when it sees a transition to 
ERROR it immediately invokes close (because that's what you should do with the 
current semantics)
   Just another th
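   For reference, a hedged sketch of the kind of state listener described above (user code, not part of this PR; `register` is a hypothetical helper), which closes the client as soon as it observes a transition to ERROR, exactly the pattern that would re-trigger the shutdown via the still-allowed `ERROR -> PENDING_SHUTDOWN` transition:

```java
import org.apache.kafka.streams.KafkaStreams;

public final class CloseOnErrorListener {
    // Under the old semantics, closing on ERROR is what users are expected to do,
    // so many state listeners look roughly like this.
    public static void register(final KafkaStreams streams) {
        streams.setStateListener((newState, oldState) -> {
            if (newState == KafkaStreams.State.ERROR) {
                streams.close();
            }
        });
    }
}
```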

[GitHub] [kafka] ableegoldman commented on a change in pull request #9487: KAFKA-9331: Add a streams specific uncaught exception handler

2020-11-17 Thread GitBox


ableegoldman commented on a change in pull request #9487:
URL: https://github.com/apache/kafka/pull/9487#discussion_r525640088



##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -932,56 +1028,62 @@ public synchronized boolean close(final long timeout, final TimeUnit timeUnit) {
         return close(timeoutMs);
     }
 
-    private boolean close(final long timeoutMs) {
-        if (!setState(State.PENDING_SHUTDOWN)) {
-            // if transition failed, it means it was either in PENDING_SHUTDOWN
-            // or NOT_RUNNING already; just check that all threads have been stopped
-            log.info("Already in the pending shutdown state, wait to complete shutdown");
-        } else {
-            stateDirCleaner.shutdownNow();
-            if (rocksDBMetricsRecordingService != null) {
-                rocksDBMetricsRecordingService.shutdownNow();
-            }
+    private Thread shutdownHelper(final boolean error) {
+        stateDirCleaner.shutdownNow();
+        if (rocksDBMetricsRecordingService != null) {
+            rocksDBMetricsRecordingService.shutdownNow();
+        }
 
-            // wait for all threads to join in a separate thread;
-            // save the current thread so that if it is a stream thread
-            // we don't attempt to join it and cause a deadlock
-            final Thread shutdownThread = new Thread(() -> {
-                // notify all the threads to stop; avoid deadlocks by stopping any
-                // further state reports from the thread since we're shutting down
-                for (final StreamThread thread : threads) {
-                    thread.shutdown();
-                }
+        // wait for all threads to join in a separate thread;
+        // save the current thread so that if it is a stream thread
+        // we don't attempt to join it and cause a deadlock
+        return new Thread(() -> {
+            // notify all the threads to stop; avoid deadlocks by stopping any
+            // further state reports from the thread since we're shutting down
+            for (final StreamThread thread : threads) {
+                thread.shutdown();
+            }
 
-                for (final StreamThread thread : threads) {
-                    try {
-                        if (!thread.isRunning()) {
-                            thread.join();
-                        }
-                    } catch (final InterruptedException ex) {
-                        Thread.currentThread().interrupt();
+            for (final StreamThread thread : threads) {
+                try {
+                    if (!thread.isRunning()) {
+                        thread.join();
                    }
+                } catch (final InterruptedException ex) {
+                    Thread.currentThread().interrupt();
                 }
+            }
 
-                if (globalStreamThread != null) {
-                    globalStreamThread.shutdown();
-                }
+            if (globalStreamThread != null) {
+                globalStreamThread.shutdown();
+            }

Review comment:
   Why do we shut down the global thread only after all stream threads have 
completed their shutdown? Seems like it would be more efficient to send the 
shutdown signal to everyone first, and then wait for all the threads to join. 
Can you try this out in the followup PR?
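   A minimal, generic sketch of that reordering (illustration only: plain `java.lang.Thread`, with `interrupt()` standing in for `StreamThread#shutdown()`), signaling every thread before blocking on any join so the individual shutdowns overlap:

```java
import java.util.ArrayList;
import java.util.List;

public final class SignalAllThenJoin {
    // Signal all threads first, then wait for them, instead of joining each
    // stream thread before the global thread is even told to stop.
    public static void stopAll(final List<Thread> streamThreads, final Thread globalThread) {
        final List<Thread> all = new ArrayList<>(streamThreads);
        if (globalThread != null) {
            all.add(globalThread);
        }
        for (final Thread t : all) {
            t.interrupt(); // stand-in for the shutdown signal
        }
        for (final Thread t : all) {
            try {
                t.join();
            } catch (final InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }
}
```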









[GitHub] [kafka] ableegoldman commented on a change in pull request #9487: KAFKA-9331: Add a streams specific uncaught exception handler

2020-11-17 Thread GitBox


ableegoldman commented on a change in pull request #9487:
URL: https://github.com/apache/kafka/pull/9487#discussion_r525636554



##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -366,6 +377,90 @@ public void setUncaughtExceptionHandler(final Thread.UncaughtExceptionHandler eh
         }
     }
 
+    /**
+     * Set the handler invoked when an {@link StreamsConfig#NUM_STREAM_THREADS_CONFIG internal thread}
+     * throws an unexpected exception.
+     * These might be exceptions indicating rare bugs in Kafka Streams, or they
+     * might be exceptions thrown by your code, for example a NullPointerException thrown from your processor
+     * logic.
+     * The handler will execute on the thread that produced the exception.
+     * In order to get the thread that threw the exception, Thread.currentThread().
+     *
+     * Note, this handler must be threadsafe, since it will be shared among all threads, and invoked from any
+     * thread that encounters such an exception.
+     *
+     * @param streamsUncaughtExceptionHandler the uncaught exception handler of type {@link StreamsUncaughtExceptionHandler} for all internal threads
+     * @throws IllegalStateException if this {@code KafkaStreams} instance is not in state {@link State#CREATED CREATED}.
+     * @throws NullPointerException if streamsUncaughtExceptionHandler is null.
+     */
+    public void setUncaughtExceptionHandler(final StreamsUncaughtExceptionHandler streamsUncaughtExceptionHandler) {
+        final Consumer<Throwable> handler = exception -> handleStreamsUncaughtException(exception, streamsUncaughtExceptionHandler);
+        synchronized (stateLock) {
+            if (state == State.CREATED) {
+                Objects.requireNonNull(streamsUncaughtExceptionHandler);
+                for (final StreamThread thread : threads) {
+                    thread.setStreamsUncaughtExceptionHandler(handler);
+                }
+                if (globalStreamThread != null) {
+                    globalStreamThread.setUncaughtExceptionHandler(handler);
+                }
+            } else {
+                throw new IllegalStateException("Can only set UncaughtExceptionHandler in CREATED state. " +
+                    "Current state is: " + state);
+            }
+        }
+    }
+
+    private void defaultStreamsUncaughtExceptionHandler(final Throwable throwable) {
+        if (oldHandler) {
+            if (throwable instanceof RuntimeException) {
+                throw (RuntimeException) throwable;
+            } else if (throwable instanceof Error) {
+                throw (Error) throwable;
+            } else {
+                throw new RuntimeException("Unexpected checked exception caught in the uncaught exception handler", throwable);
+            }
+        } else {
+            handleStreamsUncaughtException(throwable, t -> SHUTDOWN_CLIENT);
+        }
+    }
+
+    private void handleStreamsUncaughtException(final Throwable throwable,
+                                                final StreamsUncaughtExceptionHandler streamsUncaughtExceptionHandler) {
+        final StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse action = streamsUncaughtExceptionHandler.handle(throwable);
+        if (oldHandler) {
+            log.warn("Stream's new uncaught exception handler is set as well as the deprecated old handler." +
+                "The old handler will be ignored as long as a new handler is set.");
+        }
+        switch (action) {
+            case SHUTDOWN_CLIENT:
+                log.error("Encountered the following exception during processing " +
+                    "and the registered exception handler opted to " + action + "." +
+                    " The streams client is going to shut down now. ", throwable);
+                close(Duration.ZERO);
+                break;
+            case SHUTDOWN_APPLICATION:
+                if (throwable instanceof Error) {
+                    log.error("This option requires running threads to shut down the application." +
+                        "but the uncaught exception was an Error, which means this runtime is no " +
+                        "longer in a well-defined state. Attempting to send the shutdown command anyway.", throwable);
+                }
+                if (Thread.currentThread().equals(globalStreamThread) && threads.stream().noneMatch(StreamThread::isRunning)) {
+                    log.error("Exception in global thread caused the application to attempt to shutdown." +
+                        " This action will succeed only if there is at least one StreamThread running on this client." +
+                        " Currently there are no running threads so will now close the client.");
+                    close(Duration.ZERO);

Review comment:
   I think it makes more sense to transition to ERROR in this case than to 
NOT_RUNNING. But let's put t

[GitHub] [kafka] ableegoldman commented on a change in pull request #9487: KAFKA-9331: Add a streams specific uncaught exception handler

2020-11-16 Thread GitBox


ableegoldman commented on a change in pull request #9487:
URL: https://github.com/apache/kafka/pull/9487#discussion_r524824649



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##
@@ -559,18 +552,52 @@ void runLoop() {
                 }
             } catch (final TaskCorruptedException e) {
                 log.warn("Detected the states of tasks " + e.corruptedTaskWithChangelogs() + " are corrupted. " +
-                         "Will close the task as dirty and re-create and bootstrap from scratch.", e);
+                    "Will close the task as dirty and re-create and bootstrap from scratch.", e);
                 try {
                     taskManager.handleCorruption(e.corruptedTaskWithChangelogs());
                 } catch (final TaskMigratedException taskMigrated) {
                     handleTaskMigrated(taskMigrated);
                 }
             } catch (final TaskMigratedException e) {
                 handleTaskMigrated(e);
+            } catch (final UnsupportedVersionException e) {

Review comment:
   Mm ok actually I think this should be fine. I was thinking of the 
handler as just "swallowing" the exception, but in reality the user would still 
let the current thread die and just spin up a new one in its place. And then 
the new one would hit this UnsupportedVersionException and so on, until the 
brokers are upgraded. So there shouldn't be any way to get into a bad state









[GitHub] [kafka] ableegoldman commented on a change in pull request #9487: KAFKA-9331: Add a streams specific uncaught exception handler

2020-11-16 Thread GitBox


ableegoldman commented on a change in pull request #9487:
URL: https://github.com/apache/kafka/pull/9487#discussion_r524819063



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##
@@ -559,18 +552,52 @@ void runLoop() {
                 }
             } catch (final TaskCorruptedException e) {
                 log.warn("Detected the states of tasks " + e.corruptedTaskWithChangelogs() + " are corrupted. " +
-                         "Will close the task as dirty and re-create and bootstrap from scratch.", e);
+                    "Will close the task as dirty and re-create and bootstrap from scratch.", e);
                 try {
                     taskManager.handleCorruption(e.corruptedTaskWithChangelogs());
                 } catch (final TaskMigratedException taskMigrated) {
                     handleTaskMigrated(taskMigrated);
                 }
             } catch (final TaskMigratedException e) {
                 handleTaskMigrated(e);
+            } catch (final UnsupportedVersionException e) {

Review comment:
   Just to clarify I think it's ok to leave this as-is for now, since as 
Walker said all handler options are fatal at this point 









[GitHub] [kafka] ableegoldman commented on a change in pull request #9487: KAFKA-9331: Add a streams specific uncaught exception handler

2020-11-16 Thread GitBox


ableegoldman commented on a change in pull request #9487:
URL: https://github.com/apache/kafka/pull/9487#discussion_r524817135



##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -366,6 +374,63 @@ public void setUncaughtExceptionHandler(final Thread.UncaughtExceptionHandler eh
         }
     }
 
+    /**
+     * Set the handler invoked when an {@link StreamsConfig#NUM_STREAM_THREADS_CONFIG internal thread}
+     * throws an unexpected exception.
+     * These might be exceptions indicating rare bugs in Kafka Streams, or they
+     * might be exceptions thrown by your code, for example a NullPointerException thrown from your processor
+     * logic.
+     *
+     * Note, this handler must be threadsafe, since it will be shared among all threads, and invoked from any
+     * thread that encounters such an exception.
+     *
+     * @param streamsUncaughtExceptionHandler the uncaught exception handler of type {@link StreamsUncaughtExceptionHandler} for all internal threads
+     * @throws IllegalStateException if this {@code KafkaStreams} instance is not in state {@link State#CREATED CREATED}.
+     * @throws NullPointerException if streamsUncaughtExceptionHandler is null.
+     */
+    public void setUncaughtExceptionHandler(final StreamsUncaughtExceptionHandler streamsUncaughtExceptionHandler) {
+        final StreamsUncaughtExceptionHandler handler = exception -> handleStreamsUncaughtException(exception, streamsUncaughtExceptionHandler);
+        synchronized (stateLock) {
+            if (state == State.CREATED) {
+                Objects.requireNonNull(streamsUncaughtExceptionHandler);
+                for (final StreamThread thread : threads) {
+                    thread.setStreamsUncaughtExceptionHandler(handler);
+                }
+                if (globalStreamThread != null) {
+                    globalStreamThread.setUncaughtExceptionHandler(handler);
+                }
+            } else {
+                throw new IllegalStateException("Can only set UncaughtExceptionHandler in CREATED state. " +
+                    "Current state is: " + state);
+            }
+        }
+    }
+
+    private StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse handleStreamsUncaughtException(final Throwable e,
+                                                                                                          final StreamsUncaughtExceptionHandler streamsUncaughtExceptionHandler) {
+        final StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse action = streamsUncaughtExceptionHandler.handle(e);
+        switch (action) {
+            case SHUTDOWN_CLIENT:
+                log.error("Encountered the following exception during processing " +
+                    "and the registered exception handler opted to \" + action + \"." +
+                    " The streams client is going to shut down now. ", e);
+                close(Duration.ZERO);

Review comment:
   > Since the stream thread is alive when it calls close() there will not 
be a deadlock anymore. So, why do we call close() with duration zero
   
   @cadonna can you clarify? I thought we would still be in danger of deadlock 
if we use the blocking `close()`, since `close()` will not return until every 
thread has joined but the StreamThread that called `close()` would be stuck in 
this blocking call and thus never stop/join
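   A minimal, self-contained sketch of that deadlock (illustration only, not KafkaStreams code): a worker thread invokes a blocking "close" that waits for a helper thread, while the helper waits for the worker itself to exit, so neither ever returns:

```java
public final class SelfJoinDeadlock {
    public static void main(final String[] args) throws InterruptedException {
        final Thread worker = new Thread(() -> {
            final Thread caller = Thread.currentThread();
            final Thread closeHelper = new Thread(() -> {
                try {
                    caller.join(); // the "close thread" waits for the worker to exit
                } catch (final InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }, "close-helper");
            closeHelper.start();
            try {
                closeHelper.join(); // the blocking close: waits for the helper, which waits for us
            } catch (final InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "worker");
        worker.setDaemon(true);
        worker.start();
        worker.join(1_000);
        System.out.println("worker still alive after 1s: " + worker.isAlive()); // prints true
    }
}
```
   A zero timeout (`close(Duration.ZERO)`) sidesteps this because the calling thread does not wait for the close thread at all.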









[GitHub] [kafka] ableegoldman commented on a change in pull request #9487: KAFKA-9331: Add a streams specific uncaught exception handler

2020-11-16 Thread GitBox


ableegoldman commented on a change in pull request #9487:
URL: https://github.com/apache/kafka/pull/9487#discussion_r524814932



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##
@@ -559,18 +552,52 @@ void runLoop() {
                 }
             } catch (final TaskCorruptedException e) {
                 log.warn("Detected the states of tasks " + e.corruptedTaskWithChangelogs() + " are corrupted. " +
-                         "Will close the task as dirty and re-create and bootstrap from scratch.", e);
+                    "Will close the task as dirty and re-create and bootstrap from scratch.", e);
                 try {
                     taskManager.handleCorruption(e.corruptedTaskWithChangelogs());
                 } catch (final TaskMigratedException taskMigrated) {
                     handleTaskMigrated(taskMigrated);
                 }
             } catch (final TaskMigratedException e) {
                 handleTaskMigrated(e);
+            } catch (final UnsupportedVersionException e) {

Review comment:
   That's a fair point about broker upgrades, but don't we require the 
brokers to be upgraded to a version that supports EOS _before_ turning on 
eos-beta?
   Anyways I was wondering if there was something special about this exception 
such that ignoring it could violate eos or corrupt the state of the program. 
I'll ping the eos experts to assuage my concerns









[GitHub] [kafka] ableegoldman commented on a change in pull request #9487: KAFKA-9331: Add a streams specific uncaught exception handler

2020-11-13 Thread GitBox


ableegoldman commented on a change in pull request #9487:
URL: https://github.com/apache/kafka/pull/9487#discussion_r523375614



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java
##
@@ -311,6 +314,8 @@ public void run() {
                 "Updating global state failed. You can restart KafkaStreams to recover from this error.",
                 recoverableException
             );
+        } catch (final Exception e) {
+            this.streamsUncaughtExceptionHandler.accept(e);

Review comment:
   Oh you're totally right, sorry for letting my paranoia start spreading 
conspiracy theories here 🙂  Given all this I'd still claim that the FSM is in 
need of being cleaned up a bit (or a lot), but if you'd prefer to hold off on 
that until the add thread work then I'm all good here. Thanks for humoring me 
and explaining the state of things. I just wanted/want to make sure we don't 
overlook anything, since there's a lot going on.
   
   For example in the current code, if the global thread dies with the old 
handler still in use then we'll transition to ERROR. However the user still has 
to be responsible for closing the client themselves, and it will ultimately 
transition from ERROR to NOT_RUNNING. Whereas if we transition to ERROR as the 
result of a SHUTDOWN_APPLICATION error code, the user should NOT try to invoke 
close themselves, and the ERROR state will be terminal. That's pretty confusing 
eg for users who use a state listener and wait for the transition to ERROR to 
call close(). We should make sure that ERROR has the same semantics across the 
board by the end of all this work.
   
   Anyways I'm just thinking out loud here, to reiterate I'm perfectly happy to 
merge this as-is. But for reasons like the above, I think it's important to 
tackle the FSM in the next PR and make sure it all gets sorted out by the next 
AK release









[GitHub] [kafka] ableegoldman commented on a change in pull request #9487: KAFKA-9331: Add a streams specific uncaught exception handler

2020-11-13 Thread GitBox


ableegoldman commented on a change in pull request #9487:
URL: https://github.com/apache/kafka/pull/9487#discussion_r523372464



##
File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
##
@@ -1000,7 +1012,17 @@ public void restore(final Map tasks) {
             CLIENT_ID,
             new LogContext(""),
             new AtomicInteger(),
-            new AtomicLong(Long.MAX_VALUE)
+            new AtomicLong(Long.MAX_VALUE),
+            null,
+            e -> {
+                if (e instanceof RuntimeException) {
+                    throw (RuntimeException) e;
+                } else if (e instanceof Error) {
+                    throw (Error) e;
+                } else {
+                    throw new RuntimeException("Unexpected checked exception caught in the uncaught exception handler", e);
+                }

Review comment:
   Ah ok so there's some other IllegalStateException that would get 
swallowed if we just used `e -> {}` like in the other tests, so we need to 
explicitly rethrow it? That seems fine, although it makes me think that we 
should go ahead and use a "real" handler in _all_ of the tests, not just this 
one. Otherwise there could be some bug which causes an unexpected exception, 
but the test would just swallow it and silently pass.
   Can we just use the default handler wrapper for all of these tests so they 
reflect realistic scenarios?









[GitHub] [kafka] ableegoldman commented on a change in pull request #9487: KAFKA-9331: Add a streams specific uncaught exception handler

2020-11-13 Thread GitBox


ableegoldman commented on a change in pull request #9487:
URL: https://github.com/apache/kafka/pull/9487#discussion_r523327338



##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -366,6 +375,93 @@ public void setUncaughtExceptionHandler(final Thread.UncaughtExceptionHandler eh
         }
     }
 
+    /**
+     * Set the handler invoked when an {@link StreamsConfig#NUM_STREAM_THREADS_CONFIG internal thread}
+     * throws an unexpected exception.
+     * These might be exceptions indicating rare bugs in Kafka Streams, or they
+     * might be exceptions thrown by your code, for example a NullPointerException thrown from your processor
+     * logic.
+     * The handler will execute on the thread that produced the exception.
+     * So inorder to get the thread as the java handler type uses use Thread.currentThread()
+     *
+     * Note, this handler must be threadsafe, since it will be shared among all threads, and invoked from any
+     * thread that encounters such an exception.
+     *
+     * @param streamsUncaughtExceptionHandler the uncaught exception handler of type {@link StreamsUncaughtExceptionHandler} for all internal threads
+     * @throws IllegalStateException if this {@code KafkaStreams} instance is not in state {@link State#CREATED CREATED}.
+     * @throws NullPointerException if streamsUncaughtExceptionHandler is null.
+     */
+    public void setUncaughtExceptionHandler(final StreamsUncaughtExceptionHandler streamsUncaughtExceptionHandler) {
+        final Consumer<Throwable> handler = exception -> handleStreamsUncaughtException(exception, streamsUncaughtExceptionHandler);
+        synchronized (stateLock) {
+            if (state == State.CREATED) {
+                Objects.requireNonNull(streamsUncaughtExceptionHandler);
+                for (final StreamThread thread : threads) {
+                    thread.setStreamsUncaughtExceptionHandler(handler);
+                }
+                if (globalStreamThread != null) {
+                    globalStreamThread.setUncaughtExceptionHandler(handler);
+                }
+            } else {
+                throw new IllegalStateException("Can only set UncaughtExceptionHandler in CREATED state. " +
+                    "Current state is: " + state);
+            }
+        }
+    }
+
+    private void handleStreamsUncaughtExceptionDefaultWrapper(final Throwable throwable) {

Review comment:
   Well it's not exactly a default, technically this method is always used 
to decide which handler to invoke (which may or may not invoke a default 
handler). Any of these would be fine by me but I'll throw one more idea out 
there: `invokeOldOrNewUncaughtExceptionHandler` 









[GitHub] [kafka] ableegoldman commented on a change in pull request #9487: KAFKA-9331: Add a streams specific uncaught exception handler

2020-11-13 Thread GitBox


ableegoldman commented on a change in pull request #9487:
URL: https://github.com/apache/kafka/pull/9487#discussion_r523322776



##
File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
##
@@ -1000,7 +1012,17 @@ public void restore(final Map tasks) {
             CLIENT_ID,
             new LogContext(""),
             new AtomicInteger(),
-            new AtomicLong(Long.MAX_VALUE)
+            new AtomicLong(Long.MAX_VALUE),
+            null,
+            e -> {
+                if (e instanceof RuntimeException) {
+                    throw (RuntimeException) e;
+                } else if (e instanceof Error) {
+                    throw (Error) e;
+                } else {
+                    throw new RuntimeException("Unexpected checked exception caught in the uncaught exception handler", e);
+                }

Review comment:
   But TaskMigratedException should never be thrown all the way up to the 
exception handler. Is that what you're seeing?









[GitHub] [kafka] ableegoldman commented on a change in pull request #9487: KAFKA-9331: Add a streams specific uncaught exception handler

2020-11-13 Thread GitBox


ableegoldman commented on a change in pull request #9487:
URL: https://github.com/apache/kafka/pull/9487#discussion_r523319677



##
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionIntegrationTest.java
##
@@ -144,6 +143,7 @@ public void whenShuttingDown() throws IOException {
 }
 
 @Test
+@Deprecated //a single thread should no longer die

Review comment:
   I think any test that's trying to verify some unrelated behavior and 
just using the "one thread dies at a time" paradigm as a tool to do so should 
not be deleted. I'm sure in most if not all cases, there's some way to modify 
the test to verify that specific behavior either using the new handler or 
multiple apps or rewriting it altogether. 
   
   But, there are a lot of tests that do this and a lot of them are pretty 
tricky, so I wouldn't want to stall this PR on waiting for all of these tests 
to be updated/adapted. I think we should file tickets for all of these tests 
and just try to pick up one or two of them every so often. Maybe that's being 
overly optimistic about our inclination to pick up small tasks even over a long 
period, but it's better than losing track of them altogether. WDYT?









[GitHub] [kafka] ableegoldman commented on a change in pull request #9487: KAFKA-9331: Add a streams specific uncaught exception handler

2020-11-13 Thread GitBox


ableegoldman commented on a change in pull request #9487:
URL: https://github.com/apache/kafka/pull/9487#discussion_r523311219



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfo.java
##
@@ -115,6 +119,10 @@ private SubscriptionInfo(final SubscriptionInfoData subscriptionInfoData) {
         this.data = subscriptionInfoData;
     }
 
+    public int errorCode() {
+        return data.errorCode();

Review comment:
   What happens if we try to read the error code of an earlier subscription 
version? I genuinely don't know what the generated code does, but we should 
make sure it doesn't throw an NPE or something. Could you add a unit test for 
this case?









[GitHub] [kafka] ableegoldman commented on a change in pull request #9487: KAFKA-9331: Add a streams specific uncaught exception handler

2020-11-13 Thread GitBox


ableegoldman commented on a change in pull request #9487:
URL: https://github.com/apache/kafka/pull/9487#discussion_r523303062



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java
##
@@ -311,6 +314,8 @@ public void run() {
                 "Updating global state failed. You can restart KafkaStreams to recover from this error.",
                 recoverableException
             );
+        } catch (final Exception e) {
+            this.streamsUncaughtExceptionHandler.accept(e);

Review comment:
   I suspect the tests didn't catch this because we would still transition 
out of ERROR to PENDING_SHUTDOWN and finally NOT_RUNNING in this case. But 
really, we shouldn't transition to ERROR in the first place









[GitHub] [kafka] ableegoldman commented on a change in pull request #9487: KAFKA-9331: Add a streams specific uncaught exception handler

2020-11-13 Thread GitBox


ableegoldman commented on a change in pull request #9487:
URL: https://github.com/apache/kafka/pull/9487#discussion_r523302976



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java
##
@@ -311,6 +314,8 @@ public void run() {
                 "Updating global state failed. You can restart KafkaStreams to recover from this error.",
                 recoverableException
             );
+        } catch (final Exception e) {
+            this.streamsUncaughtExceptionHandler.accept(e);

Review comment:
   Ah ok I thought we executed this cleanup logic in the 
GlobalStreamThread's `shutdown` method but now I see that's not true. Sorry for 
the confusion there.
   I do see some minor outstanding issues here, mainly around the state 
diagram. Let's say the user opts to `SHUTDOWN_CLIENT` in the new handler: the 
intended semantics are to end up in `NOT_RUNNING`. 
   But I think what would happen is that from the global thread we would 
immediately call `KafkaStreams#close` , which kicks off a shutdown thread to 
wait for all threads to join and then sets the state to `NOT_RUNNING`. Then 
when the handler returns, it would transition the global thread to 
`PENDING_SHUTDOWN` and then finally to `DEAD`. And during the transition to 
`DEAD`, we would actually end up transitioning the KafkaStreams instance to 
`ERROR`, rather than `NOT_RUNNING` as intended. So probably, we just need to 
update the `onChange` method in KafkaStreams.
   This also reminds me of another thing, we need to update the FSM diagram and 
allowed transitions in KafkaStreams to reflect the new semantics we decided on 
for ERROR (which IIRC is basically just to make it a terminal state). Does that 
sound right to you?









[GitHub] [kafka] ableegoldman commented on a change in pull request #9487: KAFKA-9331: Add a streams specific uncaught exception handler

2020-11-13 Thread GitBox


ableegoldman commented on a change in pull request #9487:
URL: https://github.com/apache/kafka/pull/9487#discussion_r523296833



##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -366,6 +375,93 @@ public void setUncaughtExceptionHandler(final Thread.UncaughtExceptionHandler eh
         }
     }
 
+    /**
+     * Set the handler invoked when an {@link StreamsConfig#NUM_STREAM_THREADS_CONFIG internal thread}
+     * throws an unexpected exception.
+     * These might be exceptions indicating rare bugs in Kafka Streams, or they
+     * might be exceptions thrown by your code, for example a NullPointerException thrown from your processor
+     * logic.
+     * The handler will execute on the thread that produced the exception.
+     * So inorder to get the thread as the java handler type uses use Thread.currentThread()

Review comment:
   Is there an extra `uses` in there or am I not looking at this sentence 
from the right angle?









[GitHub] [kafka] ableegoldman commented on a change in pull request #9487: KAFKA-9331: Add a streams specific uncaught exception handler

2020-11-12 Thread GitBox


ableegoldman commented on a change in pull request #9487:
URL: https://github.com/apache/kafka/pull/9487#discussion_r522596527



##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -366,6 +375,93 @@ public void setUncaughtExceptionHandler(final Thread.UncaughtExceptionHandler eh
         }
     }
 
+    /**
+     * Set the handler invoked when an {@link StreamsConfig#NUM_STREAM_THREADS_CONFIG internal thread}
+     * throws an unexpected exception.
+     * These might be exceptions indicating rare bugs in Kafka Streams, or they
+     * might be exceptions thrown by your code, for example a NullPointerException thrown from your processor
+     * logic.
+     * The handler will execute on the thread that produced the exception.
+     * So inorder to get the thread as the java handler type uses use Thread.currentThread()

Review comment:
   This wording is a little difficult to parse

##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -366,6 +375,93 @@ public void setUncaughtExceptionHandler(final Thread.UncaughtExceptionHandler eh
         }
     }
 
+    /**
+     * Set the handler invoked when an {@link StreamsConfig#NUM_STREAM_THREADS_CONFIG internal thread}
+     * throws an unexpected exception.
+     * These might be exceptions indicating rare bugs in Kafka Streams, or they
+     * might be exceptions thrown by your code, for example a NullPointerException thrown from your processor
+     * logic.
+     * The handler will execute on the thread that produced the exception.
+     * So inorder to get the thread as the java handler type uses use Thread.currentThread()
+     *
+     * Note, this handler must be threadsafe, since it will be shared among all threads, and invoked from any
+     * thread that encounters such an exception.
+     *
+     * @param streamsUncaughtExceptionHandler the uncaught exception handler of type {@link StreamsUncaughtExceptionHandler} for all internal threads
+     * @throws IllegalStateException if this {@code KafkaStreams} instance is not in state {@link State#CREATED CREATED}.
+     * @throws NullPointerException if streamsUncaughtExceptionHandler is null.
+     */
+    public void setUncaughtExceptionHandler(final StreamsUncaughtExceptionHandler streamsUncaughtExceptionHandler) {
+        final Consumer<Throwable> handler = exception -> handleStreamsUncaughtException(exception, streamsUncaughtExceptionHandler);
+        synchronized (stateLock) {
+            if (state == State.CREATED) {
+                Objects.requireNonNull(streamsUncaughtExceptionHandler);
+                for (final StreamThread thread : threads) {
+                    thread.setStreamsUncaughtExceptionHandler(handler);
+                }
+                if (globalStreamThread != null) {
+                    globalStreamThread.setUncaughtExceptionHandler(handler);
+                }
+            } else {
+                throw new IllegalStateException("Can only set UncaughtExceptionHandler in CREATED state. " +
+                    "Current state is: " + state);
+            }
+        }
+    }
+
+    private void handleStreamsUncaughtExceptionDefaultWrapper(final Throwable throwable) {
+        if (Thread.getDefaultUncaughtExceptionHandler() != null) {
+            if (throwable instanceof RuntimeException) {
+                throw (RuntimeException) throwable;
+            } else if (throwable instanceof Error) {
+                throw (Error) throwable;
+            } else {
+                throw new RuntimeException("Unexpected checked exception caught in the uncaught exception handler", throwable);
+            }

Review comment:
   Just curious, what's the motivation for doing it like this vs just 
immediately throwing the exception?

##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##
@@ -559,18 +552,52 @@ void runLoop() {
                 }
             } catch (final TaskCorruptedException e) {
                 log.warn("Detected the states of tasks " + e.corruptedTaskWithChangelogs() + " are corrupted. " +
-                         "Will close the task as dirty and re-create and bootstrap from scratch.", e);
+                    "Will close the task as dirty and re-create and bootstrap from scratch.", e);
                 try {
                     taskManager.handleCorruption(e.corruptedTaskWithChangelogs());
                 } catch (final TaskMigratedException taskMigrated) {
                     handleTaskMigrated(taskMigrated);
                 }
             } catch (final TaskMigratedException e) {
                 handleTaskMigrated(e);
+            } catch (final UnsupportedVersionException e) {

Review comment:
   Hmm...this one seems like it should be a fatal error, so is it safe to 
just pass it along to the user and let them potentially just keep replacing the 
thread? (I kn

[GitHub] [kafka] ableegoldman commented on a change in pull request #9487: KAFKA-9331: Add a streams specific uncaught exception handler

2020-11-12 Thread GitBox


ableegoldman commented on a change in pull request #9487:
URL: https://github.com/apache/kafka/pull/9487#discussion_r518488577



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
##
@@ -255,8 +255,9 @@ public ByteBuffer subscriptionUserData(final Set<String> topics) {
 taskManager.processId(),
 userEndPoint,
 taskManager.getTaskOffsetSums(),
-uniqueField)
-.encode();
+uniqueField,
+(byte) assignmentErrorCode.get()

Review comment:
   This cast makes me kind of uncomfortable...either the 
`assignmentErrorCode` that we have in the AssignmentInfo is conceptually the 
same as the one we're adding to the SubscriptionInfo (in which case it should 
be the same type), or it's not the same, in which case we should use a 
different variable to track it.
   
   Personally I think it's probably simpler to keep them the same, and just add 
an `int` errorCode field to the Subscription instead of a `byte` 
shutdownRequested field. But it's your choice
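   
   As a side note, a tiny standalone illustration (not from the PR) of why the narrowing cast is worth avoiding if the error-code space ever grows: any code above 127 silently wraps once squeezed into a byte.

    public final class NarrowingCastExample {
        public static void main(final String[] args) {
            final int errorCode = 200;              // hypothetical future error code
            final byte encoded = (byte) errorCode;  // the same kind of narrowing cast as in the diff
            System.out.println(encoded);            // prints -56, not 200
        }
    }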





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on a change in pull request #9487: KAFKA-9331: Add a streams specific uncaught exception handler

2020-11-09 Thread GitBox


ableegoldman commented on a change in pull request #9487:
URL: https://github.com/apache/kafka/pull/9487#discussion_r520077048



##
File path: streams/src/main/resources/common/message/SubscriptionInfoData.json
##
@@ -57,6 +57,11 @@
   "name": "uniqueField",
   "versions": "8+",
   "type": "int8"
+},
+{
+  "name": "shutdownRequested",
+  "versions": "9+",
+  "type": "int8"

Review comment:
   I'm not really worried that we'd run out of space, I just think it sends 
a signal that the Assignment and Subscription error codes are semantically 
distinct and don't refer to the same underlying concept. So it seems better to 
go with the simpler approach than over-optimize to save an occasional three 
bytes 





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on a change in pull request #9487: KAFKA-9331: Add a streams specific uncaught exception handler

2020-11-06 Thread GitBox


ableegoldman commented on a change in pull request #9487:
URL: https://github.com/apache/kafka/pull/9487#discussion_r518913514



##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -996,6 +1082,62 @@ private boolean close(final long timeoutMs) {
 }
 }
 
+private void closeToError() {
+if (!setState(State.ERROR)) {
+log.info("Can not transition to error from state " + state());

Review comment:
   Gotcha. In that case maybe we shouldn't log anything here at all? Or 
just reword it to clarify that this is expected (eg `"Skipping shutdown since 
we are already in ERROR"`) since "Can not transition..." kind of sounds like 
something went wrong





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on a change in pull request #9487: KAFKA-9331: Add a streams specific uncaught exception handler

2020-11-05 Thread GitBox


ableegoldman commented on a change in pull request #9487:
URL: https://github.com/apache/kafka/pull/9487#discussion_r518478117



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##
@@ -567,10 +589,34 @@ void runLoop() {
 }
 } catch (final TaskMigratedException e) {
 handleTaskMigrated(e);
+} catch (final Exception e) {
+if (this.streamsUncaughtExceptionHandler.handle(e)) {

Review comment:
   I had a little trouble following the `Handler` class. Some trivial 
things -- eg the handler in the StreamThread is named 
`streamsUncaughtExceptionHandler` but it's actually _not_ a 
`StreamsUncaughtExceptionHandler`. Also the usage of the return value; IIUC 
it's supposed to indicate whether to use the new handler or fall back on the 
old one. To me it sounds like if `handle` returns `true` that means we should 
handle it, ie we should _not_ rethrow the exception, but this looks like the 
opposite of what we do now. Honestly either interpretation is ok with me, as 
long as it's documented somewhere
   
   Do we really need the `Handler` in the first place though? It's already 
pretty confusing that we have to deal with two types of handlers (old and new) 
so I'd prefer not to add a third unless it's really necessary. It seems like we 
can just inline the logic of whether to invoke the new handler or rethrow the 
exception, which would also clear up the confusion around the meaning of the 
return value. But I might be missing something here -- WDYT?
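   
   To make the suggestion concrete, a rough self-contained sketch (field and method names are illustrative, not the PR's actual code) of what inlining the decision could look like: invoke the new handler when one was set, otherwise rethrow so the old java.lang uncaught-exception path still applies.

    import java.util.function.Consumer;

    public final class InlineHandlerSketch {
        // stands in for the wrapper around the user-supplied StreamsUncaughtExceptionHandler;
        // null means the user never registered one
        private final Consumer<Throwable> newHandler;

        InlineHandlerSketch(final Consumer<Throwable> newHandler) {
            this.newHandler = newHandler;
        }

        void runLoopOnce(final Runnable body) {
            try {
                body.run();
            } catch (final RuntimeException e) {
                if (newHandler != null) {
                    newHandler.accept(e);   // new streams handler takes over
                } else {
                    throw e;                // fall back to the old uncaught-exception path
                }
            }
        }
    }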

##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##
@@ -282,6 +283,17 @@ public boolean isRunning() {
 private final Admin adminClient;
 private final InternalTopologyBuilder builder;
 
+private Handler streamsUncaughtExceptionHandler;
+private ShutdownErrorHook shutdownErrorHook;
+private AtomicInteger assignmentErrorCode;
+public interface ShutdownErrorHook {
+void shutdown();
+}

Review comment:
   Seems like we can just pass in a Runnable with 
`KafkaStreams::closeToError` instead of adding a whole `ShutdownErrorHook` 
functional interface
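   
   Something along these lines, for example (a sketch only, with illustrative names): the thread just holds a plain Runnable, and KafkaStreams wires in its own close hook as a method reference.

    public final class ShutdownHookWiringSketch {
        private final Runnable shutdownErrorHook;

        ShutdownHookWiringSketch(final Runnable shutdownErrorHook) {
            // e.g. constructed with kafkaStreams::closeToError instead of a custom interface
            this.shutdownErrorHook = shutdownErrorHook;
        }

        void onFatalError() {
            shutdownErrorHook.run();
        }
    }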

##
File path: streams/src/main/resources/common/message/SubscriptionInfoData.json
##
@@ -57,6 +57,11 @@
   "name": "uniqueField",
   "versions": "8+",
   "type": "int8"
+},
+{
+  "name": "shutdownRequested",
+  "versions": "9+",
+  "type": "int8"

Review comment:
   I think we should mirror the `errorCode` in the AssignmentInfo here, 
both in terms of naming and type. If we're going to use the same AssignorError 
for both, then they should really be the same. And we may want to send other 
kinds of error codes in the subscription going forward: better to just encode a 
single `int` than a separate `byte` for every logical error code. I don't think 
we'll notice the extra three bytes since Subscriptions aren't sent that 
frequently

##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ReferenceContainer.java
##
@@ -30,7 +30,7 @@
 public Admin adminClient;
 public TaskManager taskManager;
 public StreamsMetadataState streamsMetadataState;
-public final AtomicInteger assignmentErrorCode = new AtomicInteger();
+public AtomicInteger assignmentErrorCode = new AtomicInteger();

Review comment:
   This should probably stay `final` so we don't accidentally change it ever

##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -996,6 +1082,62 @@ private boolean close(final long timeoutMs) {
 }
 }
 
+private void closeToError() {
+if (!setState(State.ERROR)) {
+log.info("Can not transition to error from state " + state());

Review comment:
   Should this be logged at error?

##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -996,6 +1082,62 @@ private boolean close(final long timeoutMs) {
 }
 }
 
+private void closeToError() {
+if (!setState(State.ERROR)) {
+log.info("Can not transition to error from state " + state());
+} else {
+log.info("Transitioning to ERROR state");
+stateDirCleaner.shutdownNow();
+if (rocksDBMetricsRecordingService != null) {
+rocksDBMetricsRecordingService.shutdownNow();
+}
+
+// wait for all threads to join in a separate thread;
+// save the current thread so that if it is a stream thread
+// we don't attempt to join it and cause a deadlock
+final Thread shutdownThread = new Thread(() -> {
+// notify all the threads to stop; avoid deadlocks by stopping any
+// further state reports from the thread since we're shutting down

[GitHub] [kafka] ableegoldman commented on a change in pull request #9487: KAFKA-9331: Add a streams specific uncaught exception handler

2020-11-04 Thread GitBox


ableegoldman commented on a change in pull request #9487:
URL: https://github.com/apache/kafka/pull/9487#discussion_r517543638



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/StreamsAssignmentProtocolVersions.java
##
@@ -19,7 +19,7 @@
 public final class StreamsAssignmentProtocolVersions {
 public static final int UNKNOWN = -1;
 public static final int EARLIEST_PROBEABLE_VERSION = 3;
-public static final int LATEST_SUPPORTED_VERSION = 8;
+public static final int LATEST_SUPPORTED_VERSION = 9;

Review comment:
   Can you also leave a comment here reminding us to fix the version 
probing system test whenever this protocol number is bumped? Since we 
apparently always forget
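   
   For example, something like the following (comment wording is illustrative):

    public final class StreamsAssignmentProtocolVersions {
        public static final int UNKNOWN = -1;
        public static final int EARLIEST_PROBEABLE_VERSION = 3;
        // Remember to update the version probing system test whenever this version is bumped
        public static final int LATEST_SUPPORTED_VERSION = 9;
    }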





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org