This is an automated email from the ASF dual-hosted git repository. gaoyunhaii pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 53938680abe846e1947243809b4b43d5f67610d3 Author: Gen Luo <luogen...@gmail.com> AuthorDate: Tue Jul 26 16:06:10 2022 +0800 [FLINK-28588][rest] Add blocked flag in TaskManagerInfo and TaskManagerDetailsInfo --- .../src/test/resources/rest_api_v1.snapshot | 6 ++++ .../runtime/resourcemanager/ResourceManager.java | 7 +++-- .../taskmanager/TaskManagerDetailsInfo.java | 7 ++++- .../rest/messages/taskmanager/TaskManagerInfo.java | 32 ++++++++++++++++++---- .../taskmanager/TaskManagerDetailsHandlerTest.java | 3 +- .../messages/taskmanager/TaskManagerInfoTest.java | 3 +- 6 files changed, 48 insertions(+), 10 deletions(-) diff --git a/flink-runtime-web/src/test/resources/rest_api_v1.snapshot b/flink-runtime-web/src/test/resources/rest_api_v1.snapshot index f0e069087af..427f59c2723 100644 --- a/flink-runtime-web/src/test/resources/rest_api_v1.snapshot +++ b/flink-runtime-web/src/test/resources/rest_api_v1.snapshot @@ -3439,6 +3439,9 @@ "type" : "integer" } } + }, + "blocked" : { + "type" : "boolean" } } } @@ -3597,6 +3600,9 @@ } } }, + "blocked" : { + "type" : "boolean" + }, "allocatedSlots" : { "type" : "array", "items" : { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java index 9a9a17b4cbc..fb85cf88231 100755 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java @@ -646,7 +646,8 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable> slotManager.getRegisteredResourceOf(taskExecutor.getInstanceID()), slotManager.getFreeResourceOf(taskExecutor.getInstanceID()), taskExecutor.getHardwareDescription(), - taskExecutor.getMemoryConfiguration())); + taskExecutor.getMemoryConfiguration(), + blocklistHandler.isBlockedTaskManager(taskExecutor.getResourceID()))); } return CompletableFuture.completedFuture(taskManagerInfos); @@ -675,7 +676,9 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable> slotManager.getRegisteredResourceOf(instanceId), slotManager.getFreeResourceOf(instanceId), taskExecutor.getHardwareDescription(), - taskExecutor.getMemoryConfiguration()), + taskExecutor.getMemoryConfiguration(), + blocklistHandler.isBlockedTaskManager( + taskExecutor.getResourceID())), slotManager.getAllocatedSlotsOf(instanceId)); return CompletableFuture.completedFuture(taskManagerInfoWithSlots); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerDetailsInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerDetailsInfo.java index 657dfe7af8c..c9df8325e10 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerDetailsInfo.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerDetailsInfo.java @@ -33,6 +33,8 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgn import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonDeserialize; +import javax.annotation.Nullable; + import java.util.Collection; import java.util.Objects; @@ -64,6 +66,7 @@ public class TaskManagerDetailsInfo extends TaskManagerInfo { @JsonProperty(FIELD_NAME_AVAILABLE_RESOURCE) ResourceProfileInfo freeResource, @JsonProperty(FIELD_NAME_HARDWARE) HardwareDescription hardwareDescription, @JsonProperty(FIELD_NAME_MEMORY) TaskExecutorMemoryConfiguration memoryConfiguration, + @JsonProperty(FIELD_NAME_BLOCKED) @Nullable Boolean blocked, @JsonProperty(FIELD_NAME_ALLOCATED_SLOTS) Collection<SlotInfo> allocatedSlots, @JsonProperty(FIELD_NAME_METRICS) TaskManagerMetricsInfo taskManagerMetrics) { super( @@ -77,7 +80,8 @@ public class TaskManagerDetailsInfo extends TaskManagerInfo { totalResource, freeResource, hardwareDescription, - memoryConfiguration); + memoryConfiguration, + blocked); this.taskManagerMetrics = Preconditions.checkNotNull(taskManagerMetrics); this.allocatedSlots = Preconditions.checkNotNull(allocatedSlots); @@ -98,6 +102,7 @@ public class TaskManagerDetailsInfo extends TaskManagerInfo { taskManagerInfoWithSlots.getTaskManagerInfo().getFreeResource(), taskManagerInfoWithSlots.getTaskManagerInfo().getHardwareDescription(), taskManagerInfoWithSlots.getTaskManagerInfo().getMemoryConfiguration(), + taskManagerInfoWithSlots.getTaskManagerInfo().getBlocked(), taskManagerInfoWithSlots.getAllocatedSlots(), taskManagerMetrics); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerInfo.java index b8d93ccd08d..308e49b504d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerInfo.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerInfo.java @@ -31,10 +31,14 @@ import org.apache.flink.util.Preconditions; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnore; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude.Include; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonDeserialize; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonSerialize; +import javax.annotation.Nullable; + import java.io.Serializable; import java.util.Objects; @@ -63,6 +67,8 @@ public class TaskManagerInfo implements ResponseBody, Serializable { public static final String FIELD_NAME_MEMORY = "memoryConfiguration"; + public static final String FIELD_NAME_BLOCKED = "blocked"; + private static final long serialVersionUID = 1L; @JsonProperty(FIELD_NAME_RESOURCE_ID) @@ -99,7 +105,12 @@ public class TaskManagerInfo implements ResponseBody, Serializable { @JsonProperty(FIELD_NAME_MEMORY) private final TaskExecutorMemoryConfiguration memoryConfiguration; + @JsonProperty(FIELD_NAME_BLOCKED) + @JsonInclude(Include.NON_DEFAULT) + private final boolean blocked; + @JsonCreator + // blocked is Nullable since Jackson will assign null if the field is absent while parsing public TaskManagerInfo( @JsonDeserialize(using = ResourceIDDeserializer.class) @JsonProperty(FIELD_NAME_RESOURCE_ID) @@ -113,7 +124,8 @@ public class TaskManagerInfo implements ResponseBody, Serializable { @JsonProperty(FIELD_NAME_TOTAL_RESOURCE) ResourceProfileInfo totalResource, @JsonProperty(FIELD_NAME_AVAILABLE_RESOURCE) ResourceProfileInfo freeResource, @JsonProperty(FIELD_NAME_HARDWARE) HardwareDescription hardwareDescription, - @JsonProperty(FIELD_NAME_MEMORY) TaskExecutorMemoryConfiguration memoryConfiguration) { + @JsonProperty(FIELD_NAME_MEMORY) TaskExecutorMemoryConfiguration memoryConfiguration, + @JsonProperty(FIELD_NAME_BLOCKED) @Nullable Boolean blocked) { this.resourceId = Preconditions.checkNotNull(resourceId); this.address = Preconditions.checkNotNull(address); this.dataPort = dataPort; @@ -125,6 +137,7 @@ public class TaskManagerInfo implements ResponseBody, Serializable { this.freeResource = freeResource; this.hardwareDescription = Preconditions.checkNotNull(hardwareDescription); this.memoryConfiguration = Preconditions.checkNotNull(memoryConfiguration); + this.blocked = (blocked != null) && blocked; } public TaskManagerInfo( @@ -138,7 +151,8 @@ public class TaskManagerInfo implements ResponseBody, Serializable { ResourceProfile totalResource, ResourceProfile freeResource, HardwareDescription hardwareDescription, - TaskExecutorMemoryConfiguration memoryConfiguration) { + TaskExecutorMemoryConfiguration memoryConfiguration, + @Nullable Boolean blocked) { this( resourceId, address, @@ -150,7 +164,8 @@ public class TaskManagerInfo implements ResponseBody, Serializable { ResourceProfileInfo.fromResrouceProfile(totalResource), ResourceProfileInfo.fromResrouceProfile(freeResource), hardwareDescription, - memoryConfiguration); + memoryConfiguration, + blocked); } @JsonIgnore @@ -208,6 +223,11 @@ public class TaskManagerInfo implements ResponseBody, Serializable { return memoryConfiguration; } + @JsonIgnore + public boolean getBlocked() { + return blocked; + } + @Override public boolean equals(Object o) { if (this == o) { @@ -227,7 +247,8 @@ public class TaskManagerInfo implements ResponseBody, Serializable { && Objects.equals(resourceId, that.resourceId) && Objects.equals(address, that.address) && Objects.equals(hardwareDescription, that.hardwareDescription) - && Objects.equals(memoryConfiguration, that.memoryConfiguration); + && Objects.equals(memoryConfiguration, that.memoryConfiguration) + && blocked == that.blocked; } @Override @@ -243,6 +264,7 @@ public class TaskManagerInfo implements ResponseBody, Serializable { totalResource, freeResource, hardwareDescription, - memoryConfiguration); + memoryConfiguration, + blocked); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/taskmanager/TaskManagerDetailsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/taskmanager/TaskManagerDetailsHandlerTest.java index 95a64901af9..5e2e14864d8 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/taskmanager/TaskManagerDetailsHandlerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/taskmanager/TaskManagerDetailsHandlerTest.java @@ -172,7 +172,8 @@ class TaskManagerDetailsHandlerTest { ResourceProfile.ZERO, ResourceProfile.ZERO, new HardwareDescription(0, 0L, 0L, 0L), - new TaskExecutorMemoryConfiguration(0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L)); + new TaskExecutorMemoryConfiguration(0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L), + false); } private static HandlerRequest<EmptyRequestBody> createRequest() throws HandlerRequestException { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerInfoTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerInfoTest.java index 7231c94070b..140134ee953 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerInfoTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerInfoTest.java @@ -65,6 +65,7 @@ public class TaskManagerInfoTest extends RestResponseMarshallingTestBase<TaskMan random.nextLong(), random.nextLong(), random.nextLong(), - random.nextLong())); + random.nextLong()), + random.nextBoolean()); } }