This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit fd75bcd43f6fafee779caf24ddfd56f0e2cf07b0
Author: Andrey Zagrebin <azagre...@gmail.com>
AuthorDate: Mon Jul 8 10:51:19 2019 +0200

    [hotfix][tests][coordination] Move idle task manager release tests into a separate suite
---
 .../slotmanager/SlotManagerTest.java               | 104 -------------
 .../TaskManagerReleaseInSlotManagerTest.java       | 172 +++++++++++++++++++++
 2 files changed, 172 insertions(+), 104 deletions(-)

diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
index 8760f10..c358866 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
@@ -30,7 +30,6 @@ import org.apache.flink.runtime.clusterframework.types.SlotID;
 import org.apache.flink.runtime.clusterframework.types.TaskManagerSlot;
 import org.apache.flink.runtime.concurrent.Executors;
 import org.apache.flink.runtime.concurrent.FutureUtils;
-import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutor;
 import org.apache.flink.runtime.concurrent.ScheduledExecutor;
 import org.apache.flink.runtime.instance.InstanceID;
 import org.apache.flink.runtime.messages.Acknowledge;
@@ -669,109 +668,6 @@ public class SlotManagerTest extends TestLogger {
        }
 
        /**
-        * Tests that idle task managers time out after the configured timeout. 
A timed out task manager
-        * will be removed from the slot manager and the resource manager will 
be notified about the
-        * timeout, if it can be released.
-        */
-       @Test
-       public void testTaskManagerTimeout() throws Exception {
-               final long tmTimeout = 10L;
-
-               final CompletableFuture<InstanceID> releaseFuture = new 
CompletableFuture<>();
-               final ResourceActions resourceManagerActions = new 
TestingResourceActionsBuilder()
-                       .setReleaseResourceConsumer((instanceID, e) -> 
releaseFuture.complete(instanceID))
-                       .build();
-               final ResourceManagerId resourceManagerId = 
ResourceManagerId.generate();
-               final ResourceID resourceID = ResourceID.generate();
-
-               final TaskExecutorGateway taskExecutorGateway = new 
TestingTaskExecutorGatewayBuilder().createTestingTaskExecutorGateway();
-               final TaskExecutorConnection taskManagerConnection = new 
TaskExecutorConnection(resourceID, taskExecutorGateway);
-
-               final SlotID slotId = new SlotID(resourceID, 0);
-               final ResourceProfile resourceProfile = new 
ResourceProfile(1.0, 1);
-               final SlotStatus slotStatus = new SlotStatus(slotId, 
resourceProfile);
-               final SlotReport slotReport = new SlotReport(slotStatus);
-
-               final Executor mainThreadExecutor = 
TestingUtils.defaultExecutor();
-
-               try (SlotManager slotManager = SlotManagerBuilder.newBuilder()
-                       .setTaskManagerTimeout(Time.milliseconds(tmTimeout))
-                       .build()) {
-
-                       slotManager.start(resourceManagerId, 
mainThreadExecutor, resourceManagerActions);
-
-                       mainThreadExecutor.execute(() -> 
slotManager.registerTaskManager(taskManagerConnection, slotReport));
-
-                       assertThat(releaseFuture.get(), 
is(equalTo(taskManagerConnection.getInstanceID())));
-               }
-       }
-
-       /**
-        * Tests that idle but not releasable task managers will not be 
released even if timed out before it can be.
-        */
-       @Test
-       public void testTaskManagerNotReleasedBeforeItCanBe() throws Exception {
-               final CompletableFuture<InstanceID> releaseFuture = new 
CompletableFuture<>();
-               final ResourceActions resourceManagerActions = new 
TestingResourceActionsBuilder()
-                       .setReleaseResourceConsumer((instanceID, e) -> 
releaseFuture.complete(instanceID))
-                       .build();
-               final ResourceManagerId resourceManagerId = 
ResourceManagerId.generate();
-               final ResourceID resourceID = ResourceID.generate();
-
-               final AtomicReference<CompletableFuture<Boolean>> canBeReleased 
= new AtomicReference<>();
-               final TaskExecutorGateway taskExecutorGateway = new 
TestingTaskExecutorGatewayBuilder()
-                       .setCanBeReleasedSupplier(canBeReleased::get)
-                       .createTestingTaskExecutorGateway();
-               final TaskExecutorConnection taskManagerConnection = new 
TaskExecutorConnection(resourceID, taskExecutorGateway);
-
-               final SlotID slotId = new SlotID(resourceID, 0);
-               final ResourceProfile resourceProfile = new 
ResourceProfile(1.0, 1);
-               final SlotStatus slotStatus = new SlotStatus(slotId, 
resourceProfile);
-               final SlotReport slotReport = new SlotReport(slotStatus);
-
-               final ManuallyTriggeredScheduledExecutor mainThreadExecutor = 
new ManuallyTriggeredScheduledExecutor();
-
-               try (SlotManager slotManager = SlotManagerBuilder.newBuilder()
-                       .setScheduledExecutor(mainThreadExecutor)
-                       .setTaskManagerTimeout(Time.milliseconds(0L))
-                       .build()) {
-
-                       slotManager.start(resourceManagerId, 
mainThreadExecutor, resourceManagerActions);
-
-                       mainThreadExecutor.execute(() -> 
slotManager.registerTaskManager(taskManagerConnection, slotReport));
-
-                       // now it can not be released yet
-                       canBeReleased.set(new CompletableFuture<>());
-                       
mainThreadExecutor.execute(slotManager::checkTaskManagerTimeouts); // trigger 
TM.canBeReleased request
-                       mainThreadExecutor.triggerAll();
-                       canBeReleased.get().complete(false);
-                       mainThreadExecutor.triggerAll();
-                       assertThat(releaseFuture.isDone(), is(false));
-
-                       // Allocate and free slot between triggering 
TM.canBeReleased request and receiving response.
-                       // There can be potentially newly unreleased 
partitions, therefore TM can not be released yet.
-                       canBeReleased.set(new CompletableFuture<>());
-                       
mainThreadExecutor.execute(slotManager::checkTaskManagerTimeouts); // trigger 
TM.canBeReleased request
-                       mainThreadExecutor.triggerAll();
-                       AllocationID allocationID = new AllocationID();
-                       slotManager.registerSlotRequest(new SlotRequest(new 
JobID(), allocationID, resourceProfile, "foobar"));
-                       mainThreadExecutor.triggerAll();
-                       slotManager.freeSlot(slotId, allocationID);
-                       canBeReleased.get().complete(true);
-                       mainThreadExecutor.triggerAll();
-                       assertThat(releaseFuture.isDone(), is(false));
-
-                       // now it can and should be released
-                       canBeReleased.set(new CompletableFuture<>());
-                       
mainThreadExecutor.execute(slotManager::checkTaskManagerTimeouts); // trigger 
TM.canBeReleased request
-                       mainThreadExecutor.triggerAll();
-                       canBeReleased.get().complete(true);
-                       mainThreadExecutor.triggerAll();
-                       assertThat(releaseFuture.get(), 
is(equalTo(taskManagerConnection.getInstanceID())));
-               }
-       }
-
-       /**
         * Tests that slot requests time out after the specified request 
timeout. If a slot request
         * times out, then the request is cancelled, removed from the slot 
manager and the resource
         * manager is notified about the failed allocation.
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskManagerReleaseInSlotManagerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskManagerReleaseInSlotManagerTest.java
new file mode 100644
index 0000000..25f7ccb
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskManagerReleaseInSlotManagerTest.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager.slotmanager;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.clusterframework.types.SlotID;
+import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutor;
+import org.apache.flink.runtime.instance.InstanceID;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
+import org.apache.flink.runtime.resourcemanager.SlotRequest;
+import 
org.apache.flink.runtime.resourcemanager.registration.TaskExecutorConnection;
+import org.apache.flink.runtime.taskexecutor.SlotReport;
+import org.apache.flink.runtime.taskexecutor.SlotStatus;
+import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
+import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.function.RunnableWithException;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Test suite for the release of idle task managers by the slot manager.
+ */
+public class TaskManagerReleaseInSlotManagerTest extends TestLogger {
+       private static final ResourceID resourceID = ResourceID.generate();
+       private static final ResourceManagerId resourceManagerId = 
ResourceManagerId.generate();
+       private static final SlotID slotId = new SlotID(resourceID, 0);
+       private static final ResourceProfile resourceProfile = new 
ResourceProfile(1.0, 1);
+       private static final SlotStatus slotStatus = new SlotStatus(slotId, 
resourceProfile);
+       private static final SlotReport slotReport = new SlotReport(slotStatus);
+
+       private final AtomicReference<CompletableFuture<Boolean>> 
canBeReleasedFuture = new AtomicReference<>();
+       private final TaskExecutorGateway taskExecutorGateway = new 
TestingTaskExecutorGatewayBuilder()
+               .setCanBeReleasedSupplier(canBeReleasedFuture::get)
+               .createTestingTaskExecutorGateway();
+       private final TaskExecutorConnection taskManagerConnection =
+               new TaskExecutorConnection(resourceID, taskExecutorGateway);
+
+       private CompletableFuture<InstanceID> releaseFuture;
+       private ResourceActions resourceManagerActions;
+       private ManuallyTriggeredScheduledExecutor mainThreadExecutor;
+
+       @Before
+       public void setup() {
+               canBeReleasedFuture.set(new CompletableFuture<>());
+               releaseFuture = new CompletableFuture<>();
+               resourceManagerActions = new TestingResourceActionsBuilder()
+                       .setReleaseResourceConsumer((instanceID, e) -> 
releaseFuture.complete(instanceID))
+                       .build();
+               mainThreadExecutor = new ManuallyTriggeredScheduledExecutor();
+       }
+
+       /**
+        * Tests that idle task managers time out after the configured timeout. 
A timed out task manager
+        * will be removed from the slot manager and the resource manager will 
be notified about the
+        * timeout, if it can be released.
+        */
+       @Test
+       public void testTaskManagerTimeout() throws Exception {
+               Executor executor = TestingUtils.defaultExecutor();
+               
canBeReleasedFuture.set(CompletableFuture.completedFuture(true));
+               try (SlotManager slotManager = SlotManagerBuilder
+                       .newBuilder()
+                       .setTaskManagerTimeout(Time.milliseconds(10L))
+                       .build()) {
+
+                       slotManager.start(resourceManagerId, executor, 
resourceManagerActions);
+                       executor.execute(() -> 
slotManager.registerTaskManager(taskManagerConnection, slotReport));
+                       assertThat(releaseFuture.get(), 
is(equalTo(taskManagerConnection.getInstanceID())));
+               }
+       }
+
+       /**
+        * Tests that an idle, timed-out task manager is not released until it reports that it can be released.
+        */
+       @Test
+       public void testTaskManagerIsNotReleasedBeforeItCanBe() throws 
Exception {
+               try (SlotManager slotManager = 
createAndStartSlotManagerWithTM()) {
+                       
checkTaskManagerTimeoutWithCustomCanBeReleasedResponse(slotManager, false);
+                       verifyTmReleased(false);
+
+                       
checkTaskManagerTimeoutWithCustomCanBeReleasedResponse(slotManager, true);
+                       verifyTmReleased(true);
+               }
+       }
+
+       /**
+        * Tests that an idle task manager is not released after a positive "can be released" check if a slot was allocated and freed while that check was pending.
+        */
+       @Test
+       public void testTaskManagerIsNotReleasedInCaseOfConcurrentAllocation() 
throws Exception {
+               try (SlotManager slotManager = 
createAndStartSlotManagerWithTM()) {
+                       
checkTaskManagerTimeoutWithCustomCanBeReleasedResponse(slotManager, true, () -> 
{
+                               // Allocate and free a slot between triggering the TM.canBeReleased request and receiving its response.
+                               // The TM may potentially hold new unreleased partitions by then, therefore it cannot be released yet.
+                               AllocationID allocationID = new AllocationID();
+                               slotManager.registerSlotRequest(new 
SlotRequest(new JobID(), allocationID, resourceProfile, "foobar"));
+                               mainThreadExecutor.triggerAll();
+                               slotManager.freeSlot(slotId, allocationID);
+                       });
+                       verifyTmReleased(false);
+
+                       
checkTaskManagerTimeoutWithCustomCanBeReleasedResponse(slotManager, true);
+                       verifyTmReleased(true);
+               }
+       }
+
+       private SlotManager createAndStartSlotManagerWithTM() {
+               SlotManager slotManager = SlotManagerBuilder
+                       .newBuilder()
+                       .setScheduledExecutor(mainThreadExecutor)
+                       .setTaskManagerTimeout(Time.milliseconds(0L))
+                       .build();
+               slotManager.start(resourceManagerId, mainThreadExecutor, 
resourceManagerActions);
+               mainThreadExecutor.execute(() -> 
slotManager.registerTaskManager(taskManagerConnection, slotReport));
+               return slotManager;
+       }
+
+       private void checkTaskManagerTimeoutWithCustomCanBeReleasedResponse(
+                       SlotManager slotManager,
+                       boolean canBeReleased) throws Exception {
+               
checkTaskManagerTimeoutWithCustomCanBeReleasedResponse(slotManager, 
canBeReleased, () -> {});
+       }
+
+       private void checkTaskManagerTimeoutWithCustomCanBeReleasedResponse(
+                       SlotManager slotManager,
+                       boolean canBeReleased,
+                       RunnableWithException 
doAfterCheckTriggerBeforeCanBeReleasedResponse) throws Exception {
+               canBeReleasedFuture.set(new CompletableFuture<>());
+               
mainThreadExecutor.execute(slotManager::checkTaskManagerTimeouts); // trigger 
TM.canBeReleased request
+               mainThreadExecutor.triggerAll();
+               doAfterCheckTriggerBeforeCanBeReleasedResponse.run();
+               canBeReleasedFuture.get().complete(canBeReleased); // finish 
TM.canBeReleased request
+               mainThreadExecutor.triggerAll();
+       }
+
+       private void verifyTmReleased(boolean isTmReleased) {
+               assertThat(releaseFuture.isDone(), is(isTmReleased));
+               if (isTmReleased) {
+                       assertThat(releaseFuture.join(), 
is(equalTo(taskManagerConnection.getInstanceID())));
+               }
+       }
+}

Reply via email to