This is an automated email from the ASF dual-hosted git repository. gaoyunhaii pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 0263b55288be7b569f56dd42a94c5e48bcc1607b Author: Gen Luo <luogen...@gmail.com> AuthorDate: Tue Jul 26 16:16:45 2022 +0800 [FLINK-28588][rest] Add blocked task manager count and blocked slot count in ResourceOverview and ClusterOverview --- .../src/test/resources/rest_api_v1.snapshot | 6 ++ .../messages/webmonitor/ClusterOverview.java | 52 +++++++++--- .../runtime/resourcemanager/ResourceManager.java | 26 +++++- .../runtime/resourcemanager/ResourceOverview.java | 18 +++- .../messages/ClusterOverviewWithVersion.java | 26 +++--- .../resourcemanager/ResourceManagerTest.java | 96 ++++++++++++++++++++++ .../utils/TestingResourceManagerGateway.java | 2 +- .../messages/ClusterOverviewWithVersionTest.java | 2 +- .../runtime/webmonitor/TestingRestfulGateway.java | 2 +- 9 files changed, 196 insertions(+), 34 deletions(-) diff --git a/flink-runtime-web/src/test/resources/rest_api_v1.snapshot b/flink-runtime-web/src/test/resources/rest_api_v1.snapshot index 427f59c2723..873e5062d7d 100644 --- a/flink-runtime-web/src/test/resources/rest_api_v1.snapshot +++ b/flink-runtime-web/src/test/resources/rest_api_v1.snapshot @@ -3225,6 +3225,12 @@ "slots-available" : { "type" : "integer" }, + "taskmanagers-blocked" : { + "type" : "integer" + }, + "slots-free-and-blocked" : { + "type" : "integer" + }, "jobs-running" : { "type" : "integer" }, diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/ClusterOverview.java b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/ClusterOverview.java index 34bf05d0711..7ea327c8029 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/ClusterOverview.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/ClusterOverview.java @@ -21,8 +21,12 @@ package org.apache.flink.runtime.messages.webmonitor; import org.apache.flink.runtime.resourcemanager.ResourceOverview; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude.Include; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty; +import javax.annotation.Nullable; + /** * Response to the {@link RequestStatusOverview} message, carrying a description of the Flink * cluster status. @@ -34,6 +38,8 @@ public class ClusterOverview extends JobsOverview { public static final String FIELD_NAME_TASKMANAGERS = "taskmanagers"; public static final String FIELD_NAME_SLOTS_TOTAL = "slots-total"; public static final String FIELD_NAME_SLOTS_AVAILABLE = "slots-available"; + public static final String FIELD_NAME_TASKMANAGERS_BLOCKED = "taskmanagers-blocked"; + public static final String FIELD_NAME_SLOTS_FREE_AND_BLOCKED = "slots-free-and-blocked"; @JsonProperty(FIELD_NAME_TASKMANAGERS) private final int numTaskManagersConnected; @@ -44,11 +50,24 @@ public class ClusterOverview extends JobsOverview { @JsonProperty(FIELD_NAME_SLOTS_AVAILABLE) private final int numSlotsAvailable; + @JsonProperty(FIELD_NAME_TASKMANAGERS_BLOCKED) + @JsonInclude(Include.NON_DEFAULT) + private final int numTaskManagersBlocked; + + @JsonProperty(FIELD_NAME_SLOTS_FREE_AND_BLOCKED) + @JsonInclude(Include.NON_DEFAULT) + private final int numSlotsFreeAndBlocked; + @JsonCreator + // numTaskManagersBlocked and numSlotsFreeAndBlocked is Nullable since Jackson will assign null + // if the field is absent while parsing public ClusterOverview( @JsonProperty(FIELD_NAME_TASKMANAGERS) int numTaskManagersConnected, @JsonProperty(FIELD_NAME_SLOTS_TOTAL) int numSlotsTotal, @JsonProperty(FIELD_NAME_SLOTS_AVAILABLE) int numSlotsAvailable, + @JsonProperty(FIELD_NAME_TASKMANAGERS_BLOCKED) @Nullable Integer numTaskManagersBlocked, + @JsonProperty(FIELD_NAME_SLOTS_FREE_AND_BLOCKED) @Nullable + Integer numSlotsFreeAndBlocked, @JsonProperty(FIELD_NAME_JOBS_RUNNING) int numJobsRunningOrPending, @JsonProperty(FIELD_NAME_JOBS_FINISHED) int numJobsFinished, @JsonProperty(FIELD_NAME_JOBS_CANCELLED) int numJobsCancelled, @@ -59,18 +78,8 @@ public class ClusterOverview extends JobsOverview { this.numTaskManagersConnected = numTaskManagersConnected; this.numSlotsTotal = numSlotsTotal; this.numSlotsAvailable = numSlotsAvailable; - } - - public ClusterOverview( - int numTaskManagersConnected, - int numSlotsTotal, - int numSlotsAvailable, - JobsOverview jobs1, - JobsOverview jobs2) { - super(jobs1, jobs2); - this.numTaskManagersConnected = numTaskManagersConnected; - this.numSlotsTotal = numSlotsTotal; - this.numSlotsAvailable = numSlotsAvailable; + this.numTaskManagersBlocked = numTaskManagersBlocked == null ? 0 : numTaskManagersBlocked; + this.numSlotsFreeAndBlocked = numSlotsFreeAndBlocked == null ? 0 : numSlotsFreeAndBlocked; } public ClusterOverview(ResourceOverview resourceOverview, JobsOverview jobsOverview) { @@ -78,6 +87,8 @@ public class ClusterOverview extends JobsOverview { resourceOverview.getNumberTaskManagers(), resourceOverview.getNumberRegisteredSlots(), resourceOverview.getNumberFreeSlots(), + resourceOverview.getNumberBlockedTaskManagers(), + resourceOverview.getNumberBlockedFreeSlots(), jobsOverview.getNumJobsRunningOrPending(), jobsOverview.getNumJobsFinished(), jobsOverview.getNumJobsCancelled(), @@ -96,6 +107,13 @@ public class ClusterOverview extends JobsOverview { return numSlotsAvailable; } + public int getNumTaskManagersBlocked() { + return numTaskManagersBlocked; + } + + public int getNumSlotsFreeAndBlocked() { + return numSlotsFreeAndBlocked; + } // ------------------------------------------------------------------------ @Override @@ -107,6 +125,8 @@ public class ClusterOverview extends JobsOverview { return this.numTaskManagersConnected == that.numTaskManagersConnected && this.numSlotsTotal == that.numSlotsTotal && this.numSlotsAvailable == that.numSlotsAvailable + && this.numTaskManagersBlocked == that.numTaskManagersBlocked + && this.numSlotsFreeAndBlocked == that.numSlotsFreeAndBlocked && this.getNumJobsRunningOrPending() == that.getNumJobsRunningOrPending() && this.getNumJobsFinished() == that.getNumJobsFinished() && this.getNumJobsCancelled() == that.getNumJobsCancelled() @@ -122,6 +142,8 @@ public class ClusterOverview extends JobsOverview { result = 31 * result + numTaskManagersConnected; result = 31 * result + numSlotsTotal; result = 31 * result + numSlotsAvailable; + result = 31 * result + numTaskManagersBlocked; + result = 31 * result + numSlotsFreeAndBlocked; return result; } @@ -130,10 +152,16 @@ public class ClusterOverview extends JobsOverview { return "StatusOverview {" + "numTaskManagersConnected=" + numTaskManagersConnected + + (numTaskManagersBlocked == 0 + ? "" + : (", numTaskManagersBlocked=" + numTaskManagersBlocked)) + ", numSlotsTotal=" + numSlotsTotal + ", numSlotsAvailable=" + numSlotsAvailable + + (numSlotsFreeAndBlocked == 0 + ? "" + : (", numSlotsFreeAndBlocked=" + numSlotsFreeAndBlocked)) + ", numJobsRunningOrPending=" + getNumJobsRunningOrPending() + ", numJobsFinished=" diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java index fb85cf88231..8b4f26ea68d 100755 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java @@ -688,15 +688,34 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable> @Override public CompletableFuture<ResourceOverview> requestResourceOverview(Time timeout) { final int numberSlots = slotManager.getNumberRegisteredSlots(); - final int numberFreeSlots = slotManager.getNumberFreeSlots(); final ResourceProfile totalResource = slotManager.getRegisteredResource(); - final ResourceProfile freeResource = slotManager.getFreeResource(); + int numberFreeSlots = slotManager.getNumberFreeSlots(); + ResourceProfile freeResource = slotManager.getFreeResource(); + + int blockedTaskManagers = 0; + int totalBlockedFreeSlots = 0; + if (!blocklistHandler.getAllBlockedNodeIds().isEmpty()) { + for (WorkerRegistration<WorkerType> registration : taskExecutors.values()) { + if (blocklistHandler.isBlockedTaskManager(registration.getResourceID())) { + blockedTaskManagers++; + int blockedFreeSlots = + slotManager.getNumberFreeSlotsOf(registration.getInstanceID()); + totalBlockedFreeSlots += blockedFreeSlots; + numberFreeSlots -= blockedFreeSlots; + freeResource = + freeResource.subtract( + slotManager.getFreeResourceOf(registration.getInstanceID())); + } + } + } return CompletableFuture.completedFuture( new ResourceOverview( taskExecutors.size(), numberSlots, numberFreeSlots, + blockedTaskManagers, + totalBlockedFreeSlots, totalResource, freeResource)); } @@ -866,7 +885,8 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable> // Internal methods // ------------------------------------------------------------------------ - private String getNodeIdOfTaskManager(ResourceID taskManagerId) { + @VisibleForTesting + String getNodeIdOfTaskManager(ResourceID taskManagerId) { checkState(taskExecutors.containsKey(taskManagerId)); return taskExecutors.get(taskManagerId).getNodeId(); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceOverview.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceOverview.java index 8e9ee2dad1f..d68b17346fb 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceOverview.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceOverview.java @@ -28,7 +28,7 @@ public class ResourceOverview implements Serializable { private static final long serialVersionUID = 7618746920569224557L; private static final ResourceOverview EMPTY_RESOURCE_OVERVIEW = - new ResourceOverview(0, 0, 0, ResourceProfile.ZERO, ResourceProfile.ZERO); + new ResourceOverview(0, 0, 0, 0, 0, ResourceProfile.ZERO, ResourceProfile.ZERO); private final int numberTaskManagers; @@ -36,6 +36,10 @@ public class ResourceOverview implements Serializable { private final int numberFreeSlots; + private final int numberBlockedTaskManagers; + + private final int numberBlockedFreeSlots; + private final ResourceProfile totalResource; private final ResourceProfile freeResource; @@ -44,11 +48,15 @@ public class ResourceOverview implements Serializable { int numberTaskManagers, int numberRegisteredSlots, int numberFreeSlots, + int numberBlockedTaskManagers, + int numberBlockedFreeSlots, ResourceProfile totalResource, ResourceProfile freeResource) { this.numberTaskManagers = numberTaskManagers; this.numberRegisteredSlots = numberRegisteredSlots; this.numberFreeSlots = numberFreeSlots; + this.numberBlockedTaskManagers = numberBlockedTaskManagers; + this.numberBlockedFreeSlots = numberBlockedFreeSlots; this.totalResource = totalResource; this.freeResource = freeResource; } @@ -65,6 +73,14 @@ public class ResourceOverview implements Serializable { return numberFreeSlots; } + public int getNumberBlockedTaskManagers() { + return numberBlockedTaskManagers; + } + + public int getNumberBlockedFreeSlots() { + return numberBlockedFreeSlots; + } + public ResourceProfile getTotalResource() { return totalResource; } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/messages/ClusterOverviewWithVersion.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/messages/ClusterOverviewWithVersion.java index fa40db26cfd..7beaa32ab65 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/messages/ClusterOverviewWithVersion.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/messages/ClusterOverviewWithVersion.java @@ -19,13 +19,14 @@ package org.apache.flink.runtime.rest.handler.legacy.messages; import org.apache.flink.runtime.messages.webmonitor.ClusterOverview; -import org.apache.flink.runtime.messages.webmonitor.JobsOverview; import org.apache.flink.runtime.rest.messages.ResponseBody; import org.apache.flink.util.Preconditions; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty; +import javax.annotation.Nullable; + import java.util.Objects; /** Cluster overview message including the current Flink version and commit id. */ @@ -43,10 +44,15 @@ public class ClusterOverviewWithVersion extends ClusterOverview implements Respo private final String commitId; @JsonCreator + // numTaskManagersBlocked and numSlotsFreeAndBlocked is Nullable since Jackson will assign null + // if the field is absent while parsing public ClusterOverviewWithVersion( @JsonProperty(FIELD_NAME_TASKMANAGERS) int numTaskManagersConnected, @JsonProperty(FIELD_NAME_SLOTS_TOTAL) int numSlotsTotal, @JsonProperty(FIELD_NAME_SLOTS_AVAILABLE) int numSlotsAvailable, + @JsonProperty(FIELD_NAME_TASKMANAGERS_BLOCKED) @Nullable Integer numTaskManagersBlocked, + @JsonProperty(FIELD_NAME_SLOTS_FREE_AND_BLOCKED) @Nullable + Integer numSlotsFreeAndBlocked, @JsonProperty(FIELD_NAME_JOBS_RUNNING) int numJobsRunningOrPending, @JsonProperty(FIELD_NAME_JOBS_FINISHED) int numJobsFinished, @JsonProperty(FIELD_NAME_JOBS_CANCELLED) int numJobsCancelled, @@ -57,6 +63,8 @@ public class ClusterOverviewWithVersion extends ClusterOverview implements Respo numTaskManagersConnected, numSlotsTotal, numSlotsAvailable, + numTaskManagersBlocked, + numSlotsFreeAndBlocked, numJobsRunningOrPending, numJobsFinished, numJobsCancelled, @@ -66,26 +74,14 @@ public class ClusterOverviewWithVersion extends ClusterOverview implements Respo this.commitId = Preconditions.checkNotNull(commitId); } - public ClusterOverviewWithVersion( - int numTaskManagersConnected, - int numSlotsTotal, - int numSlotsAvailable, - JobsOverview jobs1, - JobsOverview jobs2, - String version, - String commitId) { - super(numTaskManagersConnected, numSlotsTotal, numSlotsAvailable, jobs1, jobs2); - - this.version = Preconditions.checkNotNull(version); - this.commitId = Preconditions.checkNotNull(commitId); - } - public static ClusterOverviewWithVersion fromStatusOverview( ClusterOverview statusOverview, String version, String commitId) { return new ClusterOverviewWithVersion( statusOverview.getNumTaskManagersConnected(), statusOverview.getNumSlotsTotal(), statusOverview.getNumSlotsAvailable(), + statusOverview.getNumTaskManagersBlocked(), + statusOverview.getNumSlotsFreeAndBlocked(), statusOverview.getNumJobsRunningOrPending(), statusOverview.getNumJobsFinished(), statusOverview.getNumJobsCancelled(), diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java index eba7a62bd5b..d318fd2449f 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java @@ -28,9 +28,11 @@ import org.apache.flink.runtime.blocklist.DefaultBlocklistHandler; import org.apache.flink.runtime.blocklist.NoOpBlocklistHandler; import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.clusterframework.types.ResourceProfile; +import org.apache.flink.runtime.clusterframework.types.SlotID; import org.apache.flink.runtime.heartbeat.HeartbeatServices; import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices; import org.apache.flink.runtime.instance.HardwareDescription; +import org.apache.flink.runtime.instance.InstanceID; import org.apache.flink.runtime.io.network.partition.NoOpResourceManagerPartitionTracker; import org.apache.flink.runtime.jobmaster.JobMasterGateway; import org.apache.flink.runtime.jobmaster.utils.TestingJobMasterGateway; @@ -52,14 +54,18 @@ import org.apache.flink.runtime.rpc.exceptions.RecipientUnreachableException; import org.apache.flink.runtime.security.token.NoOpDelegationTokenManager; import org.apache.flink.runtime.slots.ResourceRequirement; import org.apache.flink.runtime.slots.ResourceRequirements; +import org.apache.flink.runtime.taskexecutor.SlotReport; +import org.apache.flink.runtime.taskexecutor.SlotStatus; import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway; import org.apache.flink.runtime.taskexecutor.TaskExecutorMemoryConfiguration; +import org.apache.flink.runtime.taskexecutor.TaskExecutorRegistrationSuccess; import org.apache.flink.runtime.taskexecutor.TaskExecutorThreadInfoGateway; import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder; import org.apache.flink.runtime.util.TestingFatalErrorHandler; import org.apache.flink.testutils.TestingUtils; import org.apache.flink.util.FlinkException; import org.apache.flink.util.concurrent.FutureUtils; +import org.apache.flink.util.concurrent.ManuallyTriggeredScheduledExecutor; import org.apache.flink.util.function.ThrowingConsumer; import org.junit.jupiter.api.AfterAll; @@ -72,6 +78,7 @@ import java.time.Duration; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.List; import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; @@ -670,6 +677,95 @@ class ResourceManagerTest { assertThat(receivedBlockedNodes2).containsExactlyInAnyOrder(blockedNode1, blockedNode2); } + @Test + void testResourceOverviewWithBlockedSlots() throws Exception { + ManuallyTriggeredScheduledExecutor executor = new ManuallyTriggeredScheduledExecutor(); + final SlotManager slotManager = DeclarativeSlotManagerBuilder.newBuilder(executor).build(); + resourceManager = + new ResourceManagerBuilder() + .withSlotManager(slotManager) + .withBlocklistHandlerFactory( + new DefaultBlocklistHandler.Factory(Duration.ofMillis(100L))) + .buildAndStart(); + + final ResourceManagerGateway resourceManagerGateway = + resourceManager.getSelfGateway(ResourceManagerGateway.class); + + ResourceID taskExecutor = ResourceID.generate(); + ResourceID taskExecutorToBlock = ResourceID.generate(); + registerTaskExecutorAndSlot(resourceManagerGateway, taskExecutor, 3); + registerTaskExecutorAndSlot(resourceManagerGateway, taskExecutorToBlock, 5); + executor.triggerAll(); + + ResourceOverview overview = + resourceManagerGateway.requestResourceOverview(Time.seconds(5)).get(); + assertThat(overview.getNumberTaskManagers()).isEqualTo(2); + assertThat(overview.getNumberRegisteredSlots()).isEqualTo(8); + assertThat(overview.getNumberFreeSlots()).isEqualTo(8); + assertThat(overview.getNumberBlockedTaskManagers()).isEqualTo(0); + assertThat(overview.getNumberBlockedFreeSlots()).isEqualTo(0); + assertThat(overview.getTotalResource()) + .isEqualTo(ResourceProfile.fromResources(1, 1024).multiply(8)); + assertThat(overview.getFreeResource()) + .isEqualTo(ResourceProfile.fromResources(1, 1024).multiply(8)); + + resourceManagerGateway.notifyNewBlockedNodes( + Collections.singleton( + new BlockedNode( + resourceManager.getNodeIdOfTaskManager(taskExecutorToBlock), + "Test cause", + Long.MAX_VALUE))); + + ResourceOverview overviewBlocked = + resourceManagerGateway.requestResourceOverview(Time.seconds(5)).get(); + assertThat(overviewBlocked.getNumberTaskManagers()).isEqualTo(2); + assertThat(overviewBlocked.getNumberRegisteredSlots()).isEqualTo(8); + assertThat(overviewBlocked.getNumberFreeSlots()).isEqualTo(3); + assertThat(overviewBlocked.getNumberBlockedTaskManagers()).isEqualTo(1); + assertThat(overviewBlocked.getNumberBlockedFreeSlots()).isEqualTo(5); + assertThat(overviewBlocked.getTotalResource()) + .isEqualTo(ResourceProfile.fromResources(1, 1024).multiply(8)); + assertThat(overviewBlocked.getFreeResource()) + .isEqualTo(ResourceProfile.fromResources(1, 1024).multiply(3)); + } + + private void registerTaskExecutorAndSlot( + ResourceManagerGateway resourceManagerGateway, ResourceID taskManagerId, int slotCount) + throws Exception { + final TaskExecutorGateway taskExecutorGateway = + new TestingTaskExecutorGatewayBuilder() + .setAddress(UUID.randomUUID().toString()) + .createTestingTaskExecutorGateway(); + rpcService.registerGateway(taskExecutorGateway.getAddress(), taskExecutorGateway); + TaskExecutorRegistration taskExecutorRegistration = + new TaskExecutorRegistration( + taskExecutorGateway.getAddress(), + taskManagerId, + dataPort, + jmxPort, + hardwareDescription, + new TaskExecutorMemoryConfiguration( + 1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L), + ResourceProfile.fromResources(1, 1024), + ResourceProfile.fromResources(1, 1024).multiply(slotCount), + taskExecutorGateway.getAddress()); + RegistrationResponse registrationResult = + resourceManagerGateway + .registerTaskExecutor(taskExecutorRegistration, TestingUtils.TIMEOUT) + .get(); + assertThat(registrationResult).isInstanceOf(TaskExecutorRegistrationSuccess.class); + InstanceID instanceID = + ((TaskExecutorRegistrationSuccess) registrationResult).getRegistrationId(); + List<SlotStatus> slots = new ArrayList<>(); + for (int i = 0; i < slotCount; i++) { + slots.add( + new SlotStatus( + new SlotID(taskManagerId, i), ResourceProfile.fromResources(1, 1024))); + } + resourceManagerGateway.sendSlotReport( + taskManagerId, instanceID, new SlotReport(slots), Time.seconds(5)); + } + private JobMasterGateway createJobMasterGateway(Collection<BlockedNode> receivedBlockedNodes) { final TestingJobMasterGateway jobMasterGateway = new TestingJobMasterGatewayBuilder() diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/utils/TestingResourceManagerGateway.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/utils/TestingResourceManagerGateway.java index 82b0236190a..7a5c7493a31 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/utils/TestingResourceManagerGateway.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/utils/TestingResourceManagerGateway.java @@ -405,7 +405,7 @@ public class TestingResourceManagerGateway implements ResourceManagerGateway { @Override public CompletableFuture<ResourceOverview> requestResourceOverview(Time timeout) { return CompletableFuture.completedFuture( - new ResourceOverview(1, 1, 1, ResourceProfile.ZERO, ResourceProfile.ZERO)); + new ResourceOverview(1, 1, 1, 0, 0, ResourceProfile.ZERO, ResourceProfile.ZERO)); } @Override diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/messages/ClusterOverviewWithVersionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/messages/ClusterOverviewWithVersionTest.java index 4dff5fb8ae8..71a2977ecbb 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/messages/ClusterOverviewWithVersionTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/messages/ClusterOverviewWithVersionTest.java @@ -31,6 +31,6 @@ public class ClusterOverviewWithVersionTest @Override protected ClusterOverviewWithVersion getTestResponseInstance() { - return new ClusterOverviewWithVersion(1, 3, 3, 7, 4, 2, 0, "version", "commit"); + return new ClusterOverviewWithVersion(2, 6, 3, 1, 3, 7, 4, 2, 0, "version", "commit"); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/TestingRestfulGateway.java b/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/TestingRestfulGateway.java index bb313cacd8d..ffd53db2b9b 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/TestingRestfulGateway.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/TestingRestfulGateway.java @@ -74,7 +74,7 @@ public class TestingRestfulGateway implements RestfulGateway { DEFAULT_REQUEST_CLUSTER_OVERVIEW_SUPPLIER = () -> CompletableFuture.completedFuture( - new ClusterOverview(0, 0, 0, 0, 0, 0, 0)); + new ClusterOverview(0, 0, 0, 0, 0, 0, 0, 0, 0)); static final Supplier<CompletableFuture<Collection<String>>> DEFAULT_REQUEST_METRIC_QUERY_SERVICE_PATHS_SUPPLIER = () -> CompletableFuture.completedFuture(Collections.emptyList());