This is an automated email from the ASF dual-hosted git repository. srichter pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit acfbc83cf17f94a27c1aa71c331ee95382f55596
Author: Stefan Richter <s.rich...@data-artisans.com>
AuthorDate: Wed Jun 26 12:21:33 2019 +0200

    [hotfix] Remove dangerous waiting methods from mailbox
---
 .../flink/streaming/runtime/tasks/StreamTask.java  |  2 +-
 .../runtime/tasks/mailbox/MailboxImpl.java         | 26 -------------------
 .../runtime/tasks/mailbox/MailboxReceiver.java     |  6 -----
 .../runtime/tasks/mailbox/MailboxSender.java       |  7 ------
 .../runtime/tasks/mailbox/MailboxImplTest.java     | 29 ++++++----------------
 5 files changed, 8 insertions(+), 62 deletions(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 2f4dd6a..8ba2a44 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -1339,7 +1339,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 	 */
 	public final class ActionContext {

-		private final Runnable actionUnavailableLetter = ThrowingRunnable.unchecked(mailbox::waitUntilHasMail);
+		private final Runnable actionUnavailableLetter = ThrowingRunnable.unchecked(() -> mailbox.takeMail().run());

 		/**
 		 * This method must be called to end the stream task when all actions for the tasks have been performed.
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxImpl.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxImpl.java
index 411efd1..e9bd346 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxImpl.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxImpl.java
@@ -126,19 +126,6 @@ public class MailboxImpl implements Mailbox {
 		}
 	}

-	@Override
-	public void waitUntilHasMail() throws InterruptedException {
-		final ReentrantLock lock = this.lock;
-		lock.lockInterruptibly();
-		try {
-			while (isEmpty()) {
-				notEmpty.await();
-			}
-		} finally {
-			lock.unlock();
-		}
-	}
-
 	//------------------------------------------------------------------------------------------------------------------

 	@Override
@@ -171,19 +158,6 @@ public class MailboxImpl implements Mailbox {
 		}
 	}

-	@Override
-	public void waitUntilHasCapacity() throws InterruptedException {
-		final ReentrantLock lock = this.lock;
-		lock.lockInterruptibly();
-		try {
-			while (isFull()) {
-				notFull.await();
-			}
-		} finally {
-			lock.unlock();
-		}
-	}
-
 	//------------------------------------------------------------------------------------------------------------------

 	private void putInternal(Runnable letter) {
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxReceiver.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxReceiver.java
index 189687e..2d2f112 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxReceiver.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxReceiver.java
@@ -50,10 +50,4 @@ public interface MailboxReceiver {
 	 */
 	@Nonnull
 	Runnable takeMail() throws InterruptedException;
-
-	/**
-	 * This method blocks if the mailbox is empty until mail becomes available.
-	 * @throws InterruptedException on interruption.
-	 */
-	void waitUntilHasMail() throws InterruptedException;
 }
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxSender.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxSender.java
index 1829125..36d10a1 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxSender.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxSender.java
@@ -42,11 +42,4 @@ public interface MailboxSender {
 	 * @throws InterruptedException on interruption.
 	 */
 	void putMail(@Nonnull Runnable letter) throws InterruptedException;
-
-	/**
-	 * This method blocks until the mailbox has again capacity to enqueue new letters.
-	 *
-	 * @throws InterruptedException on interruption.
-	 */
-	void waitUntilHasCapacity() throws InterruptedException;
 }
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxImplTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxImplTest.java
index fc7f19c..9c4edbf 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxImplTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxImplTest.java
@@ -27,6 +27,7 @@ import org.junit.Before;
 import org.junit.Test;

 import java.util.LinkedList;
+import java.util.Optional;
 import java.util.Queue;

 /**
@@ -81,24 +82,7 @@ public class MailboxImplTest {
 		while (!testObjects.isEmpty()) {
 			Assert.assertEquals(testObjects.remove(), mailbox.tryTakeMail().get());
 			Assert.assertEquals(!testObjects.isEmpty(), mailbox.hasMail());
-			mailbox.waitUntilHasCapacity(); // should not block here because the mailbox is not full
 		}
-
-		Thread waitingReader = new Thread(ThrowingRunnable.unchecked(() -> mailbox.waitUntilHasMail()));
-		waitingReader.start();
-		Thread.sleep(1);
-		Assert.assertTrue(waitingReader.isAlive());
-		mailbox.tryPutMail(() -> {});
-		waitingReader.join(); // should complete here
-
-		while (mailbox.tryPutMail(() -> {})) {}
-
-		Thread waitingWriter = new Thread(ThrowingRunnable.unchecked(() -> mailbox.waitUntilHasCapacity()));
-		waitingWriter.start();
-		Thread.sleep(1);
-		Assert.assertTrue(waitingWriter.isAlive());
-		mailbox.takeMail();
-		waitingWriter.join();
 	}

 	/**
@@ -115,13 +99,14 @@ public class MailboxImplTest {
 	@Test
 	public void testConcurrentPutTakeNonBlockingAndWait() throws Exception {
 		testPutTake((mailbox -> {
-				mailbox.waitUntilHasMail();
-				return mailbox.tryTakeMail().get();
+				Optional<Runnable> optionalLetter = mailbox.tryTakeMail();
+				while (!optionalLetter.isPresent()) {
+					optionalLetter = mailbox.tryTakeMail();
+				}
+				return optionalLetter.get();
 			}),
 			((mailbox, runnable) -> {
-				while (!mailbox.tryPutMail(runnable)) {
-					mailbox.waitUntilHasCapacity();
-				}
+				while (!mailbox.tryPutMail(runnable)) {}
 			}));
 	}
