This is an automated email from the ASF dual-hosted git repository. srichter pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 022f6cceef65859bc6f172151d09140038297f69 Author: Stefan Richter <s.rich...@data-artisans.com> AuthorDate: Fri May 10 11:20:02 2019 +0200 [FLINK-12478, FLINK-12480][runtime] Introduce mailbox to StreamTask main-loop. This closes #8409. This closes #8431. This also decomposes monolithic run-loops in StreamTask implementations into step-wise calls. --- .../runtime/tasks/OneInputStreamTask.java | 12 +- .../streaming/runtime/tasks/SourceStreamTask.java | 5 +- .../runtime/tasks/StreamIterationHead.java | 105 ++++----- .../flink/streaming/runtime/tasks/StreamTask.java | 82 ++++++- .../runtime/tasks/TwoInputStreamTask.java | 13 +- .../streaming/runtime/tasks/mailbox/Mailbox.java | 36 ++++ .../runtime/tasks/mailbox/MailboxImpl.java | 236 +++++++++++++++++++++ .../runtime/tasks/mailbox/MailboxReceiver.java | 59 ++++++ .../runtime/tasks/mailbox/MailboxSender.java | 52 +++++ ...heckpointExceptionHandlerConfigurationTest.java | 4 +- .../tasks/StreamTaskCancellationBarrierTest.java | 4 +- .../runtime/tasks/StreamTaskTerminationTest.java | 3 +- .../streaming/runtime/tasks/StreamTaskTest.java | 21 +- .../runtime/tasks/SynchronousCheckpointITCase.java | 3 +- .../runtime/tasks/SynchronousCheckpointTest.java | 3 +- .../tasks/TaskCheckpointingBehaviourTest.java | 4 +- .../runtime/tasks/mailbox/MailboxImplTest.java | 170 +++++++++++++++ .../flink/streaming/util/MockStreamTask.java | 4 +- .../jobmaster/JobMasterStopWithSavepointIT.java | 10 +- 19 files changed, 726 insertions(+), 100 deletions(-) diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java index 7498518..7b82d8f 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java @@ -39,8 +39,6 @@ public class 
OneInputStreamTask<IN, OUT> extends StreamTask<OUT, OneInputStreamO private StreamInputProcessor<IN> inputProcessor; - private volatile boolean running = true; - private final WatermarkGauge inputWatermarkGauge = new WatermarkGauge(); /** @@ -98,12 +96,9 @@ public class OneInputStreamTask<IN, OUT> extends StreamTask<OUT, OneInputStreamO } @Override - protected void run() throws Exception { - // cache processor reference on the stack, to make the code more JIT friendly - final StreamInputProcessor<IN> inputProcessor = this.inputProcessor; - - while (running && inputProcessor.processInput()) { - // all the work happens in the "processInput" method + protected void performDefaultAction(ActionContext context) throws Exception { + if (!inputProcessor.processInput()) { + context.allActionsCompleted(); } } @@ -116,6 +111,5 @@ public class OneInputStreamTask<IN, OUT> extends StreamTask<OUT, OneInputStreamO @Override protected void cancelTask() { - running = false; } } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java index 1a1c529..fd50a1a 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java @@ -98,8 +98,11 @@ public class SourceStreamTask<OUT, SRC extends SourceFunction<OUT>, OP extends S } @Override - protected void run() throws Exception { + protected void performDefaultAction(ActionContext context) throws Exception { + // Against the usual contract of this method, this implementation is not step-wise but blocking instead for + // compatibility reasons with the current source interface (source functions run as a loop, not in steps). 
headOperator.run(getCheckpointLock(), getStreamStatusMaintainer()); + context.allActionsCompleted(); } @Override diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java index ecef7f0..d25bd23 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java @@ -24,6 +24,7 @@ import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.runtime.io.BlockingQueueBroker; import org.apache.flink.streaming.runtime.io.RecordWriterOutput; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.util.FlinkRuntimeException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -41,88 +42,72 @@ public class StreamIterationHead<OUT> extends OneInputStreamTask<OUT, OUT> { private static final Logger LOG = LoggerFactory.getLogger(StreamIterationHead.class); - private volatile boolean running = true; + private RecordWriterOutput<OUT>[] streamOutputs; + + private final BlockingQueue<StreamRecord<OUT>> dataChannel; + private final String brokerID; + private final long iterationWaitTime; + private final boolean shouldWait; public StreamIterationHead(Environment env) { super(env); + final String iterationId = getConfiguration().getIterationId(); + if (iterationId == null || iterationId.length() == 0) { + throw new FlinkRuntimeException("Missing iteration ID in the task configuration"); + } + + this.dataChannel = new ArrayBlockingQueue<>(1); + this.brokerID = createBrokerIdString(getEnvironment().getJobID(), iterationId, + getEnvironment().getTaskInfo().getIndexOfThisSubtask()); + this.iterationWaitTime = getConfiguration().getIterationWaitTime(); + this.shouldWait = iterationWaitTime > 0; } // 
------------------------------------------------------------------------ @Override - protected void run() throws Exception { - - final String iterationId = getConfiguration().getIterationId(); - if (iterationId == null || iterationId.length() == 0) { - throw new Exception("Missing iteration ID in the task configuration"); + protected void performDefaultAction(ActionContext context) throws Exception { + StreamRecord<OUT> nextRecord = shouldWait ? + dataChannel.poll(iterationWaitTime, TimeUnit.MILLISECONDS) : + dataChannel.take(); + + if (nextRecord != null) { + synchronized (getCheckpointLock()) { + for (RecordWriterOutput<OUT> output : streamOutputs) { + output.collect(nextRecord); + } + } + } else { + context.allActionsCompleted(); } + } - final String brokerID = createBrokerIdString(getEnvironment().getJobID(), iterationId , - getEnvironment().getTaskInfo().getIndexOfThisSubtask()); - - final long iterationWaitTime = getConfiguration().getIterationWaitTime(); - final boolean shouldWait = iterationWaitTime > 0; - - final BlockingQueue<StreamRecord<OUT>> dataChannel = new ArrayBlockingQueue<StreamRecord<OUT>>(1); + // ------------------------------------------------------------------------ + @SuppressWarnings("unchecked") + @Override + public void init() { // offer the queue for the tail BlockingQueueBroker.INSTANCE.handIn(brokerID, dataChannel); LOG.info("Iteration head {} added feedback queue under {}", getName(), brokerID); - // do the work - try { - @SuppressWarnings("unchecked") - RecordWriterOutput<OUT>[] outputs = (RecordWriterOutput<OUT>[]) getStreamOutputs(); - - // If timestamps are enabled we make sure to remove cyclic watermark dependencies - if (isSerializingTimestamps()) { - synchronized (getCheckpointLock()) { - for (RecordWriterOutput<OUT> output : outputs) { - output.emitWatermark(new Watermark(Long.MAX_VALUE)); - } - } - } + this.streamOutputs = (RecordWriterOutput<OUT>[]) getStreamOutputs(); - while (running) { - StreamRecord<OUT> nextRecord = 
shouldWait ? - dataChannel.poll(iterationWaitTime, TimeUnit.MILLISECONDS) : - dataChannel.take(); - - if (nextRecord != null) { - synchronized (getCheckpointLock()) { - for (RecordWriterOutput<OUT> output : outputs) { - output.collect(nextRecord); - } - } - } - else { - // done - break; + // If timestamps are enabled we make sure to remove cyclic watermark dependencies + if (isSerializingTimestamps()) { + synchronized (getCheckpointLock()) { + for (RecordWriterOutput<OUT> output : streamOutputs) { + output.emitWatermark(new Watermark(Long.MAX_VALUE)); } } } - finally { - // make sure that we remove the queue from the broker, to prevent a resource leak - BlockingQueueBroker.INSTANCE.remove(brokerID); - LOG.info("Iteration head {} removed feedback queue under {}", getName(), brokerID); - } - } - - @Override - protected void cancelTask() { - running = false; - } - - // ------------------------------------------------------------------------ - - @Override - public void init() { - // does not hold any resources, no initialization necessary } @Override - protected void cleanup() throws Exception { - // does not hold any resources, no cleanup necessary + protected void cleanup() { + // make sure that we remove the queue from the broker, to prevent a resource leak + BlockingQueueBroker.INSTANCE.remove(brokerID); + LOG.info("Iteration head {} removed feedback queue under {}", getName(), brokerID); } // ------------------------------------------------------------------------ diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java index 8a3d006..2df565d 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java @@ -55,8 +55,11 @@ import 
org.apache.flink.streaming.runtime.partitioner.ConfigurableStreamPartitio import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer; +import org.apache.flink.streaming.runtime.tasks.mailbox.Mailbox; +import org.apache.flink.streaming.runtime.tasks.mailbox.MailboxImpl; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.Preconditions; +import org.apache.flink.util.function.ThrowingRunnable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -68,6 +71,7 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ThreadFactory; @@ -124,6 +128,12 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> /** The logger used by the StreamTask and its subclasses. */ private static final Logger LOG = LoggerFactory.getLogger(StreamTask.class); + /** Special value, letter that terminates the mailbox loop. */ + private static final Runnable POISON_LETTER = () -> {}; + + /** Special value, letter that "wakes up" a waiting mailbox loop. 
*/ + private static final Runnable DEFAULT_ACTION_AVAILABLE = () -> {}; + // ------------------------------------------------------------------------ /** @@ -182,6 +192,8 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> private final SynchronousSavepointLatch syncSavepointLatch; + protected final Mailbox mailbox; + // ------------------------------------------------------------------------ /** @@ -214,6 +226,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> this.accumulatorMap = getEnvironment().getAccumulatorRegistry().getUserMap(); this.recordWriters = createRecordWriters(configuration, environment); this.syncSavepointLatch = new SynchronousSavepointLatch(); + this.mailbox = new MailboxImpl(); } // ------------------------------------------------------------------------ @@ -222,13 +235,41 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> protected abstract void init() throws Exception; - protected abstract void run() throws Exception; - protected abstract void cleanup() throws Exception; protected abstract void cancelTask() throws Exception; /** + * This method implements the default action of the task (e.g. processing one event from the input). Implementations + * should (in general) be non-blocking. + * + * @param context context object for collaborative interaction between the action and the stream task. + * @throws Exception on any problems in the action. + */ + protected abstract void performDefaultAction(ActionContext context) throws Exception; + + /** + * Runs the stream-tasks main processing loop. 
+ */ + private void run() throws Exception { + final ActionContext actionContext = new ActionContext(); + while (true) { + if (mailbox.hasMail()) { + Optional<Runnable> maybeLetter; + while ((maybeLetter = mailbox.tryTakeMail()).isPresent()) { + Runnable letter = maybeLetter.get(); + if (letter == POISON_LETTER) { + return; + } + letter.run(); + } + } + + performDefaultAction(actionContext); + } + } + + /** * Emits the {@link org.apache.flink.streaming.api.watermark.Watermark#MAX_WATERMARK MAX_WATERMARK} * so that all registered timers are fired. * @@ -426,6 +467,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> @Override public final void cancel() throws Exception { + mailbox.clearAndPut(POISON_LETTER); isRunning = false; canceled = true; @@ -1280,4 +1322,40 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> output.setMetricGroup(environment.getMetricGroup().getIOMetricGroup()); return output; } + + /** + * The action context is passed as parameter into the default action method and holds control methods for feedback + * from the default action to the mailbox. + */ + public final class ActionContext { + + private final Runnable actionUnavailableLetter = ThrowingRunnable.unchecked(mailbox::waitUntilHasMail); + + /** + * This method must be called to end the stream task when all actions for the tasks have been performed. + */ + public void allActionsCompleted() { + mailbox.clearAndPut(POISON_LETTER); + } + + /** + * Calling this method signals that the mailbox-thread should continue invoking the default action, e.g. because + * new input became available for processing. + * + * @throws InterruptedException on interruption. + */ + public void actionsAvailable() throws InterruptedException { + mailbox.putMail(DEFAULT_ACTION_AVAILABLE); + } + + /** + * Calling this method signals that the mailbox-thread should (temporarily) stop invoking the default action, + * e.g. because there is currently no input available. 
+ * + * @throws InterruptedException on interruption. + */ + public void actionsUnavailable() throws InterruptedException { + mailbox.putMail(actionUnavailableLetter); + } + } } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java index 546ccdb..934f2cb 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java @@ -40,8 +40,6 @@ public class TwoInputStreamTask<IN1, IN2, OUT> extends StreamTask<OUT, TwoInputS private StreamTwoInputProcessor<IN1, IN2> inputProcessor; - private volatile boolean running = true; - private final WatermarkGauge input1WatermarkGauge; private final WatermarkGauge input2WatermarkGauge; private final MinWatermarkGauge minInputWatermarkGauge; @@ -110,12 +108,9 @@ public class TwoInputStreamTask<IN1, IN2, OUT> extends StreamTask<OUT, TwoInputS } @Override - protected void run() throws Exception { - // cache processor reference on the stack, to make the code more JIT friendly - final StreamTwoInputProcessor<IN1, IN2> inputProcessor = this.inputProcessor; - - while (running && inputProcessor.processInput()) { - // all the work happens in the "processInput" method + protected void performDefaultAction(ActionContext context) throws Exception { + if (!inputProcessor.processInput()) { + context.allActionsCompleted(); } } @@ -128,6 +123,6 @@ public class TwoInputStreamTask<IN1, IN2, OUT> extends StreamTask<OUT, TwoInputS @Override protected void cancelTask() { - running = false; + } } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/Mailbox.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/Mailbox.java new file mode 100644 index 0000000..dfa8d76 
--- /dev/null +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/Mailbox.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.runtime.tasks.mailbox; + +import javax.annotation.Nonnull; + +/** + * A mailbox is basically a blocking queue for inter-thread message exchange in form of {@link Runnable} objects between + * multiple producer threads and a single consumer. + */ +public interface Mailbox extends MailboxReceiver, MailboxSender { + + /** + * The effect of this is that all pending letters are dropped and the given priorityAction + * is enqueued to the head of the mailbox. + * + * @param priorityAction action to enqueue atomically after the mailbox was cleared. 
+ */ + void clearAndPut(@Nonnull Runnable priorityAction); +} diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxImpl.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxImpl.java new file mode 100644 index 0000000..411efd1 --- /dev/null +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxImpl.java @@ -0,0 +1,236 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.runtime.tasks.mailbox; + +import org.apache.flink.util.Preconditions; + +import javax.annotation.Nonnull; +import javax.annotation.concurrent.GuardedBy; +import javax.annotation.concurrent.ThreadSafe; + +import java.util.Optional; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReentrantLock; + +/** + * Implementation of {@link Mailbox} inspired by {@link java.util.concurrent.ArrayBlockingQueue} and tailored towards + * our use case with multiple writers, single reader and volatile reads instead of lock & read on {@link #count}. + */ +@ThreadSafe +public class MailboxImpl implements Mailbox { + + /** + * The enqueued letters. 
+ */ + @GuardedBy("lock") + private final Runnable[] ringBuffer; + + /** + * Lock for all concurrent ops. + */ + private final ReentrantLock lock; + + /** + * Condition that is triggered when the buffer is no longer empty. + */ + @GuardedBy("lock") + private final Condition notEmpty; + + /** + * Condition that is triggered when the buffer is no longer full. + */ + @GuardedBy("lock") + private final Condition notFull; + + /** + * Index of the ring buffer head. + */ + @GuardedBy("lock") + private int headIndex; + + /** + * Index of the ring buffer tail. + */ + @GuardedBy("lock") + private int tailIndex; + + /** + * Number of letters in the mailbox. + */ + @GuardedBy("lock") + private volatile int count; + + /** + * A mask to wrap around the indexes of the ring buffer. We use this to avoid ifs or modulo ops. + */ + private final int moduloMask; + + public MailboxImpl() { + this(6); // 2^6 = 64 + } + + public MailboxImpl(int capacityPow2) { + final int capacity = 1 << capacityPow2; + Preconditions.checkState(capacity > 0); + this.moduloMask = capacity - 1; + this.ringBuffer = new Runnable[capacity]; + this.lock = new ReentrantLock(); + this.notEmpty = lock.newCondition(); + this.notFull = lock.newCondition(); + } + + @Override + public boolean hasMail() { + return !isEmpty(); + } + + @Override + public Optional<Runnable> tryTakeMail() { + final ReentrantLock lock = this.lock; + lock.lock(); + try { + return isEmpty() ? 
Optional.empty() : Optional.of(takeInternal()); + } finally { + lock.unlock(); + } + } + + @Nonnull + @Override + public Runnable takeMail() throws InterruptedException { + final ReentrantLock lock = this.lock; + lock.lockInterruptibly(); + try { + while (isEmpty()) { + notEmpty.await(); + } + return takeInternal(); + } finally { + lock.unlock(); + } + } + + @Override + public void waitUntilHasMail() throws InterruptedException { + final ReentrantLock lock = this.lock; + lock.lockInterruptibly(); + try { + while (isEmpty()) { + notEmpty.await(); + } + } finally { + lock.unlock(); + } + } + + //------------------------------------------------------------------------------------------------------------------ + + @Override + public boolean tryPutMail(@Nonnull Runnable letter) { + final ReentrantLock lock = this.lock; + lock.lock(); + try { + if (isFull()) { + return false; + } else { + putInternal(letter); + return true; + } + } finally { + lock.unlock(); + } + } + + @Override + public void putMail(@Nonnull Runnable letter) throws InterruptedException { + final ReentrantLock lock = this.lock; + lock.lockInterruptibly(); + try { + while (isFull()) { + notFull.await(); + } + putInternal(letter); + } finally { + lock.unlock(); + } + } + + @Override + public void waitUntilHasCapacity() throws InterruptedException { + final ReentrantLock lock = this.lock; + lock.lockInterruptibly(); + try { + while (isFull()) { + notFull.await(); + } + } finally { + lock.unlock(); + } + } + + //------------------------------------------------------------------------------------------------------------------ + + private void putInternal(Runnable letter) { + assert lock.isHeldByCurrentThread(); + this.ringBuffer[tailIndex] = letter; + tailIndex = increaseIndexWithWrapAround(tailIndex); + ++count; + notEmpty.signal(); + } + + private Runnable takeInternal() { + assert lock.isHeldByCurrentThread(); + final Runnable[] buffer = this.ringBuffer; + Runnable letter = buffer[headIndex]; + 
buffer[headIndex] = null; + headIndex = increaseIndexWithWrapAround(headIndex); + --count; + notFull.signal(); + return letter; + } + + private int increaseIndexWithWrapAround(int old) { + return (old + 1) & moduloMask; + } + + private boolean isFull() { + return count >= ringBuffer.length; + } + + private boolean isEmpty() { + return count == 0; + } + + @Override + public void clearAndPut(@Nonnull Runnable shutdownAction) { + lock.lock(); + try { + int localCount = count; + while (localCount > 0) { + ringBuffer[headIndex] = null; + headIndex = increaseIndexWithWrapAround(headIndex); + --localCount; + } + count = 0; + putInternal(shutdownAction); + } finally { + lock.unlock(); + } + } +} diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxReceiver.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxReceiver.java new file mode 100644 index 0000000..189687e --- /dev/null +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxReceiver.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.runtime.tasks.mailbox; + +import javax.annotation.Nonnull; + +import java.util.Optional; + +/** + * Consumer-facing side of the {@link Mailbox} interface. This is used to dequeue letters. The mailbox returns letters + * in the order in which they were enqueued. A mailbox should only be consumed by one thread at a time. + */ +public interface MailboxReceiver { + + /** + * Returns <code>true</code> if the mailbox contains mail. + */ + boolean hasMail(); + + /** + * Returns an optional with either the oldest letter from the mailbox (head of queue) if the mailbox is not empty or + * an empty optional otherwise. + * + * @return an optional with either the oldest letter from the mailbox (head of queue) if the mailbox is not empty or + * an empty optional otherwise. + */ + Optional<Runnable> tryTakeMail(); + + /** + * This method returns the oldest letter from the mailbox (head of queue) or blocks until a letter is available. + * + * @return the oldest letter from the mailbox (head of queue). + * @throws InterruptedException on interruption. + */ + @Nonnull + Runnable takeMail() throws InterruptedException; + + /** + * This method blocks if the mailbox is empty until mail becomes available. + * @throws InterruptedException on interruption. + */ + void waitUntilHasMail() throws InterruptedException; +} diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxSender.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxSender.java new file mode 100644 index 0000000..1829125 --- /dev/null +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxSender.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.runtime.tasks.mailbox; + +import javax.annotation.Nonnull; + +/** + * Producer-facing side of the {@link Mailbox} interface. This is used to enqueue letters. Multiple producer threads + * can put to the same mailbox. + */ +public interface MailboxSender { + + /** + * Enqueues the given letter to the mailbox, if capacity is available. On success, this returns <code>true</code> + * and <code>false</code> if the mailbox was already full. + * + * @param letter the letter to enqueue. + * @return <code>true</code> iff successful. + */ + boolean tryPutMail(@Nonnull Runnable letter); + + /** + * Enqueues the given letter to the mailbox and blocks until there is capacity for a successful put. + * + * @param letter the letter to enqueue. + * @throws InterruptedException on interruption. + */ + void putMail(@Nonnull Runnable letter) throws InterruptedException; + + /** + * This method blocks until the mailbox has again capacity to enqueue new letters. + * + * @throws InterruptedException on interruption. 
+ */ + void waitUntilHasCapacity() throws InterruptedException; +} diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/CheckpointExceptionHandlerConfigurationTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/CheckpointExceptionHandlerConfigurationTest.java index 08cee55..17ab88f 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/CheckpointExceptionHandlerConfigurationTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/CheckpointExceptionHandlerConfigurationTest.java @@ -80,7 +80,9 @@ public class CheckpointExceptionHandlerConfigurationTest extends TestLogger { protected void init() throws Exception {} @Override - protected void run() throws Exception {} + protected void performDefaultAction(ActionContext context) throws Exception { + context.allActionsCompleted(); + } @Override protected void cleanup() throws Exception {} diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskCancellationBarrierTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskCancellationBarrierTest.java index 80a38e4..d1b3697 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskCancellationBarrierTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskCancellationBarrierTest.java @@ -188,7 +188,9 @@ public class StreamTaskCancellationBarrierTest { } @Override - protected void run() throws Exception {} + protected void performDefaultAction(ActionContext context) throws Exception { + context.allActionsCompleted(); + } @Override protected void cleanup() throws Exception {} diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java index 8918b0a..c079d15 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java @@ -225,10 +225,11 @@ public class StreamTaskTerminationTest extends TestLogger { } @Override - protected void run() throws Exception { + protected void performDefaultAction(ActionContext context) throws Exception { RUN_LATCH.trigger(); // wait until we have started an asynchronous checkpoint CHECKPOINTING_LATCH.await(); + context.allActionsCompleted(); } @Override diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java index e24949d..af779b6 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java @@ -844,7 +844,9 @@ public class StreamTaskTest extends TestLogger { protected void init() throws Exception {} @Override - protected void run() throws Exception {} + protected void performDefaultAction(ActionContext context) throws Exception { + context.allActionsCompleted(); + } @Override protected void cleanup() throws Exception {} @@ -1031,7 +1033,9 @@ public class StreamTaskTest extends TestLogger { protected void init() throws Exception {} @Override - protected void run() throws Exception {} + protected void performDefaultAction(ActionContext context) throws Exception { + context.allActionsCompleted(); + } @Override protected void cleanup() throws Exception {} @@ -1059,10 +1063,11 @@ public class StreamTaskTest extends TestLogger { } @Override - protected void run() throws Exception { + protected void 
performDefaultAction(ActionContext context) throws Exception { if (fail) { throw new RuntimeException(); } + context.allActionsCompleted(); } @Override @@ -1149,7 +1154,7 @@ public class StreamTaskTest extends TestLogger { protected void init() {} @Override - protected void run() throws Exception { + protected void performDefaultAction(ActionContext context) throws Exception { holder = new LockHolder(getCheckpointLock(), latch); holder.start(); latch.await(); @@ -1164,6 +1169,7 @@ public class StreamTaskTest extends TestLogger { // restore interruption state Thread.currentThread().interrupt(); } + context.allActionsCompleted(); } @Override @@ -1193,7 +1199,7 @@ public class StreamTaskTest extends TestLogger { protected void init() {} @Override - protected void run() throws Exception { + protected void performDefaultAction(ActionContext context) throws Exception { final OneShotLatch latch = new OneShotLatch(); final Object lock = new Object(); @@ -1219,7 +1225,7 @@ public class StreamTaskTest extends TestLogger { finally { holder.close(); } - + context.allActionsCompleted(); } @Override @@ -1259,8 +1265,9 @@ public class StreamTaskTest extends TestLogger { } @Override - protected void run() throws Exception { + protected void performDefaultAction(ActionContext context) throws Exception { syncLatch.await(); + context.allActionsCompleted(); } @Override diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SynchronousCheckpointITCase.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SynchronousCheckpointITCase.java index 222133a..778a7d0 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SynchronousCheckpointITCase.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SynchronousCheckpointITCase.java @@ -156,9 +156,10 @@ public class SynchronousCheckpointITCase { } @Override - protected void run() throws Exception { + protected void 
performDefaultAction(ActionContext context) throws Exception { executionLatch.trigger(); cancellationLatch.await(); + context.allActionsCompleted(); } @Override diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SynchronousCheckpointTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SynchronousCheckpointTest.java index 2ad8c6f..f7e36d4 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SynchronousCheckpointTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SynchronousCheckpointTest.java @@ -174,9 +174,10 @@ public class SynchronousCheckpointTest { protected void init() {} @Override - protected void run() throws Exception { + protected void performDefaultAction(ActionContext context) throws Exception { runningLatch.trigger(); execLatch.await(); + context.allActionsCompleted(); } @Override diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TaskCheckpointingBehaviourTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TaskCheckpointingBehaviourTest.java index 244c8aa..9418e14 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TaskCheckpointingBehaviourTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TaskCheckpointingBehaviourTest.java @@ -506,8 +506,7 @@ public class TaskCheckpointingBehaviourTest extends TestLogger { public void init() {} @Override - protected void run() throws Exception { - + protected void performDefaultAction(ActionContext context) throws Exception { triggerCheckpointOnBarrier( new CheckpointMetaData( 11L, @@ -518,6 +517,7 @@ public class TaskCheckpointingBehaviourTest extends TestLogger { while (isRunning()) { Thread.sleep(1L); } + context.allActionsCompleted(); } @Override diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxImplTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxImplTest.java new file mode 100644 index 0000000..fc7f19c --- /dev/null +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxImplTest.java @@ -0,0 +1,170 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.runtime.tasks.mailbox; + +import org.apache.flink.util.function.BiConsumerWithException; +import org.apache.flink.util.function.FunctionWithException; +import org.apache.flink.util.function.ThrowingRunnable; + +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.util.LinkedList; +import java.util.Queue; + +/** + * Unit tests for {@link MailboxImpl}. + */ +public class MailboxImplTest { + + private static final Runnable POISON_LETTER = () -> {}; + private static final int CAPACITY_POW_2 = 1; + private static final int CAPACITY = 1 << CAPACITY_POW_2; + + /** + * Object under test. 
+ */ + private Mailbox mailbox; + + @Before + public void setUp() throws Exception { + mailbox = new MailboxImpl(CAPACITY_POW_2); + } + + /** + * Test for #clearAndPut should remove other pending events and enqueue directly to the head of the mailbox queue. + */ + @Test + public void testClearAndPut() { + for (int i = 0; i < CAPACITY; ++i) { + Assert.assertTrue(mailbox.tryPutMail(() -> {})); + } + + mailbox.clearAndPut(POISON_LETTER); + + Assert.assertTrue(mailbox.hasMail()); + Assert.assertEquals(POISON_LETTER, mailbox.tryTakeMail().get()); + Assert.assertFalse(mailbox.hasMail()); + } + + @Test + public void testContracts() throws Exception { + final Queue<Runnable> testObjects = new LinkedList<>(); + Assert.assertFalse(mailbox.hasMail()); + + for (int i = 0; i < CAPACITY; ++i) { + Runnable letter = () -> {}; + testObjects.add(letter); + Assert.assertTrue(mailbox.tryPutMail(letter)); + Assert.assertTrue(mailbox.hasMail()); + } + + Assert.assertFalse(mailbox.tryPutMail(() -> {})); + + while (!testObjects.isEmpty()) { + Assert.assertEquals(testObjects.remove(), mailbox.tryTakeMail().get()); + Assert.assertEquals(!testObjects.isEmpty(), mailbox.hasMail()); + mailbox.waitUntilHasCapacity(); // should not block here because the mailbox is not full + } + + Thread waitingReader = new Thread(ThrowingRunnable.unchecked(() -> mailbox.waitUntilHasMail())); + waitingReader.start(); + Thread.sleep(1); + Assert.assertTrue(waitingReader.isAlive()); + mailbox.tryPutMail(() -> {}); + waitingReader.join(); // should complete here + + while (mailbox.tryPutMail(() -> {})) {} + + Thread waitingWriter = new Thread(ThrowingRunnable.unchecked(() -> mailbox.waitUntilHasCapacity())); + waitingWriter.start(); + Thread.sleep(1); + Assert.assertTrue(waitingWriter.isAlive()); + mailbox.takeMail(); + waitingWriter.join(); + } + + /** + * Test the producer-consumer pattern using the blocking methods on the mailbox. 
+ */ + @Test + public void testConcurrentPutTakeBlocking() throws Exception { + testPutTake(MailboxReceiver::takeMail, MailboxSender::putMail); + } + + /** + * Test the producer-consumer pattern using the non-blocking methods & waits on the mailbox. + */ + @Test + public void testConcurrentPutTakeNonBlockingAndWait() throws Exception { + testPutTake((mailbox -> { + mailbox.waitUntilHasMail(); + return mailbox.tryTakeMail().get(); + }), + ((mailbox, runnable) -> { + while (!mailbox.tryPutMail(runnable)) { + mailbox.waitUntilHasCapacity(); + } + })); + } + + /** + * Test producer-consumer pattern through the mailbox in a concurrent setting (n-writer / 1-reader). + */ + private void testPutTake( + FunctionWithException<Mailbox, Runnable, Exception> takeMethod, + BiConsumerWithException<Mailbox, Runnable, Exception> putMethod) throws Exception { + final int numThreads = 10; + final int numLettersPerThread = 1000; + final int[] results = new int[numThreads]; + Thread[] writerThreads = new Thread[numThreads]; + Thread readerThread = new Thread(ThrowingRunnable.unchecked(() -> { + Runnable letter; + while ((letter = takeMethod.apply(mailbox)) != POISON_LETTER) { + letter.run(); + } + })); + + readerThread.start(); + for (int i = 0; i < writerThreads.length; ++i) { + final int threadId = i; + writerThreads[i] = new Thread(ThrowingRunnable.unchecked(() -> { + for (int k = 0; k < numLettersPerThread; ++k) { + putMethod.accept(mailbox, () -> ++results[threadId]); + } + })); + } + + for (Thread writerThread : writerThreads) { + writerThread.start(); + } + + for (Thread writerThread : writerThreads) { + writerThread.join(); + } + + mailbox.putMail(POISON_LETTER); + + readerThread.join(); + for (int perThreadResult : results) { + Assert.assertEquals(numLettersPerThread, perThreadResult); + } + } +} diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockStreamTask.java 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockStreamTask.java index 230c68a..835d924 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockStreamTask.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockStreamTask.java @@ -81,7 +81,9 @@ public class MockStreamTask extends StreamTask { public void init() { } @Override - protected void run() { } + protected void performDefaultAction(ActionContext context) throws Exception { + context.allActionsCompleted(); + } @Override protected void cleanup() { } diff --git a/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterStopWithSavepointIT.java b/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterStopWithSavepointIT.java index 986e410..57a5121 100644 --- a/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterStopWithSavepointIT.java +++ b/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterStopWithSavepointIT.java @@ -284,13 +284,14 @@ public class JobMasterStopWithSavepointIT extends AbstractTestBase { } @Override - protected void run() throws InterruptedException { + protected void performDefaultAction(ActionContext context) throws Exception { final long taskIndex = getEnvironment().getTaskInfo().getIndexOfThisSubtask(); if (taskIndex == 0) { numberOfRestarts.countDown(); } invokeLatch.countDown(); finishLatch.await(); + context.allActionsCompleted(); } @Override @@ -340,9 +341,10 @@ public class JobMasterStopWithSavepointIT extends AbstractTestBase { } @Override - protected void run() throws InterruptedException { + protected void performDefaultAction(ActionContext context) throws Exception { invokeLatch.countDown(); finishLatch.await(); + context.allActionsCompleted(); } @Override @@ -368,8 +370,8 @@ public class JobMasterStopWithSavepointIT extends AbstractTestBase { } @Override - protected void run() throws Exception { - + protected void 
performDefaultAction(ActionContext context) throws Exception { + context.allActionsCompleted(); } @Override