This is an automated email from the ASF dual-hosted git repository. srichter pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 6cf98b671d889f1003172bc656ded69d1aab9235 Author: Stefan Richter <s.rich...@data-artisans.com> AuthorDate: Wed Jul 3 13:32:46 2019 +0200 [FLINK-12804] Change mailbox implementation from bounded to unbounded This closes #8692. --- .../flink/streaming/runtime/tasks/StreamTask.java | 2 +- .../streaming/runtime/tasks/mailbox/Mailbox.java | 37 +--- .../runtime/tasks/mailbox/MailboxImpl.java | 201 +++++---------------- .../runtime/tasks/mailbox/MailboxSender.java | 13 +- .../tasks/mailbox/execution/MailboxExecutor.java | 17 +- .../execution/MailboxExecutorServiceImpl.java | 20 -- .../tasks/mailbox/execution/MailboxProcessor.java | 56 ++---- .../execution/SuspendedMailboxDefaultAction.java | 2 +- .../runtime/tasks/mailbox/MailboxImplTest.java | 88 ++------- .../execution/MailboxExecutorServiceImplTest.java | 6 +- .../mailbox/execution/MailboxProcessorTest.java | 13 +- .../mailbox/execution/TestMailboxExecutor.java | 6 - 12 files changed, 92 insertions(+), 369 deletions(-) diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java index a626bba..52e2011 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java @@ -484,7 +484,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> cancelTask(); } finally { - mailboxProcessor.cancelMailboxExecution(); + mailboxProcessor.allActionsCompleted(); cancelables.close(); } } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/Mailbox.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/Mailbox.java index 9b2d95f..deb69e0 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/Mailbox.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/Mailbox.java @@ -23,7 +23,7 @@ import javax.annotation.Nonnull; import java.util.List; /** - * A mailbox is basically a blocking queue for inter-thread message exchange in form of {@link Runnable} objects between + * A mailbox is basically a queue for inter-thread message exchange in form of {@link Runnable} objects between * multiple producer threads and a single consumer. This has a lifecycle of closed -> open -> (quiesced) -> closed. */ public interface Mailbox extends MailboxReceiver, MailboxSender { @@ -56,36 +56,12 @@ public interface Mailbox extends MailboxReceiver, MailboxSender { List<Runnable> close(); /** - * The effect of this is that all pending letters in the mailbox are dropped and the given priorityLetter - * is enqueued to the head of the mailbox. Dropped letters are returned. This method should only be invoked - * by code that has ownership of the mailbox object and only rarely used, e.g. to submit special events like - * shutting down the mailbox loop. - * - * @param priorityLetter action to enqueue atomically after the mailbox was cleared. - * @throws MailboxStateException if the mailbox is quiesced or closed. - */ - @Nonnull - List<Runnable> clearAndPut(@Nonnull Runnable priorityLetter) throws MailboxStateException; - - /** - * Adds the given action to the head of the mailbox. This method will block if the mailbox is full and - * should therefore only be called from outside the mailbox main-thread to avoid deadlocks. - * - * @param priorityLetter action to enqueue to the head of the mailbox. - * @throws InterruptedException on interruption. - * @throws MailboxStateException if the mailbox is quiesced or closed. - */ - void putFirst(@Nonnull Runnable priorityLetter) throws InterruptedException, MailboxStateException; - - /** - * Adds the given action to the head of the mailbox if the mailbox is not full. Returns true if the letter - * was successfully added to the mailbox. + * Adds the given action to the head of the mailbox. * * @param priorityLetter action to enqueue to the head of the mailbox. - * @return true if the letter was successfully added. * @throws MailboxStateException if the mailbox is quiesced or closed. */ - boolean tryPutFirst(@Nonnull Runnable priorityLetter) throws MailboxStateException; + void putFirst(@Nonnull Runnable priorityLetter) throws MailboxStateException; /** * Returns the current state of the mailbox as defined by the lifecycle enum {@link State}. @@ -94,11 +70,4 @@ public interface Mailbox extends MailboxReceiver, MailboxSender { */ @Nonnull State getState(); - - /** - * Returns the total capacity of the mailbox. - * - * @return the total capacity of the mailbox. - */ - int capacity(); } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxImpl.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxImpl.java index 0f6bbf0..4f58c4a 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxImpl.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxImpl.java @@ -21,10 +21,13 @@ package org.apache.flink.streaming.runtime.tasks.mailbox; import org.apache.flink.util.Preconditions; import javax.annotation.Nonnull; +import javax.annotation.Nullable; import javax.annotation.concurrent.GuardedBy; import javax.annotation.concurrent.ThreadSafe; import java.util.ArrayList; +import java.util.Collections; +import java.util.LinkedList; import java.util.List; import java.util.Optional; import java.util.concurrent.locks.Condition; @@ -38,42 +41,24 @@ import java.util.concurrent.locks.ReentrantLock; public class MailboxImpl implements Mailbox { /** - * The enqueued letters. - */ - @GuardedBy("lock") - private final Runnable[] ringBuffer; - - /** * Lock for all concurrent ops. */ private final ReentrantLock lock; /** - * Condition that is triggered when the buffer is no longer empty. - */ - @GuardedBy("lock") - private final Condition notEmpty; - - /** - * Condition that is triggered when the buffer is no longer full. + * Internal queue of letters. */ @GuardedBy("lock") - private final Condition notFull; + private final LinkedList<Runnable> queue; /** - * Index of the ring buffer head. + * Condition that is triggered when the mailbox is no longer empty. */ @GuardedBy("lock") - private int headIndex; - - /** - * Index of the ring buffer tail. - */ - @GuardedBy("lock") - private int tailIndex; + private final Condition notEmpty; /** - * Number of letters in the mailbox. + * Number of letters in the mailbox. We track it separately from the queue#size to avoid locking on {@link #hasMail()}. */ @GuardedBy("lock") private volatile int count; @@ -84,24 +69,12 @@ public class MailboxImpl implements Mailbox { @GuardedBy("lock") private volatile State state; - /** - * A mask to wrap around the indexes of the ring buffer. We use this to avoid ifs or modulo ops. - */ - private final int moduloMask; - public MailboxImpl() { - this(6); // 2^6 = 64 - } - - public MailboxImpl(int capacityPow2) { - final int capacity = 1 << capacityPow2; - Preconditions.checkState(capacity > 0); - this.moduloMask = capacity - 1; - this.ringBuffer = new Runnable[capacity]; this.lock = new ReentrantLock(); this.notEmpty = lock.newCondition(); - this.notFull = lock.newCondition(); this.state = State.CLOSED; + this.queue = new LinkedList<>(); + this.count = 0; } @Override @@ -114,12 +87,7 @@ public class MailboxImpl implements Mailbox { final ReentrantLock lock = this.lock; lock.lock(); try { - if (isEmpty()) { - checkTakeStateConditions(); - return Optional.empty(); - } else { - return Optional.of(takeInternal()); - } + return Optional.ofNullable(takeHeadInternal()); } finally { lock.unlock(); } @@ -131,11 +99,11 @@ public class MailboxImpl implements Mailbox { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { - while (isEmpty()) { - checkTakeStateConditions(); + Runnable headLetter; + while ((headLetter = takeHeadInternal()) == null) { notEmpty.await(); } - return takeInternal(); + return headLetter; } finally { lock.unlock(); } @@ -144,31 +112,10 @@ public class MailboxImpl implements Mailbox { //------------------------------------------------------------------------------------------------------------------ @Override - public boolean tryPutMail(@Nonnull Runnable letter) throws MailboxStateException { + public void putMail(@Nonnull Runnable letter) throws MailboxStateException { final ReentrantLock lock = this.lock; lock.lock(); try { - if (isFull()) { - checkPutStateConditions(); - return false; - } else { - putTailInternal(letter); - return true; - } - } finally { - lock.unlock(); - } - } - - @Override - public void putMail(@Nonnull Runnable letter) throws InterruptedException, MailboxStateException { - final ReentrantLock lock = this.lock; - lock.lockInterruptibly(); - try { - while (isFull()) { - checkPutStateConditions(); - notFull.await(); - } putTailInternal(letter); } finally { lock.unlock(); @@ -177,51 +124,12 @@ public class MailboxImpl implements Mailbox { //------------------------------------------------------------------------------------------------------------------ - @Nonnull - @Override - public List<Runnable> clearAndPut(@Nonnull Runnable priorityLetter) throws MailboxStateException { - ArrayList<Runnable> droppedLetters = new ArrayList<>(capacity()); - - lock.lock(); - try { - // check state first to avoid loosing any letters forever through exception - checkPutStateConditions(); - dropAllLetters(droppedLetters); - putTailInternal(priorityLetter); - } finally { - lock.unlock(); - } - - return droppedLetters; - } - - @Override - public void putFirst(@Nonnull Runnable priorityLetter) throws InterruptedException, MailboxStateException { - final ReentrantLock lock = this.lock; - lock.lockInterruptibly(); - try { - while (isFull()) { - checkPutStateConditions(); - notFull.await(); - } - putHeadInternal(priorityLetter); - } finally { - lock.unlock(); - } - } - @Override - public boolean tryPutFirst(@Nonnull Runnable priorityLetter) throws MailboxStateException { + public void putFirst(@Nonnull Runnable priorityLetter) throws MailboxStateException { final ReentrantLock lock = this.lock; lock.lock(); try { - if (isFull()) { - checkPutStateConditions(); - return false; - } else { - putHeadInternal(priorityLetter); - return true; - } + putHeadInternal(priorityLetter); } finally { lock.unlock(); } @@ -229,59 +137,42 @@ public class MailboxImpl implements Mailbox { //------------------------------------------------------------------------------------------------------------------ - private void putHeadInternal(Runnable letter) throws MailboxStateException { + private void putHeadInternal(Runnable newHead) throws MailboxStateException { assert lock.isHeldByCurrentThread(); checkPutStateConditions(); - headIndex = decreaseIndexWithWrapAround(headIndex); - this.ringBuffer[headIndex] = letter; - ++count; + queue.addFirst(newHead); + incrementCountAndCheckOverflow(); notEmpty.signal(); } - private void putTailInternal(Runnable letter) throws MailboxStateException { + private void putTailInternal(Runnable newTail) throws MailboxStateException { assert lock.isHeldByCurrentThread(); checkPutStateConditions(); - this.ringBuffer[tailIndex] = letter; - tailIndex = increaseIndexWithWrapAround(tailIndex); - ++count; + queue.addLast(newTail); + incrementCountAndCheckOverflow(); notEmpty.signal(); } - private Runnable takeInternal() throws MailboxStateException { - assert lock.isHeldByCurrentThread(); - checkTakeStateConditions(); - final Runnable[] buffer = this.ringBuffer; - Runnable letter = buffer[headIndex]; - buffer[headIndex] = null; - headIndex = increaseIndexWithWrapAround(headIndex); - --count; - notFull.signal(); - return letter; + private void incrementCountAndCheckOverflow() { + Preconditions.checkState(++count > 0, "Mailbox overflow."); } - private void dropAllLetters(List<Runnable> dropInto) { + @Nullable + private Runnable takeHeadInternal() throws MailboxStateException { assert lock.isHeldByCurrentThread(); - int localCount = count; - while (localCount > 0) { - dropInto.add(ringBuffer[headIndex]); - ringBuffer[headIndex] = null; - headIndex = increaseIndexWithWrapAround(headIndex); - --localCount; - notFull.signal(); + checkTakeStateConditions(); + Runnable oldHead = queue.pollFirst(); + if (oldHead != null) { + --count; } - count = 0; - } - - private int increaseIndexWithWrapAround(int old) { - return (old + 1) & moduloMask; - } - - private int decreaseIndexWithWrapAround(int old) { - return (old - 1) & moduloMask; + return oldHead; } - private boolean isFull() { - return count >= capacity(); + private void drainAllLetters(List<Runnable> drainInto) { + assert lock.isHeldByCurrentThread(); + drainInto.addAll(queue); + queue.clear(); + count = 0; } private boolean isEmpty() { @@ -331,7 +222,6 @@ public class MailboxImpl implements Mailbox { if (state == State.OPEN) { state = State.QUIESCED; } - notFull.signalAll(); } finally { lock.unlock(); } @@ -340,20 +230,20 @@ public class MailboxImpl implements Mailbox { @Nonnull @Override public List<Runnable> close() { - final ArrayList<Runnable> droppedLetters = new ArrayList<>(capacity()); - lock.lock(); try { - dropAllLetters(droppedLetters); + if (state == State.CLOSED) { + return Collections.emptyList(); + } + ArrayList<Runnable> droppedLetters = new ArrayList<>(count); + drainAllLetters(droppedLetters); state = State.CLOSED; // to unblock all - notFull.signalAll(); notEmpty.signalAll(); + return droppedLetters; } finally { lock.unlock(); } - - return droppedLetters; } @Nonnull @@ -361,9 +251,4 @@ public class MailboxImpl implements Mailbox { public State getState() { return state; } - - @Override - public int capacity() { - return ringBuffer.length; - } } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxSender.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxSender.java index 2a2274a..0560726 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxSender.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxSender.java @@ -27,21 +27,10 @@ import javax.annotation.Nonnull; public interface MailboxSender { /** - * Enqueues the given letter to the mailbox, if capacity is available. On success, this returns <code>true</code> - * and <code>false</code> if the mailbox was already full. - * - * @param letter the letter to enqueue. - * @return <code>true</code> iff successful. - * @throws MailboxStateException if the mailbox is quiesced or closed. - */ - boolean tryPutMail(@Nonnull Runnable letter) throws MailboxStateException; - - /** * Enqueues the given letter to the mailbox and blocks until there is capacity for a successful put. * * @param letter the letter to enqueue. - * @throws InterruptedException on interruption. * @throws MailboxStateException if the mailbox is quiesced or closed. */ - void putMail(@Nonnull Runnable letter) throws InterruptedException, MailboxStateException; + void putMail(@Nonnull Runnable letter) throws MailboxStateException; } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/execution/MailboxExecutor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/execution/MailboxExecutor.java index 1848e77..eeca9c2 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/execution/MailboxExecutor.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/execution/MailboxExecutor.java @@ -31,10 +31,7 @@ import java.util.concurrent.RejectedExecutionException; public interface MailboxExecutor extends Executor { /** - * Executes the given command at some time in the future in the mailbox thread. This call can block when the - * mailbox is currently full. Therefore, this method must not be called from the mailbox thread itself as this - * can cause a deadlock. Instead, if the caller is already in the mailbox thread, the command should just be - * executed directly or use the non-blocking {@link #tryExecute(Runnable)}. + * Executes the given command at some time in the future in the mailbox thread. * * @param command the runnable task to add to the mailbox for execution. * @throws RejectedExecutionException if this task cannot be accepted for execution, e.g. because the mailbox is @@ -44,18 +41,6 @@ public interface MailboxExecutor extends Executor { void execute(@Nonnull Runnable command) throws RejectedExecutionException; /** - * Attempts to enqueue the given command in the mailbox for execution. On success, the method returns true. If - * the mailbox is full, this method returns immediately without adding the command and returns false. - * - * @param command the runnable task to add to the mailbox for execution. - * @return true if the command was added to the mailbox. False if the command could not be added because the mailbox - * was full. - * @throws RejectedExecutionException if this task cannot be accepted for execution, e.g. because the mailbox is - * quiesced or closed. - */ - boolean tryExecute(Runnable command) throws RejectedExecutionException; - - /** * This methods starts running the command at the head of the mailbox and is intended to be used by the mailbox * thread to yield from a currently ongoing action to another command. The method blocks until another command to * run is available in the mailbox and must only be called from the mailbox thread. Must only be called from the diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/execution/MailboxExecutorServiceImpl.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/execution/MailboxExecutorServiceImpl.java index 944b7a9..5c3cd95 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/execution/MailboxExecutorServiceImpl.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/execution/MailboxExecutorServiceImpl.java @@ -53,27 +53,14 @@ public class MailboxExecutorServiceImpl extends AbstractExecutorService implemen @Override public void execute(@Nonnull Runnable command) { - checkIsNotMailboxThread(); try { mailbox.putMail(command); - } catch (InterruptedException irex) { - Thread.currentThread().interrupt(); - throw new RejectedExecutionException("Sender thread was interrupted while blocking on mailbox.", irex); } catch (MailboxStateException mbex) { throw new RejectedExecutionException(mbex); } } @Override - public boolean tryExecute(Runnable command) { - try { - return mailbox.tryPutMail(command); - } catch (MailboxStateException e) { - throw new RejectedExecutionException(e); - } - } - - @Override public void yield() throws InterruptedException, IllegalStateException { checkIsMailboxThread(); try { @@ -147,11 +134,4 @@ public class MailboxExecutorServiceImpl extends AbstractExecutorService implemen "Illegal thread detected. This method must be called from inside the mailbox thread!"); } } - - private void checkIsNotMailboxThread() { - if (isMailboxThread()) { - throw new IllegalStateException( - "Illegal thread detected. This method must NOT be called from inside the mailbox thread!"); - } - } } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/execution/MailboxProcessor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/execution/MailboxProcessor.java index 77906f7..6fc8da0 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/execution/MailboxProcessor.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/execution/MailboxProcessor.java @@ -114,7 +114,11 @@ public class MailboxProcessor { * {@link java.util.concurrent.RunnableFuture} that are still contained in the mailbox. */ public void close() { - FutureUtils.cancelRunnableFutures(mailboxExecutor.shutdownNow()); + List<Runnable> droppedLetters = mailboxExecutor.shutdownNow(); + if (!droppedLetters.isEmpty()) { + LOG.debug("Closing the mailbox dropped letters {}.", droppedLetters); + FutureUtils.cancelRunnableFutures(droppedLetters); + } } /** @@ -138,19 +142,11 @@ public class MailboxProcessor { } /** - * Cancels the mailbox loop execution. All pending mailbox actions will not be executed anymore, if they are - * instance of {@link java.util.concurrent.RunnableFuture}, they will be cancelled. - */ - public void cancelMailboxExecution() { - clearMailboxAndRunPriorityAction(mailboxPoisonLetter); - } - - /** * Reports a throwable for rethrowing from the mailbox thread. This will clear and cancel all other pending letters. * @param throwable to report by rethrowing from the mailbox loop. */ public void reportThrowable(Throwable throwable) { - clearMailboxAndRunPriorityAction(() -> { + sendPriorityLetter(() -> { throw new WrappingRuntimeException(throwable); }); } @@ -159,17 +155,14 @@ public class MailboxProcessor { * This method must be called to end the stream task when all actions for the tasks have been performed. */ public void allActionsCompleted() { + sendPriorityLetter(mailboxPoisonLetter); + } + + private void sendPriorityLetter(Runnable priorityLetter) { try { - if (mailboxExecutor.isMailboxThread()) { - mailboxLoopRunning = false; - ensureControlFlowSignalCheck(); - } else { - mailbox.putFirst(mailboxPoisonLetter); - } - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); + mailbox.putFirst(priorityLetter); } catch (MailboxStateException me) { - LOG.debug("Action context could not submit poison letter to mailbox.", me); + LOG.debug("Action context could not submit priority letter to mailbox.", me); } } @@ -236,20 +229,7 @@ public class MailboxProcessor { private void ensureControlFlowSignalCheck() { // Make sure that mailbox#hasMail is true via a dummy letter so that the flag change is noticed. if (!mailbox.hasMail()) { - try { - mailbox.tryPutMail(() -> {}); - } catch (MailboxStateException me) { - LOG.debug("Mailbox closed when trying to submit letter for control flow signal.", me); - } - } - } - - private void clearMailboxAndRunPriorityAction(Runnable priorityLetter) { - try { - List<Runnable> droppedRunnables = mailbox.clearAndPut(priorityLetter); - FutureUtils.cancelRunnableFutures(droppedRunnables); - } catch (MailboxStateException msex) { - LOG.debug("Mailbox already closed in cancel().", msex); + sendPriorityLetter(() -> {}); } } @@ -283,10 +263,14 @@ public class MailboxProcessor { @Override public void resume() { - Preconditions.checkState( - mailboxExecutor.isMailboxThread(), - "SuspendedMailboxDefaultAction::resume resume must only be called from the mailbox-thread!"); + if (mailboxExecutor.isMailboxThread()) { + resumeInternal(); + } else { + sendPriorityLetter(this::resumeInternal); + } + } + private void resumeInternal() { if (suspendedDefaultAction == this) { suspendedDefaultAction = null; } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/execution/SuspendedMailboxDefaultAction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/execution/SuspendedMailboxDefaultAction.java index ca31254..6b3f32c 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/execution/SuspendedMailboxDefaultAction.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/execution/SuspendedMailboxDefaultAction.java @@ -24,7 +24,7 @@ package org.apache.flink.streaming.runtime.tasks.mailbox.execution; public interface SuspendedMailboxDefaultAction { /** - * Resume execution of the default action. Must only be called from the mailbox thread!. + * Resume execution of the default action. */ void resume(); } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxImplTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxImplTest.java index 1a48136..b7b478e 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxImplTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxImplTest.java @@ -18,7 +18,6 @@ package org.apache.flink.streaming.runtime.tasks.mailbox; -import org.apache.flink.core.testutils.OneShotLatch; import org.apache.flink.util.function.BiConsumerWithException; import org.apache.flink.util.function.FunctionWithException; import org.apache.flink.util.function.RunnableWithException; @@ -30,7 +29,6 @@ import org.junit.Before; import org.junit.Test; import java.util.LinkedList; -import java.util.List; import java.util.Optional; import java.util.Queue; import java.util.concurrent.CountDownLatch; @@ -42,9 +40,6 @@ import java.util.function.Consumer; public class MailboxImplTest { private static final Runnable POISON_LETTER = () -> {}; - private static final int CAPACITY_POW_2 = 2; - private static final int CAPACITY = 1 << CAPACITY_POW_2; - /** * Object under test. */ @@ -52,7 +47,7 @@ public class MailboxImplTest { @Before public void setUp() { - mailbox = new MailboxImpl(CAPACITY_POW_2); + mailbox = new MailboxImpl(); mailbox.open(); } @@ -61,26 +56,6 @@ public class MailboxImplTest { mailbox.close(); } - /** - * Test for #clearAndPut should remove other pending events and enqueue directly to the head of the mailbox queue. - */ - @Test - public void testClearAndPut() throws Exception { - - Runnable letterInstance = () -> {}; - - for (int i = 0; i < CAPACITY; ++i) { - Assert.assertTrue(mailbox.tryPutMail(letterInstance)); - } - - List<Runnable> droppedLetters = mailbox.clearAndPut(POISON_LETTER); - - Assert.assertTrue(mailbox.hasMail()); - Assert.assertEquals(POISON_LETTER, mailbox.tryTakeMail().get()); - Assert.assertFalse(mailbox.hasMail()); - Assert.assertEquals(CAPACITY, droppedLetters.size()); - } - @Test public void testPutAsHead() throws Exception { @@ -88,38 +63,16 @@ public class MailboxImplTest { Runnable instanceB = () -> {}; Runnable instanceC = () -> {}; Runnable instanceD = () -> {}; - Runnable instanceE = () -> {}; + mailbox.putMail(instanceC); + mailbox.putFirst(instanceB); mailbox.putMail(instanceD); - mailbox.tryPutFirst(instanceC); - mailbox.putMail(instanceE); mailbox.putFirst(instanceA); - OneShotLatch latch = new OneShotLatch(); - Thread blockingPut = new Thread(() -> { - // ensure we are full - try { - if (!mailbox.tryPutFirst(() -> { })) { - latch.trigger(); - - mailbox.putFirst(instanceB); - - } - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } catch (MailboxStateException ignore) { - } - }); - - blockingPut.start(); - latch.await(); - Assert.assertSame(instanceA, mailbox.takeMail()); - blockingPut.join(); Assert.assertSame(instanceB, mailbox.takeMail()); Assert.assertSame(instanceC, mailbox.takeMail()); Assert.assertSame(instanceD, mailbox.takeMail()); - Assert.assertSame(instanceE, mailbox.takeMail()); Assert.assertFalse(mailbox.tryTakeMail().isPresent()); } @@ -129,15 +82,13 @@ public class MailboxImplTest { final Queue<Runnable> testObjects = new LinkedList<>(); Assert.assertFalse(mailbox.hasMail()); - for (int i = 0; i < CAPACITY; ++i) { + for (int i = 0; i < 10; ++i) { Runnable letter = () -> {}; testObjects.add(letter); - Assert.assertTrue(mailbox.tryPutMail(letter)); + mailbox.putMail(letter); Assert.assertTrue(mailbox.hasMail()); } - Assert.assertFalse(mailbox.tryPutMail(() -> {})); - while (!testObjects.isEmpty()) { Assert.assertEquals(testObjects.remove(), mailbox.tryTakeMail().get()); Assert.assertEquals(!testObjects.isEmpty(), mailbox.hasMail()); @@ -164,9 +115,7 @@ public class MailboxImplTest { } return optionalLetter.get(); }), - ((mailbox, runnable) -> { - while (!mailbox.tryPutMail(runnable)) {} - })); + MailboxSender::putMail); } /** @@ -176,7 +125,7 @@ public class MailboxImplTest { public void testCloseUnblocks() throws InterruptedException { testAllPuttingUnblocksInternal(Mailbox::close); setUp(); - testUnblocksInternal(() -> mailbox.takeMail(), Mailbox::close, MailboxStateException.class); + testUnblocksInternal(() -> mailbox.takeMail(), Mailbox::close); } /** @@ -218,16 +167,6 @@ public class MailboxImplTest { private void testLifecyclePuttingInternal() throws Exception { try { - mailbox.tryPutMail(() -> {}); - Assert.fail(); - } catch (MailboxStateException ignore) { - } - try { - mailbox.tryPutFirst(() -> {}); - Assert.fail(); - } catch (MailboxStateException ignore) { - } - try { mailbox.putMail(() -> {}); Assert.fail(); } catch (MailboxStateException ignore) { @@ -240,18 +179,15 @@ public class MailboxImplTest { } private void testAllPuttingUnblocksInternal(Consumer<Mailbox> unblockMethod) throws InterruptedException { - testUnblocksInternal(() -> mailbox.putMail(() -> {}), unblockMethod, MailboxStateException.class); - setUp(); - testUnblocksInternal(() -> mailbox.putFirst(() -> {}), unblockMethod, MailboxStateException.class); + testUnblocksInternal(() -> mailbox.putMail(() -> {}), unblockMethod); setUp(); - testUnblocksInternal(() -> mailbox.clearAndPut(() -> {}), unblockMethod, MailboxStateException.class); + testUnblocksInternal(() -> mailbox.putFirst(() -> {}), unblockMethod); } private void testUnblocksInternal( RunnableWithException testMethod, - Consumer<Mailbox> unblockMethod, - Class<?> expectedExceptionClass) throws InterruptedException { - final Thread[] blockedThreads = new Thread[CAPACITY * 2]; + Consumer<Mailbox> unblockMethod) throws InterruptedException { + final Thread[] blockedThreads = new Thread[8]; final Exception[] exceptions = new Exception[blockedThreads.length]; CountDownLatch countDownLatch = new CountDownLatch(blockedThreads.length); @@ -280,7 +216,7 @@ public class MailboxImplTest { } for (Exception exception : exceptions) { - Assert.assertEquals(expectedExceptionClass, exception.getClass()); + Assert.assertEquals(MailboxStateException.class, exception.getClass()); } } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/mailbox/execution/MailboxExecutorServiceImplTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/mailbox/execution/MailboxExecutorServiceImplTest.java index 92e106d..7c25e5f 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/mailbox/execution/MailboxExecutorServiceImplTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/mailbox/execution/MailboxExecutorServiceImplTest.java @@ -75,13 +75,13 @@ public class MailboxExecutorServiceImplTest { Assert.assertFalse(mailboxExecutorService.isShutdown()); Assert.assertFalse(mailboxExecutorService.isTerminated()); final TestRunnable testRunnable = new TestRunnable(); - Assert.assertTrue(mailboxExecutorService.tryExecute(testRunnable)); + mailboxExecutorService.execute(testRunnable); Assert.assertEquals(testRunnable, mailbox.tryTakeMail().get()); CompletableFuture.runAsync(() -> mailboxExecutorService.execute(testRunnable), otherThreadExecutor).get(); Assert.assertEquals(testRunnable, mailbox.takeMail()); final TestRunnable yieldRun = new TestRunnable(); final TestRunnable leftoverRun = new TestRunnable(); - Assert.assertTrue(mailboxExecutorService.tryExecute(yieldRun)); + mailboxExecutorService.execute(yieldRun); Future<?> leftoverFuture = CompletableFuture.supplyAsync( () -> mailboxExecutorService.submit(leftoverRun), otherThreadExecutor).get(); mailboxExecutorService.shutdown(); @@ -96,7 +96,7 @@ public class MailboxExecutorServiceImplTest { } try { - CompletableFuture.runAsync(() -> mailboxExecutorService.tryExecute(testRunnable), otherThreadExecutor).get(); + CompletableFuture.runAsync(() -> mailboxExecutorService.execute(testRunnable), otherThreadExecutor).get(); Assert.fail("execution should not work after shutdown()."); } catch (ExecutionException expected) { Assert.assertTrue(expected.getCause() instanceof RejectedExecutionException); diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/mailbox/execution/MailboxProcessorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/mailbox/execution/MailboxProcessorTest.java index 80c71dd..46d5231 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/mailbox/execution/MailboxProcessorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/mailbox/execution/MailboxProcessorTest.java @@ -38,7 +38,7 @@ public class MailboxProcessorTest { public void testRejectIfNotOpen() { MailboxProcessor mailboxProcessor = new MailboxProcessor((ctx) -> {}); try { - mailboxProcessor.getMailboxExecutor().tryExecute(() -> {}); + mailboxProcessor.getMailboxExecutor().execute(() -> {}); Assert.fail("Should not be able to accept runnables if not opened."); } catch (RejectedExecutionException expected) { } @@ -49,11 +49,11 @@ public class MailboxProcessorTest { MailboxProcessor mailboxProcessor = new MailboxProcessor((ctx) -> {}); FutureTask<Void> testRunnableFuture = new FutureTask<>(() -> {}, null); mailboxProcessor.open(); - mailboxProcessor.getMailboxExecutor().tryExecute(testRunnableFuture); + mailboxProcessor.getMailboxExecutor().execute(testRunnableFuture); mailboxProcessor.prepareClose(); try { - mailboxProcessor.getMailboxExecutor().tryExecute(() -> {}); + mailboxProcessor.getMailboxExecutor().execute(() -> {}); Assert.fail("Should not be able to accept runnables if not opened."); } catch (RejectedExecutionException expected) { } @@ -123,11 +123,12 @@ public class MailboxProcessorTest { } }; - start(mailboxThread); + MailboxProcessor mailboxProcessor = start(mailboxThread); actionSuspendedLatch.await(); Assert.assertEquals(blockAfterInvocations, counter.get()); - suspendedActionRef.get().resume(); + SuspendedMailboxDefaultAction suspendedMailboxDefaultAction = suspendedActionRef.get(); + mailboxProcessor.getMailboxExecutor().execute(suspendedMailboxDefaultAction::resume); stop(mailboxThread); Assert.assertEquals(totalInvocations, counter.get()); } @@ -170,7 +171,7 @@ public class MailboxProcessorTest { final SuspendedMailboxDefaultAction resume = suspendedActionRef.getAndSet(null); if (resume != null) { - resume.resume(); + mailboxProcessor.getMailboxExecutor().execute(resume::resume); } else { try { mailboxProcessor.getMailboxExecutor().execute(() -> { }); diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/mailbox/execution/TestMailboxExecutor.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/mailbox/execution/TestMailboxExecutor.java index 31126a8..979fe58 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/mailbox/execution/TestMailboxExecutor.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/mailbox/execution/TestMailboxExecutor.java @@ -46,12 +46,6 @@ public class TestMailboxExecutor implements MailboxExecutor { } @Override - public boolean tryExecute(Runnable command) { - execute(command); - return true; - } - - @Override public void yield() throws InterruptedException { synchronized (lock) { lock.wait(1);