This is an automated email from the ASF dual-hosted git repository.

srichter pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit acfbc83cf17f94a27c1aa71c331ee95382f55596
Author: Stefan Richter <s.rich...@data-artisans.com>
AuthorDate: Wed Jun 26 12:21:33 2019 +0200

    [hotfix] Remove dangerous waiting methods from mailbox
---
 .../flink/streaming/runtime/tasks/StreamTask.java  |  2 +-
 .../runtime/tasks/mailbox/MailboxImpl.java         | 26 -------------------
 .../runtime/tasks/mailbox/MailboxReceiver.java     |  6 -----
 .../runtime/tasks/mailbox/MailboxSender.java       |  7 ------
 .../runtime/tasks/mailbox/MailboxImplTest.java     | 29 ++++++----------------
 5 files changed, 8 insertions(+), 62 deletions(-)

diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 2f4dd6a..8ba2a44 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -1339,7 +1339,7 @@ public abstract class StreamTask<OUT, OP extends 
StreamOperator<OUT>>
         */
        public final class ActionContext {
 
-               private final Runnable actionUnavailableLetter = 
ThrowingRunnable.unchecked(mailbox::waitUntilHasMail);
+               private final Runnable actionUnavailableLetter = 
ThrowingRunnable.unchecked(() -> mailbox.takeMail().run());
 
                /**
                 * This method must be called to end the stream task when all 
actions for the tasks have been performed.
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxImpl.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxImpl.java
index 411efd1..e9bd346 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxImpl.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxImpl.java
@@ -126,19 +126,6 @@ public class MailboxImpl implements Mailbox {
                }
        }
 
-       @Override
-       public void waitUntilHasMail() throws InterruptedException {
-               final ReentrantLock lock = this.lock;
-               lock.lockInterruptibly();
-               try {
-                       while (isEmpty()) {
-                               notEmpty.await();
-                       }
-               } finally {
-                       lock.unlock();
-               }
-       }
-
        
//------------------------------------------------------------------------------------------------------------------
 
        @Override
@@ -171,19 +158,6 @@ public class MailboxImpl implements Mailbox {
                }
        }
 
-       @Override
-       public void waitUntilHasCapacity() throws InterruptedException {
-               final ReentrantLock lock = this.lock;
-               lock.lockInterruptibly();
-               try {
-                       while (isFull()) {
-                               notFull.await();
-                       }
-               } finally {
-                       lock.unlock();
-               }
-       }
-
        
//------------------------------------------------------------------------------------------------------------------
 
        private void putInternal(Runnable letter) {
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxReceiver.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxReceiver.java
index 189687e..2d2f112 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxReceiver.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxReceiver.java
@@ -50,10 +50,4 @@ public interface MailboxReceiver {
         */
        @Nonnull
        Runnable takeMail() throws InterruptedException;
-
-       /**
-        * This method blocks if the mailbox is empty until mail becomes 
available.
-        * @throws InterruptedException on interruption.
-        */
-       void waitUntilHasMail() throws InterruptedException;
 }
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxSender.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxSender.java
index 1829125..36d10a1 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxSender.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxSender.java
@@ -42,11 +42,4 @@ public interface MailboxSender {
         * @throws InterruptedException on interruption.
         */
        void putMail(@Nonnull Runnable letter) throws InterruptedException;
-
-       /**
-        * This method blocks until the mailbox has again capacity to enqueue 
new letters.
-        *
-        * @throws InterruptedException on interruption.
-        */
-       void waitUntilHasCapacity() throws InterruptedException;
 }
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxImplTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxImplTest.java
index fc7f19c..9c4edbf 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxImplTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxImplTest.java
@@ -27,6 +27,7 @@ import org.junit.Before;
 import org.junit.Test;
 
 import java.util.LinkedList;
+import java.util.Optional;
 import java.util.Queue;
 
 /**
@@ -81,24 +82,7 @@ public class MailboxImplTest {
                while (!testObjects.isEmpty()) {
                        Assert.assertEquals(testObjects.remove(), 
mailbox.tryTakeMail().get());
                        Assert.assertEquals(!testObjects.isEmpty(), 
mailbox.hasMail());
-                       mailbox.waitUntilHasCapacity(); // should not block 
here because the mailbox is not full
                }
-
-               Thread waitingReader = new Thread(ThrowingRunnable.unchecked(() 
-> mailbox.waitUntilHasMail()));
-               waitingReader.start();
-               Thread.sleep(1);
-               Assert.assertTrue(waitingReader.isAlive());
-               mailbox.tryPutMail(() -> {});
-               waitingReader.join(); // should complete here
-
-               while (mailbox.tryPutMail(() -> {})) {}
-
-               Thread waitingWriter = new Thread(ThrowingRunnable.unchecked(() 
-> mailbox.waitUntilHasCapacity()));
-               waitingWriter.start();
-               Thread.sleep(1);
-               Assert.assertTrue(waitingWriter.isAlive());
-               mailbox.takeMail();
-               waitingWriter.join();
        }
 
        /**
@@ -115,13 +99,14 @@ public class MailboxImplTest {
        @Test
        public void testConcurrentPutTakeNonBlockingAndWait() throws Exception {
                testPutTake((mailbox -> {
-                               mailbox.waitUntilHasMail();
-                               return mailbox.tryTakeMail().get();
+                               Optional<Runnable> optionalLetter = 
mailbox.tryTakeMail();
+                               while (!optionalLetter.isPresent()) {
+                                       optionalLetter = mailbox.tryTakeMail();
+                               }
+                               return optionalLetter.get();
                        }),
                        ((mailbox, runnable) -> {
-                               while (!mailbox.tryPutMail(runnable)) {
-                                       mailbox.waitUntilHasCapacity();
-                               }
+                               while (!mailbox.tryPutMail(runnable)) {}
                        }));
        }
 

Reply via email to