This is an automated email from the ASF dual-hosted git repository.

gaojun2048 pushed a commit to branch st-engine
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/st-engine by this push:
     new a5e89247f [Engine] [ResourceManager] Add slot active check in 
ResourceManager (#2740)
a5e89247f is described below

commit a5e89247f0cf02677e01b6a37bc5a36bb2ee450a
Author: Hisoka <[email protected]>
AuthorDate: Thu Sep 15 18:40:20 2022 +0800

    [Engine] [ResourceManager] Add slot active check in ResourceManager (#2740)
---
 .../engine/server/resourcemanager/AbstractResourceManager.java     | 7 +++++++
 .../seatunnel/engine/server/resourcemanager/ResourceManager.java   | 7 +++++++
 2 files changed, 14 insertions(+)

diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/AbstractResourceManager.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/AbstractResourceManager.java
index 2572c3c77..c10d928db 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/AbstractResourceManager.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/AbstractResourceManager.java
@@ -36,6 +36,7 @@ import com.hazelcast.spi.impl.operationservice.Operation;
 import com.hazelcast.spi.impl.operationservice.impl.InvocationFuture;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
@@ -158,6 +159,12 @@ public abstract class AbstractResourceManager implements 
ResourceManager {
         return sendToMember(new ReleaseSlotOperation(jobId, profile), 
profile.getWorker());
     }
 
+    @Override
+    public boolean slotActiveCheck(SlotProfile profile) {
+        return registerWorker.values().stream().flatMap(workerProfile -> 
Arrays.stream(workerProfile.getAssignedSlots()))
+            .anyMatch(s -> s.getSlotID() == profile.getSlotID());
+    }
+
     @Override
     public void heartbeat(WorkerProfile workerProfile) {
         if (!registerWorker.containsKey(workerProfile.getAddress())) {
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceManager.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceManager.java
index c8fa75c84..3e2a9b558 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceManager.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceManager.java
@@ -37,6 +37,13 @@ public interface ResourceManager {
 
     CompletableFuture<Void> releaseResource(long jobId, SlotProfile profile);
 
+    /**
+     * Check {@link SlotProfile} is active or not. Not active meaning can't 
use this slot to deploy task.
+     *
+     * @return active or not
+     */
+    boolean slotActiveCheck(SlotProfile profile);
+
     /**
      * Every time ResourceManager and Worker communicate, heartbeat method 
should be called to
      * record the latest Worker status

Reply via email to