This is an automated email from the ASF dual-hosted git repository.

srichter pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 6cf98b671d889f1003172bc656ded69d1aab9235
Author: Stefan Richter <s.rich...@data-artisans.com>
AuthorDate: Wed Jul 3 13:32:46 2019 +0200

    [FLINK-12804] Change mailbox implementation from bounded to unbounded
    
    This closes #8692.
---
 .../flink/streaming/runtime/tasks/StreamTask.java  |   2 +-
 .../streaming/runtime/tasks/mailbox/Mailbox.java   |  37 +---
 .../runtime/tasks/mailbox/MailboxImpl.java         | 201 +++++----------------
 .../runtime/tasks/mailbox/MailboxSender.java       |  13 +-
 .../tasks/mailbox/execution/MailboxExecutor.java   |  17 +-
 .../execution/MailboxExecutorServiceImpl.java      |  20 --
 .../tasks/mailbox/execution/MailboxProcessor.java  |  56 ++----
 .../execution/SuspendedMailboxDefaultAction.java   |   2 +-
 .../runtime/tasks/mailbox/MailboxImplTest.java     |  88 ++-------
 .../execution/MailboxExecutorServiceImplTest.java  |   6 +-
 .../mailbox/execution/MailboxProcessorTest.java    |  13 +-
 .../mailbox/execution/TestMailboxExecutor.java     |   6 -
 12 files changed, 92 insertions(+), 369 deletions(-)

diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index a626bba..52e2011 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -484,7 +484,7 @@ public abstract class StreamTask<OUT, OP extends 
StreamOperator<OUT>>
                        cancelTask();
                }
                finally {
-                       mailboxProcessor.cancelMailboxExecution();
+                       mailboxProcessor.allActionsCompleted();
                        cancelables.close();
                }
        }
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/Mailbox.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/Mailbox.java
index 9b2d95f..deb69e0 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/Mailbox.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/Mailbox.java
@@ -23,7 +23,7 @@ import javax.annotation.Nonnull;
 import java.util.List;
 
 /**
- * A mailbox is basically a blocking queue for inter-thread message exchange 
in form of {@link Runnable} objects between
+ * A mailbox is basically a queue for inter-thread message exchange in form of 
{@link Runnable} objects between
  * multiple producer threads and a single consumer. This has a lifecycle of 
closed -> open -> (quiesced) -> closed.
  */
 public interface Mailbox extends MailboxReceiver, MailboxSender {
@@ -56,36 +56,12 @@ public interface Mailbox extends MailboxReceiver, 
MailboxSender {
        List<Runnable> close();
 
        /**
-        * The effect of this is that all pending letters in the mailbox are 
dropped and the given priorityLetter
-        * is enqueued to the head of the mailbox. Dropped letters are 
returned. This method should only be invoked
-        * by code that has ownership of the mailbox object and only rarely 
used, e.g. to submit special events like
-        * shutting down the mailbox loop.
-        *
-        * @param priorityLetter action to enqueue atomically after the mailbox 
was cleared.
-        * @throws MailboxStateException if the mailbox is quiesced or closed.
-        */
-       @Nonnull
-       List<Runnable> clearAndPut(@Nonnull Runnable priorityLetter) throws 
MailboxStateException;
-
-       /**
-        * Adds the given action to the head of the mailbox. This method will 
block if the mailbox is full and
-        * should therefore only be called from outside the mailbox main-thread 
to avoid deadlocks.
-        *
-        * @param priorityLetter action to enqueue to the head of the mailbox.
-        * @throws InterruptedException on interruption.
-        * @throws MailboxStateException if the mailbox is quiesced or closed.
-        */
-       void putFirst(@Nonnull Runnable priorityLetter) throws 
InterruptedException, MailboxStateException;
-
-       /**
-        * Adds the given action to the head of the mailbox if the mailbox is 
not full. Returns true if the letter
-        * was successfully added to the mailbox.
+        * Adds the given action to the head of the mailbox.
         *
         * @param priorityLetter action to enqueue to the head of the mailbox.
-        * @return true if the letter was successfully added.
         * @throws MailboxStateException if the mailbox is quiesced or closed.
         */
-       boolean tryPutFirst(@Nonnull Runnable priorityLetter) throws 
MailboxStateException;
+       void putFirst(@Nonnull Runnable priorityLetter) throws 
MailboxStateException;
 
        /**
         * Returns the current state of the mailbox as defined by the lifecycle 
enum {@link State}.
@@ -94,11 +70,4 @@ public interface Mailbox extends MailboxReceiver, 
MailboxSender {
         */
        @Nonnull
        State getState();
-
-       /**
-        * Returns the total capacity of the mailbox.
-        *
-        * @return the total capacity of the mailbox.
-        */
-       int capacity();
 }
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxImpl.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxImpl.java
index 0f6bbf0..4f58c4a 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxImpl.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxImpl.java
@@ -21,10 +21,13 @@ package org.apache.flink.streaming.runtime.tasks.mailbox;
 import org.apache.flink.util.Preconditions;
 
 import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
 import javax.annotation.concurrent.GuardedBy;
 import javax.annotation.concurrent.ThreadSafe;
 
 import java.util.ArrayList;
+import java.util.Collections;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.locks.Condition;
@@ -38,42 +41,24 @@ import java.util.concurrent.locks.ReentrantLock;
 public class MailboxImpl implements Mailbox {
 
        /**
-        * The enqueued letters.
-        */
-       @GuardedBy("lock")
-       private final Runnable[] ringBuffer;
-
-       /**
         * Lock for all concurrent ops.
         */
        private final ReentrantLock lock;
 
        /**
-        * Condition that is triggered when the buffer is no longer empty.
-        */
-       @GuardedBy("lock")
-       private final Condition notEmpty;
-
-       /**
-        * Condition that is triggered when the buffer is no longer full.
+        * Internal queue of letters.
         */
        @GuardedBy("lock")
-       private final Condition notFull;
+       private final LinkedList<Runnable> queue;
 
        /**
-        * Index of the ring buffer head.
+        * Condition that is triggered when the mailbox is no longer empty.
         */
        @GuardedBy("lock")
-       private int headIndex;
-
-       /**
-        * Index of the ring buffer tail.
-        */
-       @GuardedBy("lock")
-       private int tailIndex;
+       private final Condition notEmpty;
 
        /**
-        * Number of letters in the mailbox.
+        * Number of letters in the mailbox. We track it separately from the 
queue#size to avoid locking on {@link #hasMail()}.
         */
        @GuardedBy("lock")
        private volatile int count;
@@ -84,24 +69,12 @@ public class MailboxImpl implements Mailbox {
        @GuardedBy("lock")
        private volatile State state;
 
-       /**
-        * A mask to wrap around the indexes of the ring buffer. We use this to 
avoid ifs or modulo ops.
-        */
-       private final int moduloMask;
-
        public MailboxImpl() {
-               this(6); // 2^6 = 64
-       }
-
-       public MailboxImpl(int capacityPow2) {
-               final int capacity = 1 << capacityPow2;
-               Preconditions.checkState(capacity > 0);
-               this.moduloMask = capacity - 1;
-               this.ringBuffer = new Runnable[capacity];
                this.lock = new ReentrantLock();
                this.notEmpty = lock.newCondition();
-               this.notFull = lock.newCondition();
                this.state = State.CLOSED;
+               this.queue = new LinkedList<>();
+               this.count = 0;
        }
 
        @Override
@@ -114,12 +87,7 @@ public class MailboxImpl implements Mailbox {
                final ReentrantLock lock = this.lock;
                lock.lock();
                try {
-                       if (isEmpty()) {
-                               checkTakeStateConditions();
-                               return Optional.empty();
-                       } else {
-                               return Optional.of(takeInternal());
-                       }
+                       return Optional.ofNullable(takeHeadInternal());
                } finally {
                        lock.unlock();
                }
@@ -131,11 +99,11 @@ public class MailboxImpl implements Mailbox {
                final ReentrantLock lock = this.lock;
                lock.lockInterruptibly();
                try {
-                       while (isEmpty()) {
-                               checkTakeStateConditions();
+                       Runnable headLetter;
+                       while ((headLetter = takeHeadInternal()) == null) {
                                notEmpty.await();
                        }
-                       return takeInternal();
+                       return headLetter;
                } finally {
                        lock.unlock();
                }
@@ -144,31 +112,10 @@ public class MailboxImpl implements Mailbox {
        
//------------------------------------------------------------------------------------------------------------------
 
        @Override
-       public boolean tryPutMail(@Nonnull Runnable letter) throws 
MailboxStateException {
+       public void putMail(@Nonnull Runnable letter) throws 
MailboxStateException {
                final ReentrantLock lock = this.lock;
                lock.lock();
                try {
-                       if (isFull()) {
-                               checkPutStateConditions();
-                               return false;
-                       } else {
-                               putTailInternal(letter);
-                               return true;
-                       }
-               } finally {
-                       lock.unlock();
-               }
-       }
-
-       @Override
-       public void putMail(@Nonnull Runnable letter) throws 
InterruptedException, MailboxStateException {
-               final ReentrantLock lock = this.lock;
-               lock.lockInterruptibly();
-               try {
-                       while (isFull()) {
-                               checkPutStateConditions();
-                               notFull.await();
-                       }
                        putTailInternal(letter);
                } finally {
                        lock.unlock();
@@ -177,51 +124,12 @@ public class MailboxImpl implements Mailbox {
 
        
//------------------------------------------------------------------------------------------------------------------
 
-       @Nonnull
-       @Override
-       public List<Runnable> clearAndPut(@Nonnull Runnable priorityLetter) 
throws MailboxStateException {
-               ArrayList<Runnable> droppedLetters = new 
ArrayList<>(capacity());
-
-               lock.lock();
-               try {
-                       // check state first to avoid loosing any letters 
forever through exception
-                       checkPutStateConditions();
-                       dropAllLetters(droppedLetters);
-                       putTailInternal(priorityLetter);
-               } finally {
-                       lock.unlock();
-               }
-
-               return droppedLetters;
-       }
-
-       @Override
-       public void putFirst(@Nonnull Runnable priorityLetter) throws 
InterruptedException, MailboxStateException {
-               final ReentrantLock lock = this.lock;
-               lock.lockInterruptibly();
-               try {
-                       while (isFull()) {
-                               checkPutStateConditions();
-                               notFull.await();
-                       }
-                       putHeadInternal(priorityLetter);
-               } finally {
-                       lock.unlock();
-               }
-       }
-
        @Override
-       public boolean tryPutFirst(@Nonnull Runnable priorityLetter) throws 
MailboxStateException {
+       public void putFirst(@Nonnull Runnable priorityLetter) throws 
MailboxStateException {
                final ReentrantLock lock = this.lock;
                lock.lock();
                try {
-                       if (isFull()) {
-                               checkPutStateConditions();
-                               return false;
-                       } else {
-                               putHeadInternal(priorityLetter);
-                               return true;
-                       }
+                       putHeadInternal(priorityLetter);
                } finally {
                        lock.unlock();
                }
@@ -229,59 +137,42 @@ public class MailboxImpl implements Mailbox {
 
        
//------------------------------------------------------------------------------------------------------------------
 
-       private void putHeadInternal(Runnable letter) throws 
MailboxStateException {
+       private void putHeadInternal(Runnable newHead) throws 
MailboxStateException {
                assert lock.isHeldByCurrentThread();
                checkPutStateConditions();
-               headIndex = decreaseIndexWithWrapAround(headIndex);
-               this.ringBuffer[headIndex] = letter;
-               ++count;
+               queue.addFirst(newHead);
+               incrementCountAndCheckOverflow();
                notEmpty.signal();
        }
 
-       private void putTailInternal(Runnable letter) throws 
MailboxStateException {
+       private void putTailInternal(Runnable newTail) throws 
MailboxStateException {
                assert lock.isHeldByCurrentThread();
                checkPutStateConditions();
-               this.ringBuffer[tailIndex] = letter;
-               tailIndex = increaseIndexWithWrapAround(tailIndex);
-               ++count;
+               queue.addLast(newTail);
+               incrementCountAndCheckOverflow();
                notEmpty.signal();
        }
 
-       private Runnable takeInternal() throws MailboxStateException {
-               assert lock.isHeldByCurrentThread();
-               checkTakeStateConditions();
-               final Runnable[] buffer = this.ringBuffer;
-               Runnable letter = buffer[headIndex];
-               buffer[headIndex] = null;
-               headIndex = increaseIndexWithWrapAround(headIndex);
-               --count;
-               notFull.signal();
-               return letter;
+       private void incrementCountAndCheckOverflow() {
+               Preconditions.checkState(++count > 0, "Mailbox overflow.");
        }
 
-       private void dropAllLetters(List<Runnable> dropInto) {
+       @Nullable
+       private Runnable takeHeadInternal() throws MailboxStateException {
                assert lock.isHeldByCurrentThread();
-               int localCount = count;
-               while (localCount > 0) {
-                       dropInto.add(ringBuffer[headIndex]);
-                       ringBuffer[headIndex] = null;
-                       headIndex = increaseIndexWithWrapAround(headIndex);
-                       --localCount;
-                       notFull.signal();
+               checkTakeStateConditions();
+               Runnable oldHead = queue.pollFirst();
+               if (oldHead != null) {
+                       --count;
                }
-               count = 0;
-       }
-
-       private int increaseIndexWithWrapAround(int old) {
-               return (old + 1) & moduloMask;
-       }
-
-       private int decreaseIndexWithWrapAround(int old) {
-               return (old - 1) & moduloMask;
+               return oldHead;
        }
 
-       private boolean isFull() {
-               return count >= capacity();
+       private void drainAllLetters(List<Runnable> drainInto) {
+               assert lock.isHeldByCurrentThread();
+               drainInto.addAll(queue);
+               queue.clear();
+               count = 0;
        }
 
        private boolean isEmpty() {
@@ -331,7 +222,6 @@ public class MailboxImpl implements Mailbox {
                        if (state == State.OPEN) {
                                state = State.QUIESCED;
                        }
-                       notFull.signalAll();
                } finally {
                        lock.unlock();
                }
@@ -340,20 +230,20 @@ public class MailboxImpl implements Mailbox {
        @Nonnull
        @Override
        public List<Runnable> close() {
-               final ArrayList<Runnable> droppedLetters = new 
ArrayList<>(capacity());
-
                lock.lock();
                try {
-                       dropAllLetters(droppedLetters);
+                       if (state == State.CLOSED) {
+                               return Collections.emptyList();
+                       }
+                       ArrayList<Runnable> droppedLetters = new 
ArrayList<>(count);
+                       drainAllLetters(droppedLetters);
                        state = State.CLOSED;
                        // to unblock all
-                       notFull.signalAll();
                        notEmpty.signalAll();
+                       return droppedLetters;
                } finally {
                        lock.unlock();
                }
-
-               return droppedLetters;
        }
 
        @Nonnull
@@ -361,9 +251,4 @@ public class MailboxImpl implements Mailbox {
        public State getState() {
                return state;
        }
-
-       @Override
-       public int capacity() {
-               return ringBuffer.length;
-       }
 }
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxSender.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxSender.java
index 2a2274a..0560726 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxSender.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxSender.java
@@ -27,21 +27,10 @@ import javax.annotation.Nonnull;
 public interface MailboxSender {
 
        /**
-        * Enqueues the given letter to the mailbox, if capacity is available. 
On success, this returns <code>true</code>
-        * and <code>false</code> if the mailbox was already full.
-        *
-        * @param letter the letter to enqueue.
-        * @return <code>true</code> iff successful.
-        * @throws MailboxStateException if the mailbox is quiesced or closed.
-        */
-       boolean tryPutMail(@Nonnull Runnable letter) throws 
MailboxStateException;
-
-       /**
         * Enqueues the given letter to the mailbox. The mailbox is unbounded, so 
this call does not block waiting for capacity.
         *
         * @param letter the letter to enqueue.
-        * @throws InterruptedException on interruption.
         * @throws MailboxStateException if the mailbox is quiesced or closed.
         */
-       void putMail(@Nonnull Runnable letter) throws InterruptedException,  
MailboxStateException;
+       void putMail(@Nonnull Runnable letter) throws  MailboxStateException;
 }
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/execution/MailboxExecutor.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/execution/MailboxExecutor.java
index 1848e77..eeca9c2 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/execution/MailboxExecutor.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/execution/MailboxExecutor.java
@@ -31,10 +31,7 @@ import java.util.concurrent.RejectedExecutionException;
 public interface MailboxExecutor extends Executor {
 
        /**
-        * Executes the given command at some time in the future in the mailbox 
thread. This call can block when the
-        * mailbox is currently full. Therefore, this method must not be called 
from the mailbox thread itself as this
-        * can cause a deadlock. Instead, if the caller is already in the 
mailbox thread, the command should just be
-        * executed directly or use the non-blocking {@link 
#tryExecute(Runnable)}.
+        * Executes the given command at some time in the future in the mailbox 
thread.
         *
         * @param command the runnable task to add to the mailbox for execution.
         * @throws RejectedExecutionException if this task cannot be accepted 
for execution, e.g. because the mailbox is
@@ -44,18 +41,6 @@ public interface MailboxExecutor extends Executor {
        void execute(@Nonnull Runnable command) throws 
RejectedExecutionException;
 
        /**
-        * Attempts to enqueue the given command in the mailbox for execution. 
On success, the method returns true. If
-        * the mailbox is full, this method returns immediately without adding 
the command and returns false.
-        *
-        * @param command the runnable task to add to the mailbox for execution.
-        * @return true if the command was added to the mailbox. False if the 
command could not be added because the mailbox
-        * was full.
-        * @throws RejectedExecutionException if this task cannot be accepted 
for execution, e.g. because the mailbox is
-        *                                    quiesced or closed.
-        */
-       boolean tryExecute(Runnable command) throws RejectedExecutionException;
-
-       /**
         * This method starts running the command at the head of the mailbox 
and is intended to be used by the mailbox
         * thread to yield from a currently ongoing action to another command. 
The method blocks until another command to
         * run is available in the mailbox and must only be called from the 
mailbox thread. Must only be called from the
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/execution/MailboxExecutorServiceImpl.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/execution/MailboxExecutorServiceImpl.java
index 944b7a9..5c3cd95 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/execution/MailboxExecutorServiceImpl.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/execution/MailboxExecutorServiceImpl.java
@@ -53,27 +53,14 @@ public class MailboxExecutorServiceImpl extends 
AbstractExecutorService implemen
 
        @Override
        public void execute(@Nonnull Runnable command) {
-               checkIsNotMailboxThread();
                try {
                        mailbox.putMail(command);
-               } catch (InterruptedException irex) {
-                       Thread.currentThread().interrupt();
-                       throw new RejectedExecutionException("Sender thread was 
interrupted while blocking on mailbox.", irex);
                } catch (MailboxStateException mbex) {
                        throw new RejectedExecutionException(mbex);
                }
        }
 
        @Override
-       public boolean tryExecute(Runnable command) {
-               try {
-                       return mailbox.tryPutMail(command);
-               } catch (MailboxStateException e) {
-                       throw new RejectedExecutionException(e);
-               }
-       }
-
-       @Override
        public void yield() throws InterruptedException, IllegalStateException {
                checkIsMailboxThread();
                try {
@@ -147,11 +134,4 @@ public class MailboxExecutorServiceImpl extends 
AbstractExecutorService implemen
                                "Illegal thread detected. This method must be 
called from inside the mailbox thread!");
                }
        }
-
-       private void checkIsNotMailboxThread() {
-               if (isMailboxThread()) {
-                       throw new IllegalStateException(
-                               "Illegal thread detected. This method must NOT 
be called from inside the mailbox thread!");
-               }
-       }
 }
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/execution/MailboxProcessor.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/execution/MailboxProcessor.java
index 77906f7..6fc8da0 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/execution/MailboxProcessor.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/execution/MailboxProcessor.java
@@ -114,7 +114,11 @@ public class MailboxProcessor {
         * {@link java.util.concurrent.RunnableFuture} that are still contained 
in the mailbox.
         */
        public void close() {
-               
FutureUtils.cancelRunnableFutures(mailboxExecutor.shutdownNow());
+               List<Runnable> droppedLetters = mailboxExecutor.shutdownNow();
+               if (!droppedLetters.isEmpty()) {
+                       LOG.debug("Closing the mailbox dropped letters {}.", 
droppedLetters);
+                       FutureUtils.cancelRunnableFutures(droppedLetters);
+               }
        }
 
        /**
@@ -138,19 +142,11 @@ public class MailboxProcessor {
        }
 
        /**
-        * Cancels the mailbox loop execution. All pending mailbox actions will 
not be executed anymore, if they are
-        * instance of {@link java.util.concurrent.RunnableFuture}, they will 
be cancelled.
-        */
-       public void cancelMailboxExecution() {
-               clearMailboxAndRunPriorityAction(mailboxPoisonLetter);
-       }
-
-       /**
         * Reports a throwable for rethrowing from the mailbox thread. The 
rethrowing letter is put at the head of the mailbox; pending letters are not cleared.
         * @param throwable to report by rethrowing from the mailbox loop.
         */
        public void reportThrowable(Throwable throwable) {
-               clearMailboxAndRunPriorityAction(() -> {
+               sendPriorityLetter(() -> {
                        throw new WrappingRuntimeException(throwable);
                });
        }
@@ -159,17 +155,14 @@ public class MailboxProcessor {
         * This method must be called to end the stream task when all actions 
for the tasks have been performed.
         */
        public void allActionsCompleted() {
+               sendPriorityLetter(mailboxPoisonLetter);
+       }
+
+       private void sendPriorityLetter(Runnable priorityLetter) {
                try {
-                       if (mailboxExecutor.isMailboxThread()) {
-                               mailboxLoopRunning = false;
-                               ensureControlFlowSignalCheck();
-                       } else {
-                               mailbox.putFirst(mailboxPoisonLetter);
-                       }
-               } catch (InterruptedException e) {
-                       Thread.currentThread().interrupt();
+                       mailbox.putFirst(priorityLetter);
                } catch (MailboxStateException me) {
-                       LOG.debug("Action context could not submit poison 
letter to mailbox.", me);
+                       LOG.debug("Action context could not submit priority 
letter to mailbox.", me);
                }
        }
 
@@ -236,20 +229,7 @@ public class MailboxProcessor {
        private void ensureControlFlowSignalCheck() {
                // Make sure that mailbox#hasMail is true via a dummy letter so 
that the flag change is noticed.
                if (!mailbox.hasMail()) {
-                       try {
-                               mailbox.tryPutMail(() -> {});
-                       } catch (MailboxStateException me) {
-                               LOG.debug("Mailbox closed when trying to submit 
letter for control flow signal.", me);
-                       }
-               }
-       }
-
-       private void clearMailboxAndRunPriorityAction(Runnable priorityLetter) {
-               try {
-                       List<Runnable> droppedRunnables = 
mailbox.clearAndPut(priorityLetter);
-                       FutureUtils.cancelRunnableFutures(droppedRunnables);
-               } catch (MailboxStateException msex) {
-                       LOG.debug("Mailbox already closed in cancel().", msex);
+                       sendPriorityLetter(() -> {});
                }
        }
 
@@ -283,10 +263,14 @@ public class MailboxProcessor {
 
                @Override
                public void resume() {
-                       Preconditions.checkState(
-                               mailboxExecutor.isMailboxThread(),
-                               "SuspendedMailboxDefaultAction::resume resume 
must only be called from the mailbox-thread!");
+                       if (mailboxExecutor.isMailboxThread()) {
+                               resumeInternal();
+                       } else {
+                               sendPriorityLetter(this::resumeInternal);
+                       }
+               }
 
+               private void resumeInternal() {
                        if (suspendedDefaultAction == this) {
                                suspendedDefaultAction = null;
                        }
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/execution/SuspendedMailboxDefaultAction.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/execution/SuspendedMailboxDefaultAction.java
index ca31254..6b3f32c 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/execution/SuspendedMailboxDefaultAction.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/execution/SuspendedMailboxDefaultAction.java
@@ -24,7 +24,7 @@ package 
org.apache.flink.streaming.runtime.tasks.mailbox.execution;
 public interface SuspendedMailboxDefaultAction {
 
        /**
-        * Resume execution of the default action. Must only be called from the 
mailbox thread!.
+        * Resume execution of the default action.
         */
        void resume();
 }
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxImplTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxImplTest.java
index 1a48136..b7b478e 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxImplTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxImplTest.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.streaming.runtime.tasks.mailbox;
 
-import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.util.function.BiConsumerWithException;
 import org.apache.flink.util.function.FunctionWithException;
 import org.apache.flink.util.function.RunnableWithException;
@@ -30,7 +29,6 @@ import org.junit.Before;
 import org.junit.Test;
 
 import java.util.LinkedList;
-import java.util.List;
 import java.util.Optional;
 import java.util.Queue;
 import java.util.concurrent.CountDownLatch;
@@ -42,9 +40,6 @@ import java.util.function.Consumer;
 public class MailboxImplTest {
 
        private static final Runnable POISON_LETTER = () -> {};
-       private static final int CAPACITY_POW_2 = 2;
-       private static final int CAPACITY = 1 << CAPACITY_POW_2;
-
        /**
         * Object under test.
         */
@@ -52,7 +47,7 @@ public class MailboxImplTest {
 
        @Before
        public void setUp() {
-               mailbox = new MailboxImpl(CAPACITY_POW_2);
+               mailbox = new MailboxImpl();
                mailbox.open();
        }
 
@@ -61,26 +56,6 @@ public class MailboxImplTest {
                mailbox.close();
        }
 
-       /**
-        * Test for #clearAndPut should remove other pending events and enqueue directly to the head of the mailbox queue.
-        */
-       @Test
-       public void testClearAndPut() throws Exception {
-
-               Runnable letterInstance = () -> {};
-
-               for (int i = 0; i < CAPACITY; ++i) {
-                       Assert.assertTrue(mailbox.tryPutMail(letterInstance));
-               }
-
-               List<Runnable> droppedLetters = mailbox.clearAndPut(POISON_LETTER);
-
-               Assert.assertTrue(mailbox.hasMail());
-               Assert.assertEquals(POISON_LETTER, mailbox.tryTakeMail().get());
-               Assert.assertFalse(mailbox.hasMail());
-               Assert.assertEquals(CAPACITY, droppedLetters.size());
-       }
-
        @Test
        public void testPutAsHead() throws Exception {
 
@@ -88,38 +63,16 @@ public class MailboxImplTest {
                Runnable instanceB = () -> {};
                Runnable instanceC = () -> {};
                Runnable instanceD = () -> {};
-               Runnable instanceE = () -> {};
 
+               mailbox.putMail(instanceC);
+               mailbox.putFirst(instanceB);
                mailbox.putMail(instanceD);
-               mailbox.tryPutFirst(instanceC);
-               mailbox.putMail(instanceE);
                mailbox.putFirst(instanceA);
 
-               OneShotLatch latch = new OneShotLatch();
-               Thread blockingPut = new Thread(() -> {
-                       // ensure we are full
-                       try {
-                               if (!mailbox.tryPutFirst(() -> { })) {
-                                       latch.trigger();
-
-                                       mailbox.putFirst(instanceB);
-
-                               }
-                       } catch (InterruptedException e) {
-                               Thread.currentThread().interrupt();
-                       } catch (MailboxStateException ignore) {
-                       }
-               });
-
-               blockingPut.start();
-               latch.await();
-
                Assert.assertSame(instanceA, mailbox.takeMail());
-               blockingPut.join();
                Assert.assertSame(instanceB, mailbox.takeMail());
                Assert.assertSame(instanceC, mailbox.takeMail());
                Assert.assertSame(instanceD, mailbox.takeMail());
-               Assert.assertSame(instanceE, mailbox.takeMail());
 
                Assert.assertFalse(mailbox.tryTakeMail().isPresent());
        }
@@ -129,15 +82,13 @@ public class MailboxImplTest {
                final Queue<Runnable> testObjects = new LinkedList<>();
                Assert.assertFalse(mailbox.hasMail());
 
-               for (int i = 0; i < CAPACITY; ++i) {
+               for (int i = 0; i < 10; ++i) {
                        Runnable letter = () -> {};
                        testObjects.add(letter);
-                       Assert.assertTrue(mailbox.tryPutMail(letter));
+                       mailbox.putMail(letter);
                        Assert.assertTrue(mailbox.hasMail());
                }
 
-               Assert.assertFalse(mailbox.tryPutMail(() -> {}));
-
                while (!testObjects.isEmpty()) {
                        Assert.assertEquals(testObjects.remove(), mailbox.tryTakeMail().get());
                        Assert.assertEquals(!testObjects.isEmpty(), mailbox.hasMail());
@@ -164,9 +115,7 @@ public class MailboxImplTest {
                                }
                                return optionalLetter.get();
                        }),
-                       ((mailbox, runnable) -> {
-                               while (!mailbox.tryPutMail(runnable)) {}
-                       }));
+                       MailboxSender::putMail);
        }
 
        /**
@@ -176,7 +125,7 @@ public class MailboxImplTest {
        public void testCloseUnblocks() throws InterruptedException {
                testAllPuttingUnblocksInternal(Mailbox::close);
                setUp();
-               testUnblocksInternal(() -> mailbox.takeMail(), Mailbox::close, MailboxStateException.class);
+               testUnblocksInternal(() -> mailbox.takeMail(), Mailbox::close);
        }
 
        /**
@@ -218,16 +167,6 @@ public class MailboxImplTest {
 
        private void testLifecyclePuttingInternal() throws Exception {
                try {
-                       mailbox.tryPutMail(() -> {});
-                       Assert.fail();
-               } catch (MailboxStateException ignore) {
-               }
-               try {
-                       mailbox.tryPutFirst(() -> {});
-                       Assert.fail();
-               } catch (MailboxStateException ignore) {
-               }
-               try {
                        mailbox.putMail(() -> {});
                        Assert.fail();
                } catch (MailboxStateException ignore) {
@@ -240,18 +179,15 @@ public class MailboxImplTest {
        }
 
        private void testAllPuttingUnblocksInternal(Consumer<Mailbox> unblockMethod) throws InterruptedException {
-               testUnblocksInternal(() -> mailbox.putMail(() -> {}), unblockMethod, MailboxStateException.class);
-               setUp();
-               testUnblocksInternal(() -> mailbox.putFirst(() -> {}), unblockMethod, MailboxStateException.class);
+               testUnblocksInternal(() -> mailbox.putMail(() -> {}), unblockMethod);
                setUp();
-               testUnblocksInternal(() -> mailbox.clearAndPut(() -> {}), unblockMethod, MailboxStateException.class);
+               testUnblocksInternal(() -> mailbox.putFirst(() -> {}), unblockMethod);
        }
 
        private void testUnblocksInternal(
                RunnableWithException testMethod,
-               Consumer<Mailbox> unblockMethod,
-               Class<?> expectedExceptionClass) throws InterruptedException {
-               final Thread[] blockedThreads = new Thread[CAPACITY * 2];
+               Consumer<Mailbox> unblockMethod) throws InterruptedException {
+               final Thread[] blockedThreads = new Thread[8];
                final Exception[] exceptions = new Exception[blockedThreads.length];
 
                CountDownLatch countDownLatch = new CountDownLatch(blockedThreads.length);
@@ -280,7 +216,7 @@ public class MailboxImplTest {
                }
 
                for (Exception exception : exceptions) {
-                       Assert.assertEquals(expectedExceptionClass, exception.getClass());
+                       Assert.assertEquals(MailboxStateException.class, exception.getClass());
                }
 
        }
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/mailbox/execution/MailboxExecutorServiceImplTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/mailbox/execution/MailboxExecutorServiceImplTest.java
index 92e106d..7c25e5f 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/mailbox/execution/MailboxExecutorServiceImplTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/mailbox/execution/MailboxExecutorServiceImplTest.java
@@ -75,13 +75,13 @@ public class MailboxExecutorServiceImplTest {
                Assert.assertFalse(mailboxExecutorService.isShutdown());
                Assert.assertFalse(mailboxExecutorService.isTerminated());
                final TestRunnable testRunnable = new TestRunnable();
-               Assert.assertTrue(mailboxExecutorService.tryExecute(testRunnable));
+               mailboxExecutorService.execute(testRunnable);
                Assert.assertEquals(testRunnable, mailbox.tryTakeMail().get());
                CompletableFuture.runAsync(() -> mailboxExecutorService.execute(testRunnable), otherThreadExecutor).get();
                Assert.assertEquals(testRunnable, mailbox.takeMail());
                final TestRunnable yieldRun = new TestRunnable();
                final TestRunnable leftoverRun = new TestRunnable();
-               Assert.assertTrue(mailboxExecutorService.tryExecute(yieldRun));
+               mailboxExecutorService.execute(yieldRun);
                Future<?> leftoverFuture = CompletableFuture.supplyAsync(
                        () -> mailboxExecutorService.submit(leftoverRun), otherThreadExecutor).get();
                mailboxExecutorService.shutdown();
@@ -96,7 +96,7 @@ public class MailboxExecutorServiceImplTest {
                }
 
                try {
-                       CompletableFuture.runAsync(() -> mailboxExecutorService.tryExecute(testRunnable), otherThreadExecutor).get();
+                       CompletableFuture.runAsync(() -> mailboxExecutorService.execute(testRunnable), otherThreadExecutor).get();
                        Assert.fail("execution should not work after shutdown().");
                } catch (ExecutionException expected) {
                        Assert.assertTrue(expected.getCause() instanceof RejectedExecutionException);
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/mailbox/execution/MailboxProcessorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/mailbox/execution/MailboxProcessorTest.java
index 80c71dd..46d5231 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/mailbox/execution/MailboxProcessorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/mailbox/execution/MailboxProcessorTest.java
@@ -38,7 +38,7 @@ public class MailboxProcessorTest {
        public void testRejectIfNotOpen() {
                MailboxProcessor mailboxProcessor = new MailboxProcessor((ctx) -> {});
                try {
-                       mailboxProcessor.getMailboxExecutor().tryExecute(() -> {});
+                       mailboxProcessor.getMailboxExecutor().execute(() -> {});
                        Assert.fail("Should not be able to accept runnables if not opened.");
                } catch (RejectedExecutionException expected) {
                }
@@ -49,11 +49,11 @@ public class MailboxProcessorTest {
                MailboxProcessor mailboxProcessor = new MailboxProcessor((ctx) -> {});
                FutureTask<Void> testRunnableFuture = new FutureTask<>(() -> {}, null);
                mailboxProcessor.open();
-               mailboxProcessor.getMailboxExecutor().tryExecute(testRunnableFuture);
+               mailboxProcessor.getMailboxExecutor().execute(testRunnableFuture);
                mailboxProcessor.prepareClose();
 
                try {
-                       mailboxProcessor.getMailboxExecutor().tryExecute(() -> {});
+                       mailboxProcessor.getMailboxExecutor().execute(() -> {});
                        Assert.fail("Should not be able to accept runnables if not opened.");
                } catch (RejectedExecutionException expected) {
                }
@@ -123,11 +123,12 @@ public class MailboxProcessorTest {
                        }
                };
 
-               start(mailboxThread);
+               MailboxProcessor mailboxProcessor = start(mailboxThread);
                actionSuspendedLatch.await();
                Assert.assertEquals(blockAfterInvocations, counter.get());
 
-               suspendedActionRef.get().resume();
+               SuspendedMailboxDefaultAction suspendedMailboxDefaultAction = suspendedActionRef.get();
+               mailboxProcessor.getMailboxExecutor().execute(suspendedMailboxDefaultAction::resume);
                stop(mailboxThread);
                Assert.assertEquals(totalInvocations, counter.get());
        }
@@ -170,7 +171,7 @@ public class MailboxProcessorTest {
                                final SuspendedMailboxDefaultAction resume =
                                        suspendedActionRef.getAndSet(null);
                                if (resume != null) {
-                                       resume.resume();
+                                       mailboxProcessor.getMailboxExecutor().execute(resume::resume);
                                } else {
                                        try {
                                                
mailboxProcessor.getMailboxExecutor().execute(() -> { });
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/mailbox/execution/TestMailboxExecutor.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/mailbox/execution/TestMailboxExecutor.java
index 31126a8..979fe58 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/mailbox/execution/TestMailboxExecutor.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/mailbox/execution/TestMailboxExecutor.java
@@ -46,12 +46,6 @@ public class TestMailboxExecutor implements MailboxExecutor {
        }
 
        @Override
-       public boolean tryExecute(Runnable command) {
-               execute(command);
-               return true;
-       }
-
-       @Override
        public void yield() throws InterruptedException {
                synchronized (lock) {
                        lock.wait(1);

Reply via email to