reswqa commented on code in PR #22196:
URL: https://github.com/apache/flink/pull/22196#discussion_r1141829794


##########
flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedTaskManagerTrackerTest.java:
##########
@@ -18,144 +18,191 @@
 package org.apache.flink.runtime.resourcemanager.slotmanager;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.instance.InstanceID;
+import org.apache.flink.runtime.resourcemanager.WorkerResourceSpec;
 import 
org.apache.flink.runtime.resourcemanager.registration.TaskExecutorConnection;
 import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder;
 import org.apache.flink.runtime.util.ResourceCounter;
-import org.apache.flink.util.TestLogger;
+import org.apache.flink.testutils.TestingUtils;
+import org.apache.flink.testutils.executor.TestExecutorExtension;
+import org.apache.flink.util.concurrent.ScheduledExecutorServiceAdapter;
 
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
 
+import java.util.Collection;
 import java.util.Collections;
-import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
 
-import static org.hamcrest.Matchers.empty;
-import static org.hamcrest.core.Is.is;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.assertTrue;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.jupiter.api.Assertions.assertThrows;
 
 /** Tests for the {@link FineGrainedTaskManagerTracker}. */
-public class FineGrainedTaskManagerTrackerTest extends TestLogger {
+class FineGrainedTaskManagerTrackerTest {
     private static final TaskExecutorConnection TASK_EXECUTOR_CONNECTION =
             new TaskExecutorConnection(
                     ResourceID.generate(),
                     new 
TestingTaskExecutorGatewayBuilder().createTestingTaskExecutorGateway());
 
+    static final WorkerResourceSpec DEFAULT_WORKER_RESOURCE_SPEC =
+            new WorkerResourceSpec.Builder()
+                    .setCpuCores(10.0)
+                    .setTaskHeapMemoryMB(1000)
+                    .setTaskOffHeapMemoryMB(1000)
+                    .setNetworkMemoryMB(1000)
+                    .setManagedMemoryMB(1000)
+                    .build();
+    static final int DEFAULT_NUM_SLOTS_PER_WORKER = 2;
+    static final ResourceProfile DEFAULT_TOTAL_RESOURCE_PROFILE =
+            
SlotManagerUtils.generateTaskManagerTotalResourceProfile(DEFAULT_WORKER_RESOURCE_SPEC);
+    static final ResourceProfile DEFAULT_SLOT_RESOURCE_PROFILE =
+            SlotManagerUtils.generateDefaultSlotResourceProfile(
+                    DEFAULT_WORKER_RESOURCE_SPEC, 
DEFAULT_NUM_SLOTS_PER_WORKER);
+
+    @RegisterExtension
+    static final TestExecutorExtension<ScheduledExecutorService> 
EXECUTOR_RESOURCE =
+            TestingUtils.defaultExecutorExtension();
+
     @Test
-    public void testInitState() {
-        final FineGrainedTaskManagerTracker taskManagerTracker =
-                new FineGrainedTaskManagerTracker();
-        assertThat(taskManagerTracker.getPendingTaskManagers(), is(empty()));
-        assertThat(taskManagerTracker.getRegisteredTaskManagers(), 
is(empty()));
+    void testInitState() {
+        final FineGrainedTaskManagerTracker taskManagerTracker = 
createAndStartTaskManagerTracker();
+        assertThat(taskManagerTracker.getPendingTaskManagers()).isEmpty();
+        assertThat(taskManagerTracker.getRegisteredTaskManagers()).isEmpty();
     }
 
     @Test
-    public void testAddAndRemoveTaskManager() {
-        final FineGrainedTaskManagerTracker taskManagerTracker =
-                new FineGrainedTaskManagerTracker();
+    void testAllocateTaskManagersAccordingToResultWithNoResourceAllocator() {
+        final FineGrainedTaskManagerTracker taskManagerTracker = 
createTaskManagerTracker();
 
-        // Add task manager
-        taskManagerTracker.addTaskManager(
-                TASK_EXECUTOR_CONNECTION, ResourceProfile.ANY, 
ResourceProfile.ANY);
-        assertThat(taskManagerTracker.getRegisteredTaskManagers().size(), 
is(1));
-        assertTrue(
-                taskManagerTracker
-                        
.getRegisteredTaskManager(TASK_EXECUTOR_CONNECTION.getInstanceID())
-                        .isPresent());
+        taskManagerTracker.initialize(
+                NonSupportedResourceAllocatorImpl.INSTANCE, 
EXECUTOR_RESOURCE.getExecutor());
 
-        // Remove task manager
-        
taskManagerTracker.removeTaskManager(TASK_EXECUTOR_CONNECTION.getInstanceID());
-        assertThat(taskManagerTracker.getRegisteredTaskManagers().size(), 
is(0));
+        PendingTaskManager pendingTaskManager =
+                new PendingTaskManager(DEFAULT_TOTAL_RESOURCE_PROFILE, 1);
+        ResourceAllocationResult result =
+                new ResourceAllocationResult.Builder()
+                        .addPendingTaskManagerAllocate(pendingTaskManager)
+                        .addAllocationOnPendingResource(
+                                new JobID(),
+                                pendingTaskManager.getPendingTaskManagerId(),
+                                DEFAULT_SLOT_RESOURCE_PROFILE)
+                        .build();
+
+        Set<PendingTaskManagerId> failedAllocations =
+                taskManagerTracker.allocateTaskManagersAccordingTo(result);
+        
assertThat(failedAllocations).containsExactly(pendingTaskManager.getPendingTaskManagerId());
     }
 
-    @Test(expected = NullPointerException.class)
-    public void testRemoveUnknownTaskManager() {
-        final FineGrainedTaskManagerTracker taskManagerTracker =
-                new FineGrainedTaskManagerTracker();
+    @Test
+    void testAllocateTaskManagersAccordingToResult() {
+        final FineGrainedTaskManagerTracker taskManagerTracker = 
createTaskManagerTracker();
+        CompletableFuture<ResourceDeclaration> declarationFuture = new 
CompletableFuture<>();
+        Consumer<Collection<ResourceDeclaration>> 
declareResourceNeededConsumer =
+                resourceDeclarations -> {
+                    assertThat(resourceDeclarations).hasSize(1);
+                    
declarationFuture.complete(resourceDeclarations.stream().findFirst().get());
+                };
+
+        PendingTaskManager pendingTaskManager =
+                new PendingTaskManager(DEFAULT_TOTAL_RESOURCE_PROFILE, 1);
+        taskManagerTracker.initialize(
+                new TestingResourceAllocatorBuilder()
+                        
.setDeclareResourceNeededConsumer(declareResourceNeededConsumer)
+                        .build(),
+                EXECUTOR_RESOURCE.getExecutor());
+        ResourceAllocationResult result =
+                new ResourceAllocationResult.Builder()
+                        .addPendingTaskManagerAllocate(pendingTaskManager)
+                        .addAllocationOnPendingResource(
+                                new JobID(),
+                                pendingTaskManager.getPendingTaskManagerId(),
+                                DEFAULT_SLOT_RESOURCE_PROFILE)
+                        .build();
+
+        Set<PendingTaskManagerId> failedAllocations =
+                taskManagerTracker.allocateTaskManagersAccordingTo(result);
+        assertThat(failedAllocations).isEmpty();
+        assertThat(declarationFuture)
+                .isCompletedWithValue(
+                        new ResourceDeclaration(
+                                DEFAULT_WORKER_RESOURCE_SPEC, 1, 
Collections.emptySet()));
+        assertThat(taskManagerTracker.getPendingTaskManagers()).hasSize(1);
 
-        taskManagerTracker.removeTaskManager(new InstanceID());
+        boolean registered =
+                taskManagerTracker.registerTaskManager(
+                        TASK_EXECUTOR_CONNECTION,
+                        DEFAULT_TOTAL_RESOURCE_PROFILE,
+                        DEFAULT_SLOT_RESOURCE_PROFILE,
+                        pendingTaskManager.getPendingTaskManagerId());
+        assertThat(registered).isTrue();
+        assertThat(taskManagerTracker.getPendingTaskManagers()).hasSize(0);
     }
 
     @Test
-    public void testAddAndRemovePendingTaskManager() {
-        final PendingTaskManager pendingTaskManager =
-                new PendingTaskManager(ResourceProfile.ANY, 1);
-        final FineGrainedTaskManagerTracker taskManagerTracker =
-                new FineGrainedTaskManagerTracker();
-        final JobID jobId = new JobID();
-        final ResourceCounter resourceCounter =
-                ResourceCounter.withResource(ResourceProfile.ANY, 1);
-
-        // Add pending task manager
-        taskManagerTracker.addPendingTaskManager(pendingTaskManager);
-        taskManagerTracker.replaceAllPendingAllocations(
-                Collections.singletonMap(
-                        pendingTaskManager.getPendingTaskManagerId(),
-                        Collections.singletonMap(jobId, resourceCounter)));
-        assertThat(taskManagerTracker.getPendingTaskManagers().size(), is(1));
-        assertThat(
-                taskManagerTracker
-                        
.getPendingTaskManagersByTotalAndDefaultSlotResourceProfile(
-                                ResourceProfile.ANY, ResourceProfile.ANY)
-                        .size(),
-                is(1));
-
-        // Remove pending task manager
-        final Map<JobID, ResourceCounter> records =
-                taskManagerTracker.removePendingTaskManager(
-                        pendingTaskManager.getPendingTaskManagerId());
-        assertThat(taskManagerTracker.getPendingTaskManagers(), is(empty()));
-        assertThat(
-                taskManagerTracker
-                        .getPendingAllocationsOfPendingTaskManager(
-                                pendingTaskManager.getPendingTaskManagerId())
-                        .size(),
-                is(0));
+    void testRegisterAndUnregisterTaskManager() {
+        final FineGrainedTaskManagerTracker taskManagerTracker = 
createAndStartTaskManagerTracker();
+
+        // Add task manager
+        taskManagerTracker.registerTaskManager(
+                TASK_EXECUTOR_CONNECTION,
+                DEFAULT_TOTAL_RESOURCE_PROFILE,
+                DEFAULT_SLOT_RESOURCE_PROFILE,
+                null);
+        assertThat(taskManagerTracker.getRegisteredTaskManagers()).hasSize(1);
         assertThat(
-                taskManagerTracker
-                        
.getPendingTaskManagersByTotalAndDefaultSlotResourceProfile(
-                                ResourceProfile.ANY, ResourceProfile.ANY)
-                        .size(),
-                is(0));
-        assertTrue(records.containsKey(jobId));
-        assertThat(records.get(jobId).getResourceCount(ResourceProfile.ANY), 
is(1));
+                        taskManagerTracker.getRegisteredTaskManager(
+                                TASK_EXECUTOR_CONNECTION.getInstanceID()))
+                .isPresent();
+
+        // Remove task manager
+        
taskManagerTracker.unregisterTaskManager(TASK_EXECUTOR_CONNECTION.getInstanceID());
+        assertThat(taskManagerTracker.getRegisteredTaskManagers()).isEmpty();
     }
 
-    @Test(expected = NullPointerException.class)
-    public void testRemoveUnknownPendingTaskManager() {
-        final FineGrainedTaskManagerTracker taskManagerTracker =
-                new FineGrainedTaskManagerTracker();
+    @Test
+    void testUnregisterUnknownTaskManager() {
+        assertThrows(

Review Comment:
   Please use `Assertions.assertThatThrownBy` from AssertJ.



##########
flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultSlotStatusSyncerTest.java:
##########
@@ -33,43 +33,39 @@
 import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGateway;
 import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder;
 import org.apache.flink.testutils.TestingUtils;
-import org.apache.flink.testutils.executor.TestExecutorResource;
-import org.apache.flink.util.TestLogger;
+import org.apache.flink.testutils.executor.TestExecutorExtension;
 import org.apache.flink.util.concurrent.FutureUtils;
+import org.apache.flink.util.concurrent.ScheduledExecutorServiceAdapter;
 
-import org.junit.ClassRule;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
 
 import java.util.Arrays;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeoutException;
 
-import static org.hamcrest.Matchers.contains;
-import static org.hamcrest.Matchers.empty;
-import static org.hamcrest.Matchers.instanceOf;
-import static org.hamcrest.core.Is.is;
-import static org.hamcrest.core.IsEqual.equalTo;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.assertTrue;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.jupiter.api.Assertions.assertThrows;

Review Comment:
   ```suggestion
   
   ```



##########
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskManagerTracker.java:
##########
@@ -21,63 +21,62 @@
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.instance.InstanceID;
-import org.apache.flink.runtime.resourcemanager.WorkerResourceSpec;
 import 
org.apache.flink.runtime.resourcemanager.registration.TaskExecutorConnection;
-import org.apache.flink.runtime.util.ResourceCounter;
 
-import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Executor;
 
 /** Tracks TaskManager's resource and slot status. */
 interface TaskManagerTracker
         extends TaskManagerResourceInfoProvider, 
ClusterResourceStatisticsProvider {
 
     // 
---------------------------------------------------------------------------------------------
-    // Add / Remove (pending) Resource
+    // initialize
     // 
---------------------------------------------------------------------------------------------
 
     /**
-     * Register a new task manager.
+     * Initialize the TaskManagerTracker.
      *
-     * @param taskExecutorConnection of the new task manager
-     * @param totalResourceProfile of the new task manager
-     * @param defaultSlotResourceProfile of the new task manager
+     * @param resourceAllocator to use for resource (de-)allocations
+     * @param mainThreadExecutor to use to run code in the ResourceManager's 
main thread
      */
-    void addTaskManager(
-            TaskExecutorConnection taskExecutorConnection,
-            ResourceProfile totalResourceProfile,
-            ResourceProfile defaultSlotResourceProfile);
+    void initialize(ResourceAllocator resourceAllocator, Executor 
mainThreadExecutor);
 
-    /**
-     * Unregister a task manager with the given instance id.
-     *
-     * @param instanceId of the task manager
-     */
-    void removeTaskManager(InstanceID instanceId);
+    /** Removes all state from the tracker. */
+    void close();

Review Comment:
   Why does this method belong under the `initialize` section comment?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRuntimeServices.java:
##########
@@ -83,7 +83,13 @@ private static SlotManager createSlotManager(
                     slotManagerConfiguration,
                     slotManagerMetricGroup,
                     new DefaultResourceTracker(),
-                    new FineGrainedTaskManagerTracker(),
+                    new FineGrainedTaskManagerTracker(
+                            slotManagerConfiguration.getMaxTotalCpu(),
+                            slotManagerConfiguration.getMaxTotalMem(),
+                            
slotManagerConfiguration.isWaitResultConsumedBeforeRelease(),
+                            slotManagerConfiguration.getTaskManagerTimeout(),
+                            
slotManagerConfiguration.getDeclareNeededResourceDelay(),

Review Comment:
   How about introducing `TaskManagerTrackerConfiguration` and deriving it from 
`SlotManagerConfiguration`?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManager.java:
##########
@@ -212,21 +171,13 @@ public void start(
 
         resourceManagerId = Preconditions.checkNotNull(newResourceManagerId);
         mainThreadExecutor = Preconditions.checkNotNull(newMainThreadExecutor);
-        resourceAllocator = Preconditions.checkNotNull(newResourceAllocator);
         resourceEventListener = 
Preconditions.checkNotNull(newResourceEventListener);
         slotStatusSyncer.initialize(
                 taskManagerTracker, resourceTracker, resourceManagerId, 
mainThreadExecutor);
         blockedTaskManagerChecker = 
Preconditions.checkNotNull(newBlockedTaskManagerChecker);
+        taskManagerTracker.initialize(newResourceAllocator, 
mainThreadExecutor);

Review Comment:
   ```suggestion
           
taskManagerTracker.initialize(Preconditions.checkNotNull(newResourceAllocator), 
mainThreadExecutor);       
   ```
   We'd better do this check before passing the parameter; otherwise we have to 
rely on every implementation to perform it.



##########
flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManagerTestBase.java:
##########
@@ -129,19 +129,20 @@ static <T> T 
assertFutureCompleteAndReturn(CompletableFuture<T> completableFutur
     }
 
     static void assertFutureNotComplete(CompletableFuture<?> 
completableFuture) throws Exception {
-        assertThatThrownBy(
-                        () ->
-                                completableFuture.get(
-                                        FUTURE_EXPECT_TIMEOUT_MS, 
TimeUnit.MILLISECONDS),
-                        "Expected to fail with a timeout.")
-                .isInstanceOf(TimeoutException.class);
+        assertThrows(

Review Comment:
   Why did you make this change? The original usage is correct.



##########
flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultSlotStatusSyncerTest.java:
##########
@@ -281,23 +263,34 @@ public void testSlotStatusProcessing() {
         // allocationId1 should still be allocated; allocationId2 should be 
freed; allocationId3
         // should continue to be in a pending state;
         
slotStatusSyncer.reportSlotStatus(taskExecutorConnection.getInstanceID(), 
slotReport2);
+        assertThat(resourceTracker.getAcquiredResources(jobId))
+                .contains(ResourceRequirement.create(resource, 2));
         assertThat(
-                resourceTracker.getAcquiredResources(jobId),
-                contains(ResourceRequirement.create(resource, 2)));
-        assertThat(
-                taskManagerTracker
-                        
.getRegisteredTaskManager(taskExecutorConnection.getInstanceID())
-                        .get()
-                        .getAvailableResource(),
-                equalTo(ResourceProfile.fromResources(3, 12)));
-        
assertTrue(taskManagerTracker.getAllocatedOrPendingSlot(allocationId1).isPresent());
-        
assertFalse(taskManagerTracker.getAllocatedOrPendingSlot(allocationId2).isPresent());
-        
assertTrue(taskManagerTracker.getAllocatedOrPendingSlot(allocationId3).isPresent());
-        assertThat(
-                
taskManagerTracker.getAllocatedOrPendingSlot(allocationId1).get().getState(),
-                is(SlotState.ALLOCATED));
-        assertThat(
-                
taskManagerTracker.getAllocatedOrPendingSlot(allocationId3).get().getState(),
-                is(SlotState.PENDING));
+                        taskManagerTracker.getRegisteredTaskManager(
+                                taskExecutorConnection.getInstanceID()))
+                .hasValueSatisfying(
+                        taskManagerInfo ->
+                                
assertThat(taskManagerInfo.getAvailableResource())
+                                        
.isEqualTo(ResourceProfile.fromResources(3, 12)));
+        
assertThat(taskManagerTracker.getAllocatedOrPendingSlot(allocationId1)).isPresent();
+        
assertThat(taskManagerTracker.getAllocatedOrPendingSlot(allocationId2)).isNotPresent();
+        
assertThat(taskManagerTracker.getAllocatedOrPendingSlot(allocationId3)).isPresent();
+        assertThat(taskManagerTracker.getAllocatedOrPendingSlot(allocationId1))
+                .hasValueSatisfying(
+                        slot -> 
assertThat(slot.getState()).isEqualTo(SlotState.ALLOCATED));

Review Comment:
   These `hasValueSatisfying` assertions already ensure that the Optional holds a value, so 
`assertThat(taskManagerTracker.getAllocatedOrPendingSlot(allocationId1)).isPresent()`
 can be safely removed.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskManagerTracker.java:
##########
@@ -21,63 +21,62 @@
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.instance.InstanceID;
-import org.apache.flink.runtime.resourcemanager.WorkerResourceSpec;
 import 
org.apache.flink.runtime.resourcemanager.registration.TaskExecutorConnection;
-import org.apache.flink.runtime.util.ResourceCounter;
 
-import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Executor;
 
 /** Tracks TaskManager's resource and slot status. */

Review Comment:
   We need to describe this class in more detail, similar to the 
`TaskExecutorManager`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to