[ https://issues.apache.org/jira/browse/KAFKA-7477?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16643933#comment-16643933 ]
ASF GitHub Bot commented on KAFKA-7477: --------------------------------------- mjsax closed pull request #5747: KAFKA-7477: Improve Streams close timeout semantics URL: https://github.com/apache/kafka/pull/5747 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java index 5fb89598507..d419ff50870 100644 --- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java +++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java @@ -218,13 +218,7 @@ private boolean waitOnState(final State targetState, final long waitMs) { synchronized (stateLock) { long elapsedMs = 0L; while (state != targetState) { - if (waitMs == 0) { - try { - stateLock.wait(); - } catch (final InterruptedException e) { - // it is ok: just move on to the next iteration - } - } else if (waitMs > elapsedMs) { + if (waitMs > elapsedMs) { final long remainingMs = waitMs - elapsedMs; try { stateLock.wait(remainingMs); @@ -824,17 +818,30 @@ public void close() { * threads to join. * A {@code timeout} of 0 means to wait forever. * - * @param timeout how long to wait for the threads to shutdown + * @param timeout how long to wait for the threads to shutdown. Can't be negative. If {@code timeout=0} just checking the state and return immediately. * @param timeUnit unit of time used for timeout * @return {@code true} if all threads were successfully stopped—{@code false} if the timeout was reached * before all threads stopped * Note that this method must not be called in the {@code onChange} callback of {@link StateListener}. - * @deprecated Use {@link #close(Duration)} instead + * @deprecated Use {@link #close(Duration)} instead; note, that {@link #close(Duration)} has different semantics and does not block on zero, e.g., `Duration.ofMillis(0)`. */ @Deprecated public synchronized boolean close(final long timeout, final TimeUnit timeUnit) { - log.debug("Stopping Streams client with timeoutMillis = {} ms.", timeUnit.toMillis(timeout)); + long timeoutMs = timeUnit.toMillis(timeout); + + log.debug("Stopping Streams client with timeoutMillis = {} ms. You are using deprecated method. " + + "Please, consider update your code.", timeoutMs); + + if (timeoutMs < 0) { + timeoutMs = 0; + } else if (timeoutMs == 0) { + timeoutMs = Long.MAX_VALUE; + } + + return close(timeoutMs); + } + private boolean close(final long timeoutMs) { if (!setState(State.PENDING_SHUTDOWN)) { // if transition failed, it means it was either in PENDING_SHUTDOWN // or NOT_RUNNING already; just check that all threads have been stopped @@ -890,7 +897,7 @@ public void run() { shutdownThread.start(); } - if (waitOnState(State.NOT_RUNNING, timeUnit.toMillis(timeout))) { + if (waitOnState(State.NOT_RUNNING, timeoutMs)) { log.info("Streams client stopped completely"); return true; } else { @@ -912,7 +919,15 @@ public void run() { */ public synchronized boolean close(final Duration timeout) throws IllegalArgumentException { ApiUtils.validateMillisecondDuration(timeout, "timeout"); - return close(timeout.toMillis(), TimeUnit.MILLISECONDS); + + final long timeoutMs = timeout.toMillis(); + if (timeoutMs < 0) { + throw new IllegalArgumentException("Timeout can't be negative."); + } + + log.debug("Stopping Streams client with timeoutMillis = {} ms.", timeoutMs); + + return close(timeoutMs); } /** diff --git a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java index abc4cb90b7d..b9d542bc9b6 100644 --- a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java @@ -548,6 +548,34 @@ public void shouldCleanupOldStateDirs() throws InterruptedException { } } + @Test + public void shouldThrowOnNegativeTimeoutForClose() { + final KafkaStreams streams = new KafkaStreams(builder.build(), props); + try { + streams.close(Duration.ofMillis(-1L)); + fail("should not accept negative close parameter"); + } catch (final IllegalArgumentException e) { + // expected + } finally { + streams.close(); + } + } + + @Test + public void shouldNotBlockInCloseForZeroDuration() throws InterruptedException { + final KafkaStreams streams = new KafkaStreams(builder.build(), props); + final Thread th = new Thread(() -> streams.close(Duration.ofMillis(0L))); + + th.start(); + + try { + th.join(30_000L); + assertFalse(th.isAlive()); + } finally { + streams.close(); + } + } + private void verifyCleanupStateDir(final String appDir, final File oldTaskDir) throws InterruptedException { final File taskDir = new File(appDir, "0_0"); TestUtils.waitForCondition( ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Improve Streams close timeout semantics > --------------------------------------- > > Key: KAFKA-7477 > URL: https://issues.apache.org/jira/browse/KAFKA-7477 > Project: Kafka > Issue Type: Improvement > Components: streams > Reporter: John Roesler > Assignee: Nikolay Izhikov > Priority: Minor > Labels: kip, newbie > > See [https://github.com/apache/kafka/pull/5682#discussion_r221473451] > The current timeout semantics are a little "magical": > * 0 means to block forever > * negative numbers cause the close to complete immediately without checking > the state > I think this would make more sense: > * reject negative numbers > * make 0 just signal and return immediately (after checking the state once) > * if I want to wait "forever", I can use {{ofYears(1)}} or > {{ofMillis(Long.MAX_VALUE)}} or some other intuitively "long enough to be > forever" value instead of a magic value. > > Part of > https://cwiki.apache.org/confluence/display/KAFKA/KIP-358%3A+Migrate+Streams+API+to+Duration+instead+of+long+ms+times -- This message was sent by Atlassian JIRA (v7.6.3#76005)