This is an automated email from the ASF dual-hosted git repository. kkloudas pushed a commit to branch release-1.9 in repository https://gitbox.apache.org/repos/asf/flink.git
commit 1568ecc67def90237d151b4f43de8d4c16ee6116 Author: Aleksey Pak <alek...@ververica.com> AuthorDate: Tue Jun 4 14:38:13 2019 +0200 [FLINK-13205][runtime] Make checkpoints injection ordered again (partial revert of FLINK-11458): use single threaded Task's dispatcher thread pool --- .../BlockingCallMonitoringThreadPool.java | 127 --------------------- .../org/apache/flink/runtime/taskmanager/Task.java | 36 +++--- .../BlockingCallMonitoringThreadPoolTest.java | 112 ------------------ .../runtime/taskmanager/TaskAsyncCallTest.java | 11 +- 4 files changed, 15 insertions(+), 271 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/BlockingCallMonitoringThreadPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/BlockingCallMonitoringThreadPool.java deleted file mode 100644 index d0fb868..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/BlockingCallMonitoringThreadPool.java +++ /dev/null @@ -1,127 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.taskmanager; - -import org.apache.flink.annotation.VisibleForTesting; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.Executors; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; - -import static org.apache.flink.util.Preconditions.checkNotNull; - -/** - * A Thread Pool used to monitor the number of in-flight calls that block and wait for another task executed - * by the same pool in order to get unblocked. When a call (blocking or non-blocking) is submitted, the size - * of the pool is set to {@code 1 + activeBlockingCalls}. This allows the thread pool size to follow the needs - * of the system and to avoid any redundant idle threads consuming resources. - */ -public class BlockingCallMonitoringThreadPool { - - private static final Logger LOG = LoggerFactory.getLogger(BlockingCallMonitoringThreadPool.class); - - private final AtomicInteger inFlightBlockingCallCounter = new AtomicInteger(0); - - private final ThreadPoolExecutor executor; - - public BlockingCallMonitoringThreadPool() { - this(Executors.defaultThreadFactory()); - } - - public BlockingCallMonitoringThreadPool(final ThreadFactory dispatcherThreadFactory) { - this.executor = new ThreadPoolExecutor( - 1, - 1, - 10L, - TimeUnit.SECONDS, - new LinkedBlockingQueue<>(), - checkNotNull(dispatcherThreadFactory)); - } - - public CompletableFuture<?> submit(final Runnable runnable, final boolean blocking) { - if (blocking) { - return submitBlocking(runnable); - } else { - return submit(runnable); - } - } - - private CompletableFuture<?> submit(final Runnable task) { - adjustThreadPoolSize(inFlightBlockingCallCounter.get()); - return CompletableFuture.runAsync(task, executor); - } - - private CompletableFuture<?> submitBlocking(final Runnable task) { - adjustThreadPoolSize(inFlightBlockingCallCounter.incrementAndGet()); - return CompletableFuture.runAsync(task, executor).whenComplete( - (ignored, e) -> inFlightBlockingCallCounter.decrementAndGet()); - } - - private void adjustThreadPoolSize(final int activeBlockingCalls) { - if (activeBlockingCalls > 1) { - LOG.debug("There are {} active threads with blocking calls", activeBlockingCalls); - } - - final int newPoolSize = 1 + activeBlockingCalls; - - // We have to reset the core pool size because (quoted from the official docs): - // `` - // If there are more than corePoolSize but less than maximumPoolSize threads running, - // ** a new thread will be created ONLY IF THE QUEUE IS FULL **. - // `` - - // ensure that regardless of whether we increase/reduce the pool size, maximum is always >= core - if (newPoolSize < executor.getCorePoolSize()) { - executor.setCorePoolSize(newPoolSize); - executor.setMaximumPoolSize(newPoolSize); - } else { - executor.setMaximumPoolSize(newPoolSize); - executor.setCorePoolSize(newPoolSize); - } - } - - public void shutdown() { - executor.shutdown(); - } - - public boolean isShutdown() { - return executor.isShutdown(); - } - - public void shutdownNow() { - executor.shutdownNow(); - } - - @VisibleForTesting - int getMaximumPoolSize() { - return executor.getMaximumPoolSize(); - } - - @VisibleForTesting - int getQueueSize() { - return executor.getQueue().size(); - } -} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java index 4355821..d4e1d8a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java @@ -93,6 +93,8 @@ import java.util.HashMap; import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.atomic.AtomicBoolean; @@ -257,8 +259,8 @@ public class Task implements Runnable, TaskActions, PartitionProducerStateProvid /** The observed exception, in case the task execution failed. */ private volatile Throwable failureCause; - /** Executor for asynchronous calls (checkpoints, etc), lazily initialized. */ - private volatile BlockingCallMonitoringThreadPool asyncCallDispatcher; + /** Serial executor for asynchronous calls (checkpoints, etc), lazily initialized. */ + private volatile ExecutorService asyncCallDispatcher; /** Initialized from the Flink configuration. May also be set at the ExecutionConfig */ private long taskCancellationInterval; @@ -789,7 +791,7 @@ public class Task implements Runnable, TaskActions, PartitionProducerStateProvid // stop the async dispatcher. // copy dispatcher reference to stack, against concurrent release - final BlockingCallMonitoringThreadPool dispatcher = this.asyncCallDispatcher; + ExecutorService dispatcher = this.asyncCallDispatcher; if (dispatcher != null && !dispatcher.isShutdown()) { dispatcher.shutdownNow(); } @@ -1153,8 +1155,7 @@ public class Task implements Runnable, TaskActions, PartitionProducerStateProvid }; executeAsyncCallRunnable( runnable, - String.format("Checkpoint Trigger for %s (%s).", taskNameWithSubtask, executionId), - checkpointOptions.getCheckpointType().isSynchronous()); + String.format("Checkpoint Trigger for %s (%s).", taskNameWithSubtask, executionId)); } else { LOG.debug("Declining checkpoint request for non-running task {} ({}).", taskNameWithSubtask, executionId); @@ -1189,8 +1190,8 @@ public class Task implements Runnable, TaskActions, PartitionProducerStateProvid }; executeAsyncCallRunnable( runnable, - "Checkpoint Confirmation for " + taskNameWithSubtask, - false); + "Checkpoint Confirmation for " + taskNameWithSubtask + ); } else { LOG.debug("Ignoring checkpoint commit notification for non-running task {}.", taskNameWithSubtask); @@ -1201,11 +1202,10 @@ public class Task implements Runnable, TaskActions, PartitionProducerStateProvid /** * Utility method to dispatch an asynchronous call on the invokable. - * - * @param runnable The async call runnable. + * @param runnable The async call runnable. * @param callName The name of the call, for logging purposes. */ - private void executeAsyncCallRunnable(Runnable runnable, String callName, boolean blocking) { + private void executeAsyncCallRunnable(Runnable runnable, String callName) { // make sure the executor is initialized. lock against concurrent calls to this function synchronized (this) { if (executionState != ExecutionState.RUNNING) { @@ -1213,20 +1213,12 @@ public class Task implements Runnable, TaskActions, PartitionProducerStateProvid } // get ourselves a reference on the stack that cannot be concurrently modified - BlockingCallMonitoringThreadPool executor = this.asyncCallDispatcher; + ExecutorService executor = this.asyncCallDispatcher; if (executor == null) { // first time use, initialize checkState(userCodeClassLoader != null, "userCodeClassLoader must not be null"); - // Under normal execution, we expect that one thread will suffice, this is why we - // keep the core threads to 1. In the case of a synchronous savepoint, we will block - // the checkpointing thread, so we need an additional thread to execute the - // notifyCheckpointComplete() callback. Finally, we aggressively purge (potentially) - // idle thread so that we do not risk to have many idle thread on machines with multiple - // tasks on them. Either way, only one of them can execute at a time due to the - // checkpoint lock. - - executor = new BlockingCallMonitoringThreadPool( + executor = Executors.newSingleThreadExecutor( new DispatcherThreadFactory( TASK_THREADS_GROUP, "Async calls on " + taskNameWithSubtask, @@ -1245,13 +1237,13 @@ public class Task implements Runnable, TaskActions, PartitionProducerStateProvid LOG.debug("Invoking async call {} on task {}", callName, taskNameWithSubtask); try { - executor.submit(runnable, blocking); + executor.submit(runnable); } catch (RejectedExecutionException e) { // may be that we are concurrently finished or canceled. // if not, report that something is fishy if (executionState == ExecutionState.RUNNING) { - throw new RuntimeException("Async call with a " + (blocking ? "" : "non-") + "blocking call was rejected, even though the task is running.", e); + throw new RuntimeException("Async call was rejected, even though the task is running.", e); } } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/BlockingCallMonitoringThreadPoolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/BlockingCallMonitoringThreadPoolTest.java deleted file mode 100644 index 2cc3454..0000000 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/BlockingCallMonitoringThreadPoolTest.java +++ /dev/null @@ -1,112 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.taskmanager; - -import org.apache.flink.core.testutils.OneShotLatch; - -import org.junit.After; -import org.junit.Before; -import org.junit.Test; - -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.TimeUnit; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; - -/** - * Tests for {@link BlockingCallMonitoringThreadPool}. - */ -public class BlockingCallMonitoringThreadPoolTest { - - private final static int TIME_OUT = 30; - - private final OneShotLatch latch1 = new OneShotLatch(); - private final OneShotLatch latch2 = new OneShotLatch(); - private BlockingCallMonitoringThreadPool blockingCallThreadPool = new BlockingCallMonitoringThreadPool(); - - @Before - public void setup() { - blockingCallThreadPool = new BlockingCallMonitoringThreadPool(); - latch1.reset(); - latch2.reset(); - } - - @After - public void tearDown() { - latch1.trigger(); - latch2.trigger(); - blockingCallThreadPool.shutdown(); - } - - @Test - public void testSubmitNonBlockingCalls() throws Exception { - blockingCallThreadPool.submit(() -> await(latch1), false); - blockingCallThreadPool.submit(() -> await(latch2), false); - - assertEquals(1, blockingCallThreadPool.getMaximumPoolSize()); - assertEquals(1, blockingCallThreadPool.getQueueSize()); - } - - @Test - public void testSubmitBlockingCall() throws Exception { - CompletableFuture<?> latch1Future = blockingCallThreadPool.submit(() -> await(latch1), true); - CompletableFuture<?> latch2Future = blockingCallThreadPool.submit(() -> await(latch2), false); - - assertEquals(2, blockingCallThreadPool.getMaximumPoolSize()); - assertEquals(0, blockingCallThreadPool.getQueueSize()); - - latch2.trigger(); - latch2Future.get(TIME_OUT, TimeUnit.SECONDS); - - assertFalse(latch1Future.isDone()); - assertTrue(latch2Future.isDone()); - } - - @Test - public void testDownsizePool() throws Exception { - List<CompletableFuture<?>> futures = new ArrayList<>(); - - futures.add(blockingCallThreadPool.submit(() -> await(latch1), true)); - futures.add(blockingCallThreadPool.submit(() -> await(latch1), true)); - futures.add(blockingCallThreadPool.submit(() -> await(latch1), false)); - - assertEquals(3, blockingCallThreadPool.getMaximumPoolSize()); - - latch1.trigger(); - - for (CompletableFuture<?> future : futures) { - future.get(TIME_OUT, TimeUnit.SECONDS); - } - - blockingCallThreadPool.submit(() -> await(latch1), false).get(TIME_OUT, TimeUnit.SECONDS); - assertEquals(1, blockingCallThreadPool.getMaximumPoolSize()); - } - - private void await(OneShotLatch latch) { - try { - latch.await(); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - } -} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java index 5e43f68..f7b366b 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java @@ -62,7 +62,6 @@ import org.apache.flink.util.TestLogger; import org.junit.After; import org.junit.Before; -import org.junit.Ignore; import org.junit.Test; import java.util.ArrayList; @@ -130,16 +129,12 @@ public class TaskAsyncCallTest extends TestLogger { } // ------------------------------------------------------------------------ - // Tests + // Tests // ------------------------------------------------------------------------ @Test - @Ignore public void testCheckpointCallsInOrder() throws Exception { - // test ignored because with the changes introduced by [FLINK-11667], - // there is not guarantee about the order in which checkpoints are executed. - Task task = createTask(CheckpointsInOrderInvokable.class); try (TaskCleaner ignored = new TaskCleaner(task)) { task.startTaskThread(); @@ -160,12 +155,8 @@ public class TaskAsyncCallTest extends TestLogger { } @Test - @Ignore public void testMixedAsyncCallsInOrder() throws Exception { - // test ignored because with the changes introduced by [FLINK-11667], - // there is not guarantee about the order in which checkpoints are executed. - Task task = createTask(CheckpointsInOrderInvokable.class); try (TaskCleaner ignored = new TaskCleaner(task)) { task.startTaskThread();