Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/6067#discussion_r190611660 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java --- @@ -1483,6 +1485,216 @@ public void testMaximumRegistrationDurationAfterConnectionLoss() throws Exceptio } } + /** + * Tests that we ignore slot requests if the TaskExecutor is not + * registered at a ResourceManager. + */ + @Test + public void testIgnoringSlotRequestsIfNotRegistered() throws Exception { + final TaskSlotTable taskSlotTable = new TaskSlotTable(Collections.singleton(ResourceProfile.UNKNOWN), timerService); + final TaskManagerServices taskManagerServices = new TaskManagerServicesBuilder().setTaskSlotTable(taskSlotTable).build(); + + final TaskExecutor taskExecutor = createTaskExecutor(taskManagerServices); + + taskExecutor.start(); + + try { + final TestingResourceManagerGateway testingResourceManagerGateway = new TestingResourceManagerGateway(); + + final CompletableFuture<RegistrationResponse> registrationFuture = new CompletableFuture<>(); + final CompletableFuture<ResourceID> taskExecutorResourceIdFuture = new CompletableFuture<>(); + + testingResourceManagerGateway.setRegisterTaskExecutorFunction(stringResourceIDSlotReportIntegerHardwareDescriptionTuple5 -> { + taskExecutorResourceIdFuture.complete(stringResourceIDSlotReportIntegerHardwareDescriptionTuple5.f1); + return registrationFuture; + }); + + rpc.registerGateway(testingResourceManagerGateway.getAddress(), testingResourceManagerGateway); + resourceManagerLeaderRetriever.notifyListener(testingResourceManagerGateway.getAddress(), testingResourceManagerGateway.getFencingToken().toUUID()); + + final TaskExecutorGateway taskExecutorGateway = taskExecutor.getSelfGateway(TaskExecutorGateway.class); + + final ResourceID resourceId = taskExecutorResourceIdFuture.get(); + + final SlotID slotId = new SlotID(resourceId, 0); + final CompletableFuture<Acknowledge> 
slotRequestResponse = taskExecutorGateway.requestSlot(slotId, jobId, new AllocationID(), "foobar", testingResourceManagerGateway.getFencingToken(), timeout); + + try { + slotRequestResponse.get(); + fail("We should not be able to request slots before the TaskExecutor is registered at the ResourceManager."); + } catch (ExecutionException ee) { + assertThat(ExceptionUtils.stripExecutionException(ee), instanceOf(TaskManagerException.class)); + } + } finally { + RpcUtils.terminateRpcEndpoint(taskExecutor, timeout); + } + } + + /** + * Tests that the TaskExecutor tries to reconnect to a ResourceManager from which it + * was explicitly disconnected. + */ + @Test + public void testReconnectionAttemptIfExplicitlyDisconnected() throws Exception { + final long heartbeatInterval = 1000L; + final TaskSlotTable taskSlotTable = new TaskSlotTable(Collections.singleton(ResourceProfile.UNKNOWN), timerService); + final TaskManagerLocation taskManagerLocation = new LocalTaskManagerLocation(); + final TaskExecutor taskExecutor = new TaskExecutor( + rpc, + TaskManagerConfiguration.fromConfiguration(configuration), + haServices, + new TaskManagerServicesBuilder() + .setTaskSlotTable(taskSlotTable) + .setTaskManagerLocation(taskManagerLocation) + .build(), + new HeartbeatServices(heartbeatInterval, 1000L), + UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(), + dummyBlobCacheService, + testingFatalErrorHandler); + + taskExecutor.start(); + + try { + final TestingResourceManagerGateway testingResourceManagerGateway = new TestingResourceManagerGateway(); + final ClusterInformation clusterInformation = new ClusterInformation("foobar", 1234); + final CompletableFuture<RegistrationResponse> registrationResponseFuture = CompletableFuture.completedFuture(new TaskExecutorRegistrationSuccess(new InstanceID(), ResourceID.generate(), heartbeatInterval, clusterInformation)); + final BlockingQueue<ResourceID> registrationQueue = new ArrayBlockingQueue<>(1); + + 
testingResourceManagerGateway.setRegisterTaskExecutorFunction(stringResourceIDSlotReportIntegerHardwareDescriptionTuple5 -> { + registrationQueue.offer(stringResourceIDSlotReportIntegerHardwareDescriptionTuple5.f1); --- End diff -- Argh, the same issue here.
---