[ 
https://issues.apache.org/jira/browse/KAFKA-7477?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16643933#comment-16643933
 ] 

ASF GitHub Bot commented on KAFKA-7477:
---------------------------------------

mjsax closed pull request #5747: KAFKA-7477: Improve Streams close timeout 
semantics
URL: https://github.com/apache/kafka/pull/5747
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java 
b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index 5fb89598507..d419ff50870 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -218,13 +218,7 @@ private boolean waitOnState(final State targetState, final 
long waitMs) {
         synchronized (stateLock) {
             long elapsedMs = 0L;
             while (state != targetState) {
-                if (waitMs == 0) {
-                    try {
-                        stateLock.wait();
-                    } catch (final InterruptedException e) {
-                        // it is ok: just move on to the next iteration
-                    }
-                } else if (waitMs > elapsedMs) {
+                if (waitMs > elapsedMs) {
                     final long remainingMs = waitMs - elapsedMs;
                     try {
                         stateLock.wait(remainingMs);
@@ -824,17 +818,30 @@ public void close() {
      * threads to join.
      * A {@code timeout} of 0 means to wait forever.
      *
-     * @param timeout  how long to wait for the threads to shutdown
+     * @param timeout  how long to wait for the threads to shutdown. Can't be 
negative. If {@code timeout=0} just checking the state and return immediately.
      * @param timeUnit unit of time used for timeout
      * @return {@code true} if all threads were successfully 
stopped—{@code false} if the timeout was reached
      * before all threads stopped
      * Note that this method must not be called in the {@code onChange} 
callback of {@link StateListener}.
-     * @deprecated Use {@link #close(Duration)} instead
+     * @deprecated Use {@link #close(Duration)} instead; note, that {@link 
#close(Duration)} has different semantics and does not block on zero, e.g., 
`Duration.ofMillis(0)`.
      */
     @Deprecated
     public synchronized boolean close(final long timeout, final TimeUnit 
timeUnit) {
-        log.debug("Stopping Streams client with timeoutMillis = {} ms.", 
timeUnit.toMillis(timeout));
+        long timeoutMs = timeUnit.toMillis(timeout);
+
+        log.debug("Stopping Streams client with timeoutMillis = {} ms. You are 
using deprecated method. " +
+            "Please, consider update your code.", timeoutMs);
+
+        if (timeoutMs < 0) {
+            timeoutMs = 0;
+        } else if (timeoutMs == 0) {
+            timeoutMs = Long.MAX_VALUE;
+        }
+
+        return close(timeoutMs);
+    }
 
+    private boolean close(final long timeoutMs) {
         if (!setState(State.PENDING_SHUTDOWN)) {
             // if transition failed, it means it was either in PENDING_SHUTDOWN
             // or NOT_RUNNING already; just check that all threads have been 
stopped
@@ -890,7 +897,7 @@ public void run() {
             shutdownThread.start();
         }
 
-        if (waitOnState(State.NOT_RUNNING, timeUnit.toMillis(timeout))) {
+        if (waitOnState(State.NOT_RUNNING, timeoutMs)) {
             log.info("Streams client stopped completely");
             return true;
         } else {
@@ -912,7 +919,15 @@ public void run() {
      */
     public synchronized boolean close(final Duration timeout) throws 
IllegalArgumentException {
         ApiUtils.validateMillisecondDuration(timeout, "timeout");
-        return close(timeout.toMillis(), TimeUnit.MILLISECONDS);
+
+        final long timeoutMs = timeout.toMillis();
+        if (timeoutMs < 0) {
+            throw new IllegalArgumentException("Timeout can't be negative.");
+        }
+
+        log.debug("Stopping Streams client with timeoutMillis = {} ms.", 
timeoutMs);
+
+        return close(timeoutMs);
     }
 
     /**
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java 
b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
index abc4cb90b7d..b9d542bc9b6 100644
--- a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
@@ -548,6 +548,34 @@ public void shouldCleanupOldStateDirs() throws 
InterruptedException {
         }
     }
 
+    @Test
+    public void shouldThrowOnNegativeTimeoutForClose() {
+        final KafkaStreams streams = new KafkaStreams(builder.build(), props);
+        try {
+            streams.close(Duration.ofMillis(-1L));
+            fail("should not accept negative close parameter");
+        } catch (final IllegalArgumentException e) {
+            // expected
+        } finally {
+            streams.close();
+        }
+    }
+
+    @Test
+    public void shouldNotBlockInCloseForZeroDuration() throws 
InterruptedException {
+        final KafkaStreams streams = new KafkaStreams(builder.build(), props);
+        final Thread th = new Thread(() -> 
streams.close(Duration.ofMillis(0L)));
+
+        th.start();
+
+        try {
+            th.join(30_000L);
+            assertFalse(th.isAlive());
+        } finally {
+            streams.close();
+        }
+    }
+
     private void verifyCleanupStateDir(final String appDir, final File 
oldTaskDir) throws InterruptedException {
         final File taskDir = new File(appDir, "0_0");
         TestUtils.waitForCondition(


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Improve Streams close timeout semantics
> ---------------------------------------
>
>                 Key: KAFKA-7477
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7477
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>            Reporter: John Roesler
>            Assignee: Nikolay Izhikov
>            Priority: Minor
>              Labels: kip, newbie
>
> See [https://github.com/apache/kafka/pull/5682#discussion_r221473451]
> The current timeout semantics are a little "magical":
>  * 0 means to block forever
>  * negative numbers cause the close to complete immediately without checking 
> the state
> I think this would make more sense:
>  * reject negative numbers
>  * make 0 just signal and return immediately (after checking the state once)
>  * if I want to wait "forever", I can use {{ofYears(1)}} or 
> {{ofMillis(Long.MAX_VALUE)}} or some other intuitively "long enough to be 
> forever" value instead of a magic value.
>  
> Part of 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-358%3A+Migrate+Streams+API+to+Duration+instead+of+long+ms+times



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to