wcarlson5 commented on a change in pull request #9487:
URL: https://github.com/apache/kafka/pull/9487#discussion_r526211409



##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -932,56 +1028,62 @@ public synchronized boolean close(final long timeout, final TimeUnit timeUnit) {
         return close(timeoutMs);
     }
 
-    private boolean close(final long timeoutMs) {
-        if (!setState(State.PENDING_SHUTDOWN)) {
-            // if transition failed, it means it was either in PENDING_SHUTDOWN
-            // or NOT_RUNNING already; just check that all threads have been stopped
-            log.info("Already in the pending shutdown state, wait to complete shutdown");
-        } else {
-            stateDirCleaner.shutdownNow();
-            if (rocksDBMetricsRecordingService != null) {
-                rocksDBMetricsRecordingService.shutdownNow();
-            }
+    private Thread shutdownHelper(final boolean error) {
+        stateDirCleaner.shutdownNow();
+        if (rocksDBMetricsRecordingService != null) {
+            rocksDBMetricsRecordingService.shutdownNow();
+        }
 
-            // wait for all threads to join in a separate thread;
-            // save the current thread so that if it is a stream thread
-            // we don't attempt to join it and cause a deadlock
-            final Thread shutdownThread = new Thread(() -> {
-                // notify all the threads to stop; avoid deadlocks by stopping any
-                // further state reports from the thread since we're shutting down
-                for (final StreamThread thread : threads) {
-                    thread.shutdown();
-                }
+        // wait for all threads to join in a separate thread;
+        // save the current thread so that if it is a stream thread
+        // we don't attempt to join it and cause a deadlock
+        return new Thread(() -> {
+            // notify all the threads to stop; avoid deadlocks by stopping any
+            // further state reports from the thread since we're shutting down
+            for (final StreamThread thread : threads) {
+                thread.shutdown();
+            }
 
-                for (final StreamThread thread : threads) {
-                    try {
-                        if (!thread.isRunning()) {
-                            thread.join();
-                        }
-                    } catch (final InterruptedException ex) {
-                        Thread.currentThread().interrupt();
+            for (final StreamThread thread : threads) {
+                try {
+                    if (!thread.isRunning()) {
+                        thread.join();
                     }
+                } catch (final InterruptedException ex) {
+                    Thread.currentThread().interrupt();
                 }
+            }
 
-                if (globalStreamThread != null) {
-                    globalStreamThread.shutdown();
-                }
+            if (globalStreamThread != null) {
+                globalStreamThread.shutdown();
+            }
 
-                if (globalStreamThread != null && !globalStreamThread.stillRunning()) {
-                    try {
-                        globalStreamThread.join();
-                    } catch (final InterruptedException e) {
-                        Thread.currentThread().interrupt();
-                    }
-                    globalStreamThread = null;
+            if (globalStreamThread != null && !globalStreamThread.stillRunning()) {
+                try {
+                    globalStreamThread.join();
+                } catch (final InterruptedException e) {
+                    Thread.currentThread().interrupt();
                 }
+                globalStreamThread = null;
+            }
 
-                adminClient.close();
+            adminClient.close();
 
-                streamsMetrics.removeAllClientLevelMetrics();
-                metrics.close();
+            streamsMetrics.removeAllClientLevelMetrics();
+            metrics.close();
+            if (!error) {
                 setState(State.NOT_RUNNING);
-            }, "kafka-streams-close-thread");
+            }
+        }, "kafka-streams-close-thread");
+    }
+
+    private boolean close(final long timeoutMs) {
+        if (!setState(State.PENDING_SHUTDOWN)) {

Review comment:
       I agree that there should not be a direct transition to `ERROR`. However,
I don't like reusing `PENDING_SHUTDOWN` for this, mostly because we can already
distinguish between the two cases and it would be best to inform the user right
away. It could also be a problem if we meant to end in `ERROR` but the client
somehow went from `PENDING_SHUTDOWN` to `NOT_RUNNING` instead. I am in favor of
adding something like `PENDING_ERROR` to be more precise.
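
To make the concern concrete, here is a minimal sketch of what I have in mind. It is not the actual `KafkaStreams.State` enum: the class name `ClientStateSketch`, the reduced set of states, and the transition table are all assumptions for illustration. The point is only that a dedicated `PENDING_ERROR` state can be limited to ending in `ERROR`, so an error shutdown cannot slip into `NOT_RUNNING`.

```java
import java.util.EnumSet;
import java.util.Set;

// Hypothetical sketch; NOT the real KafkaStreams.State enum.
public class ClientStateSketch {

    enum State {
        RUNNING, PENDING_SHUTDOWN, PENDING_ERROR, NOT_RUNNING, ERROR;

        // Allowed target states per source state (assumed for illustration).
        Set<State> validTransitions() {
            switch (this) {
                case RUNNING:
                    return EnumSet.of(PENDING_SHUTDOWN, PENDING_ERROR);
                case PENDING_SHUTDOWN:
                    // a clean shutdown can only end in NOT_RUNNING
                    return EnumSet.of(NOT_RUNNING);
                case PENDING_ERROR:
                    // an error shutdown can only end in ERROR
                    return EnumSet.of(ERROR);
                default:
                    // NOT_RUNNING and ERROR are terminal in this sketch
                    return EnumSet.noneOf(State.class);
            }
        }

        boolean canTransitionTo(final State next) {
            return validTransitions().contains(next);
        }
    }

    private State state = State.RUNNING;

    // Returns false and leaves the state unchanged if the transition is invalid.
    synchronized boolean setState(final State next) {
        if (!state.canTransitionTo(next)) {
            return false;
        }
        state = next;
        return true;
    }

    synchronized State state() {
        return state;
    }

    public static void main(final String[] args) {
        final ClientStateSketch client = new ClientStateSketch();
        System.out.println(client.setState(State.PENDING_ERROR)); // true
        System.out.println(client.setState(State.NOT_RUNNING));   // false
        System.out.println(client.setState(State.ERROR));         // true
    }
}
```

With a table like this, the close thread would not need to skip `setState` based on an `error` flag; it could simply attempt the terminal state that matches the pending state it started from.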





