This is an automated email from the ASF dual-hosted git repository.

kkloudas pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 1568ecc67def90237d151b4f43de8d4c16ee6116
Author: Aleksey Pak <alek...@ververica.com>
AuthorDate: Tue Jun 4 14:38:13 2019 +0200

    [FLINK-13205][runtime] Make checkpoints injection ordered again (partial revert of FLINK-11458): use single threaded Task's dispatcher thread pool
---
 .../BlockingCallMonitoringThreadPool.java          | 127 ---------------------
 .../org/apache/flink/runtime/taskmanager/Task.java |  36 +++---
 .../BlockingCallMonitoringThreadPoolTest.java      | 112 ------------------
 .../runtime/taskmanager/TaskAsyncCallTest.java     |  11 +-
 4 files changed, 15 insertions(+), 271 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/BlockingCallMonitoringThreadPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/BlockingCallMonitoringThreadPool.java
deleted file mode 100644
index d0fb868..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/BlockingCallMonitoringThreadPool.java
+++ /dev/null
@@ -1,127 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.taskmanager;
-
-import org.apache.flink.annotation.VisibleForTesting;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * A Thread Pool used to monitor the number of in-flight calls that block and 
wait for another task executed
- * by the same pool in order to get unblocked. When a call (blocking or 
non-blocking) is submitted, the size
- * of the pool is set to {@code 1 + activeBlockingCalls}. This allows the 
thread pool size to follow the needs
- * of the system and to avoid any redundant idle threads consuming resources.
- */
-public class BlockingCallMonitoringThreadPool {
-
-       private static final Logger LOG = 
LoggerFactory.getLogger(BlockingCallMonitoringThreadPool.class);
-
-       private final AtomicInteger inFlightBlockingCallCounter = new 
AtomicInteger(0);
-
-       private final ThreadPoolExecutor executor;
-
-       public BlockingCallMonitoringThreadPool() {
-               this(Executors.defaultThreadFactory());
-       }
-
-       public BlockingCallMonitoringThreadPool(final ThreadFactory 
dispatcherThreadFactory) {
-               this.executor = new ThreadPoolExecutor(
-                               1,
-                               1,
-                               10L,
-                               TimeUnit.SECONDS,
-                               new LinkedBlockingQueue<>(),
-                               checkNotNull(dispatcherThreadFactory));
-       }
-
-       public CompletableFuture<?> submit(final Runnable runnable, final 
boolean blocking) {
-               if (blocking) {
-                       return submitBlocking(runnable);
-               } else {
-                       return submit(runnable);
-               }
-       }
-
-       private CompletableFuture<?> submit(final Runnable task) {
-               adjustThreadPoolSize(inFlightBlockingCallCounter.get());
-               return CompletableFuture.runAsync(task, executor);
-       }
-
-       private CompletableFuture<?> submitBlocking(final Runnable task) {
-               
adjustThreadPoolSize(inFlightBlockingCallCounter.incrementAndGet());
-               return CompletableFuture.runAsync(task, executor).whenComplete(
-                               (ignored, e) -> 
inFlightBlockingCallCounter.decrementAndGet());
-       }
-
-       private void adjustThreadPoolSize(final int activeBlockingCalls) {
-               if (activeBlockingCalls > 1) {
-                       LOG.debug("There are {} active threads with blocking 
calls", activeBlockingCalls);
-               }
-
-               final int newPoolSize = 1 + activeBlockingCalls;
-
-               // We have to reset the core pool size because (quoted from the 
official docs):
-               // ``
-               // If there are more than corePoolSize but less than 
maximumPoolSize threads running,
-               // ** a new thread will be created ONLY IF THE QUEUE IS FULL **.
-               // ``
-
-               // ensure that regardless of whether we increase/reduce the 
pool size, maximum is always >= core
-               if (newPoolSize < executor.getCorePoolSize()) {
-                       executor.setCorePoolSize(newPoolSize);
-                       executor.setMaximumPoolSize(newPoolSize);
-               } else {
-                       executor.setMaximumPoolSize(newPoolSize);
-                       executor.setCorePoolSize(newPoolSize);
-               }
-       }
-
-       public void shutdown() {
-               executor.shutdown();
-       }
-
-       public boolean isShutdown() {
-               return executor.isShutdown();
-       }
-
-       public void shutdownNow() {
-               executor.shutdownNow();
-       }
-
-       @VisibleForTesting
-       int getMaximumPoolSize() {
-               return executor.getMaximumPoolSize();
-       }
-
-       @VisibleForTesting
-       int getQueueSize() {
-               return executor.getQueue().size();
-       }
-}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index 4355821..d4e1d8a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -93,6 +93,8 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -257,8 +259,8 @@ public class Task implements Runnable, TaskActions, PartitionProducerStateProvid
        /** The observed exception, in case the task execution failed. */
        private volatile Throwable failureCause;
 
-       /** Executor for asynchronous calls (checkpoints, etc), lazily 
initialized. */
-       private volatile BlockingCallMonitoringThreadPool asyncCallDispatcher;
+       /** Serial executor for asynchronous calls (checkpoints, etc), lazily 
initialized. */
+       private volatile ExecutorService asyncCallDispatcher;
 
        /** Initialized from the Flink configuration. May also be set at the 
ExecutionConfig */
        private long taskCancellationInterval;
@@ -789,7 +791,7 @@ public class Task implements Runnable, TaskActions, PartitionProducerStateProvid
 
                                // stop the async dispatcher.
                                // copy dispatcher reference to stack, against 
concurrent release
-                               final BlockingCallMonitoringThreadPool 
dispatcher = this.asyncCallDispatcher;
+                               ExecutorService dispatcher = 
this.asyncCallDispatcher;
                                if (dispatcher != null && 
!dispatcher.isShutdown()) {
                                        dispatcher.shutdownNow();
                                }
@@ -1153,8 +1155,7 @@ public class Task implements Runnable, TaskActions, PartitionProducerStateProvid
                        };
                        executeAsyncCallRunnable(
                                        runnable,
-                                       String.format("Checkpoint Trigger for 
%s (%s).", taskNameWithSubtask, executionId),
-                                       
checkpointOptions.getCheckpointType().isSynchronous());
+                                       String.format("Checkpoint Trigger for 
%s (%s).", taskNameWithSubtask, executionId));
                }
                else {
                        LOG.debug("Declining checkpoint request for non-running 
task {} ({}).", taskNameWithSubtask, executionId);
@@ -1189,8 +1190,8 @@ public class Task implements Runnable, TaskActions, PartitionProducerStateProvid
                        };
                        executeAsyncCallRunnable(
                                        runnable,
-                                       "Checkpoint Confirmation for " + 
taskNameWithSubtask,
-                                       false);
+                                       "Checkpoint Confirmation for " + 
taskNameWithSubtask
+                       );
                }
                else {
                        LOG.debug("Ignoring checkpoint commit notification for 
non-running task {}.", taskNameWithSubtask);
@@ -1201,11 +1202,10 @@ public class Task implements Runnable, TaskActions, PartitionProducerStateProvid
 
        /**
         * Utility method to dispatch an asynchronous call on the invokable.
-        *
-        * @param runnable The async call runnable.
+        *  @param runnable The async call runnable.
         * @param callName The name of the call, for logging purposes.
         */
-       private void executeAsyncCallRunnable(Runnable runnable, String 
callName, boolean blocking) {
+       private void executeAsyncCallRunnable(Runnable runnable, String 
callName) {
                // make sure the executor is initialized. lock against 
concurrent calls to this function
                synchronized (this) {
                        if (executionState != ExecutionState.RUNNING) {
@@ -1213,20 +1213,12 @@ public class Task implements Runnable, TaskActions, PartitionProducerStateProvid
                        }
 
                        // get ourselves a reference on the stack that cannot 
be concurrently modified
-                       BlockingCallMonitoringThreadPool executor = 
this.asyncCallDispatcher;
+                       ExecutorService executor = this.asyncCallDispatcher;
                        if (executor == null) {
                                // first time use, initialize
                                checkState(userCodeClassLoader != null, 
"userCodeClassLoader must not be null");
 
-                               // Under normal execution, we expect that one 
thread will suffice, this is why we
-                               // keep the core threads to 1. In the case of a 
synchronous savepoint, we will block
-                               // the checkpointing thread, so we need an 
additional thread to execute the
-                               // notifyCheckpointComplete() callback. 
Finally, we aggressively purge (potentially)
-                               // idle thread so that we do not risk to have 
many idle thread on machines with multiple
-                               // tasks on them. Either way, only one of them 
can execute at a time due to the
-                               // checkpoint lock.
-
-                               executor = new BlockingCallMonitoringThreadPool(
+                               executor = Executors.newSingleThreadExecutor(
                                                new DispatcherThreadFactory(
                                                        TASK_THREADS_GROUP,
                                                        "Async calls on " + 
taskNameWithSubtask,
@@ -1245,13 +1237,13 @@ public class Task implements Runnable, TaskActions, PartitionProducerStateProvid
                        LOG.debug("Invoking async call {} on task {}", 
callName, taskNameWithSubtask);
 
                        try {
-                               executor.submit(runnable, blocking);
+                               executor.submit(runnable);
                        }
                        catch (RejectedExecutionException e) {
                                // may be that we are concurrently finished or 
canceled.
                                // if not, report that something is fishy
                                if (executionState == ExecutionState.RUNNING) {
-                                       throw new RuntimeException("Async call 
with a " + (blocking ? "" : "non-") + "blocking call was rejected, even though 
the task is running.", e);
+                                       throw new RuntimeException("Async call 
was rejected, even though the task is running.", e);
                                }
                        }
                }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/BlockingCallMonitoringThreadPoolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/BlockingCallMonitoringThreadPoolTest.java
deleted file mode 100644
index 2cc3454..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/BlockingCallMonitoringThreadPoolTest.java
+++ /dev/null
@@ -1,112 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.taskmanager;
-
-import org.apache.flink.core.testutils.OneShotLatch;
-
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.TimeUnit;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-/**
- * Tests for {@link BlockingCallMonitoringThreadPool}.
- */
-public class BlockingCallMonitoringThreadPoolTest {
-
-       private final static int TIME_OUT = 30;
-
-       private final OneShotLatch latch1 = new OneShotLatch();
-       private final OneShotLatch latch2 = new OneShotLatch();
-       private BlockingCallMonitoringThreadPool blockingCallThreadPool = new 
BlockingCallMonitoringThreadPool();
-
-       @Before
-       public void setup() {
-               blockingCallThreadPool = new BlockingCallMonitoringThreadPool();
-               latch1.reset();
-               latch2.reset();
-       }
-
-       @After
-       public void tearDown() {
-               latch1.trigger();
-               latch2.trigger();
-               blockingCallThreadPool.shutdown();
-       }
-
-       @Test
-       public void testSubmitNonBlockingCalls() throws Exception {
-               blockingCallThreadPool.submit(() -> await(latch1), false);
-               blockingCallThreadPool.submit(() -> await(latch2), false);
-
-               assertEquals(1, blockingCallThreadPool.getMaximumPoolSize());
-               assertEquals(1, blockingCallThreadPool.getQueueSize());
-       }
-
-       @Test
-       public void testSubmitBlockingCall() throws Exception {
-               CompletableFuture<?> latch1Future = 
blockingCallThreadPool.submit(() -> await(latch1), true);
-               CompletableFuture<?> latch2Future = 
blockingCallThreadPool.submit(() -> await(latch2), false);
-
-               assertEquals(2, blockingCallThreadPool.getMaximumPoolSize());
-               assertEquals(0, blockingCallThreadPool.getQueueSize());
-
-               latch2.trigger();
-               latch2Future.get(TIME_OUT, TimeUnit.SECONDS);
-
-               assertFalse(latch1Future.isDone());
-               assertTrue(latch2Future.isDone());
-       }
-
-       @Test
-       public void testDownsizePool() throws Exception {
-               List<CompletableFuture<?>> futures = new ArrayList<>();
-
-               futures.add(blockingCallThreadPool.submit(() -> await(latch1), 
true));
-               futures.add(blockingCallThreadPool.submit(() -> await(latch1), 
true));
-               futures.add(blockingCallThreadPool.submit(() -> await(latch1), 
false));
-
-               assertEquals(3, blockingCallThreadPool.getMaximumPoolSize());
-
-               latch1.trigger();
-
-               for (CompletableFuture<?> future : futures) {
-                       future.get(TIME_OUT, TimeUnit.SECONDS);
-               }
-
-               blockingCallThreadPool.submit(() -> await(latch1), 
false).get(TIME_OUT, TimeUnit.SECONDS);
-               assertEquals(1, blockingCallThreadPool.getMaximumPoolSize());
-       }
-
-       private void await(OneShotLatch latch) {
-               try {
-                       latch.await();
-               } catch (InterruptedException e) {
-                       throw new RuntimeException(e);
-               }
-       }
-}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
index 5e43f68..f7b366b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
@@ -62,7 +62,6 @@ import org.apache.flink.util.TestLogger;
 
 import org.junit.After;
 import org.junit.Before;
-import org.junit.Ignore;
 import org.junit.Test;
 
 import java.util.ArrayList;
@@ -130,16 +129,12 @@ public class TaskAsyncCallTest extends TestLogger {
        }
 
        // 
------------------------------------------------------------------------
-       //  Tests 
+       //  Tests
        // 
------------------------------------------------------------------------
 
        @Test
-       @Ignore
        public void testCheckpointCallsInOrder() throws Exception {
 
-               // test ignored because with the changes introduced by 
[FLINK-11667],
-               // there is not guarantee about the order in which checkpoints 
are executed.
-
                Task task = createTask(CheckpointsInOrderInvokable.class);
                try (TaskCleaner ignored = new TaskCleaner(task)) {
                        task.startTaskThread();
@@ -160,12 +155,8 @@ public class TaskAsyncCallTest extends TestLogger {
        }
 
        @Test
-       @Ignore
        public void testMixedAsyncCallsInOrder() throws Exception {
 
-               // test ignored because with the changes introduced by 
[FLINK-11667],
-               // there is not guarantee about the order in which checkpoints 
are executed.
-
                Task task = createTask(CheckpointsInOrderInvokable.class);
                try (TaskCleaner ignored = new TaskCleaner(task)) {
                        task.startTaskThread();

Reply via email to