wcarlson5 commented on a change in pull request #9487:
URL: https://github.com/apache/kafka/pull/9487#discussion_r518838586



##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -996,6 +1082,62 @@ private boolean close(final long timeoutMs) {
         }
     }
 
+    private void closeToError() {
+        if (!setState(State.ERROR)) {
+            log.info("Can not transition to error from state " + state());
+        } else {
+            log.info("Transitioning to ERROR state");
+            stateDirCleaner.shutdownNow();
+            if (rocksDBMetricsRecordingService != null) {
+                rocksDBMetricsRecordingService.shutdownNow();
+            }
+
+            // wait for all threads to join in a separate thread;
+            // save the current thread so that if it is a stream thread
+            // we don't attempt to join it and cause a deadlock
+            final Thread shutdownThread = new Thread(() -> {
+                // notify all the threads to stop; avoid deadlocks by stopping 
any
+                // further state reports from the thread since we're shutting 
down
+                for (final StreamThread thread : threads) {
+                    thread.shutdown();
+                }
+
+                for (final StreamThread thread : threads) {
+                    try {
+                        if (!thread.isRunning()) {
+                            thread.join();
+                        }
+                    } catch (final InterruptedException ex) {
+                        Thread.currentThread().interrupt();
+                    }
+                }
+
+                if (globalStreamThread != null) {
+                    globalStreamThread.shutdown();
+                }
+
+                if (globalStreamThread != null && 
!globalStreamThread.stillRunning()) {
+                    try {
+                        globalStreamThread.join();
+                    } catch (final InterruptedException e) {
+                        Thread.currentThread().interrupt();
+                    }
+                    globalStreamThread = null;
+                }
+
+                adminClient.close();
+
+                streamsMetrics.removeAllClientLevelMetrics();
+                metrics.close();
+                setState(State.ERROR);
+            }, "kafka-streams-close-thread");
+
+            shutdownThread.setDaemon(true);
+            shutdownThread.start();
+            setState(State.ERROR);

Review comment:
       No, I hadn't seen that




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to